Compare commits
6 Commits
276fbc652a
...
e0559c2ed1
Author | SHA1 | Date | |
---|---|---|---|
e0559c2ed1 | |||
759492298a | |||
b5f6fcc253 | |||
905e325ded | |||
648b6f4b70 | |||
7f8a2c7027 |
|
@ -39,6 +39,10 @@ class RetryFsck(FsckError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FsckFormatError(FsckError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def log(format, *args):
|
def log(format, *args):
|
||||||
printf(format, *args)
|
printf(format, *args)
|
||||||
|
|
||||||
|
@ -238,16 +242,28 @@ class Fsck(object):
|
||||||
"set: %s\nnew: %s",
|
"set: %s\nnew: %s",
|
||||||
path, str(posiset), str(new))
|
path, str(posiset), str(new))
|
||||||
|
|
||||||
# check bulkdata
|
|
||||||
self.check_bulkdata(sid, path, bulk)
|
|
||||||
|
|
||||||
# Check that we can open bulkdata
|
|
||||||
try:
|
try:
|
||||||
|
# Check bulkdata
|
||||||
|
self.check_bulkdata(sid, path, bulk)
|
||||||
|
|
||||||
|
# Check that we can open bulkdata
|
||||||
tab = nilmdb.server.bulkdata.Table(bulk)
|
tab = nilmdb.server.bulkdata.Table(bulk)
|
||||||
except Exception as e: # pragma: no cover --
|
except FsckFormatError as e:
|
||||||
# No coverage here because, in the current code,
|
# If there are no files except _format, try deleting
|
||||||
# everything that would cause the bulkdata to fail
|
# the entire stream; this may remove metadata, but
|
||||||
# has been already checked.
|
# it's probably unimportant.
|
||||||
|
files = list(os.listdir(bulk))
|
||||||
|
if len(files) > 1:
|
||||||
|
raise FsckFormatError(f"{path}: can't load _format, "
|
||||||
|
f"but data is also present")
|
||||||
|
|
||||||
|
# Since the stream was empty, just remove it
|
||||||
|
self.fix_remove_stream(sid, path, bulk,
|
||||||
|
"empty, with corrupted format file")
|
||||||
|
except FsckError as e:
|
||||||
|
raise e
|
||||||
|
except Exception as e: # pragma: no cover
|
||||||
|
# No coverage because this is an unknown/unexpected error
|
||||||
raise FsckError("%s: can't open bulkdata: %s",
|
raise FsckError("%s: can't open bulkdata: %s",
|
||||||
path, str(e))
|
path, str(e))
|
||||||
tab.close()
|
tab.close()
|
||||||
|
@ -256,21 +272,25 @@ class Fsck(object):
|
||||||
|
|
||||||
@retry_if_raised(RetryFsck)
|
@retry_if_raised(RetryFsck)
|
||||||
def check_bulkdata(self, sid, path, bulk):
|
def check_bulkdata(self, sid, path, bulk):
|
||||||
with open(os.path.join(bulk, b"_format"), "rb") as f:
|
try:
|
||||||
fmt = pickle.load(f)
|
with open(os.path.join(bulk, b"_format"), "rb") as f:
|
||||||
|
fmt = pickle.load(f)
|
||||||
|
except Exception as e:
|
||||||
|
raise FsckFormatError(f"{path}: can't load _format file ({e})")
|
||||||
|
|
||||||
if fmt["version"] != 3:
|
if fmt["version"] != 3:
|
||||||
raise FsckError("%s: bad or unsupported bulkdata version %d",
|
raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
|
||||||
path, fmt["version"])
|
path, fmt["version"])
|
||||||
rows_per_file = int(fmt["rows_per_file"])
|
rows_per_file = int(fmt["rows_per_file"])
|
||||||
if rows_per_file < 1:
|
if rows_per_file < 1:
|
||||||
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
|
raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
|
||||||
files_per_dir = int(fmt["files_per_dir"])
|
files_per_dir = int(fmt["files_per_dir"])
|
||||||
if files_per_dir < 1:
|
if files_per_dir < 1:
|
||||||
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
|
raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
|
||||||
layout = fmt["layout"]
|
layout = fmt["layout"]
|
||||||
if layout != self.stream_layout[sid]:
|
if layout != self.stream_layout[sid]:
|
||||||
raise FsckError("%s: layout mismatch %s != %s", path,
|
raise FsckFormatError("%s: layout mismatch %s != %s", path,
|
||||||
layout, self.stream_layout[sid])
|
layout, self.stream_layout[sid])
|
||||||
|
|
||||||
# Every file should have a size that's the multiple of the row size
|
# Every file should have a size that's the multiple of the row size
|
||||||
rkt = nilmdb.server.rocket.Rocket(layout, None)
|
rkt = nilmdb.server.rocket.Rocket(layout, None)
|
||||||
|
@ -287,7 +307,7 @@ class Fsck(object):
|
||||||
files = list(filter(regex.search, os.listdir(subpath)))
|
files = list(filter(regex.search, os.listdir(subpath)))
|
||||||
if not files:
|
if not files:
|
||||||
self.fix_empty_subdir(subpath)
|
self.fix_empty_subdir(subpath)
|
||||||
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
|
|
||||||
# Verify that their size is a multiple of the row size
|
# Verify that their size is a multiple of the row size
|
||||||
for filename in files:
|
for filename in files:
|
||||||
filepath = os.path.join(subpath, filename)
|
filepath = os.path.join(subpath, filename)
|
||||||
|
@ -328,6 +348,24 @@ class Fsck(object):
|
||||||
f.truncate(newsize)
|
f.truncate(newsize)
|
||||||
raise RetryFsck
|
raise RetryFsck
|
||||||
|
|
||||||
|
def fix_remove_stream(self, sid, path, bulk, reason):
|
||||||
|
msg = f"stream {path} is corrupted: {reason}"
|
||||||
|
if not self.fix:
|
||||||
|
raise FixableFsckError(msg)
|
||||||
|
# Remove the stream from disk and the database
|
||||||
|
err(f"\n{msg}\n")
|
||||||
|
err(f"Removing stream {path} from disk and database\n")
|
||||||
|
shutil.rmtree(bulk)
|
||||||
|
with self.sql:
|
||||||
|
cur = self.sql.cursor()
|
||||||
|
cur.execute("DELETE FROM streams WHERE id=?",
|
||||||
|
(sid,))
|
||||||
|
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
|
||||||
|
raise FsckError("failed to remove stream")
|
||||||
|
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
|
||||||
|
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
|
||||||
|
raise RetryFsck
|
||||||
|
|
||||||
### Check interval endpoints
|
### Check interval endpoints
|
||||||
|
|
||||||
def check_intervals(self):
|
def check_intervals(self):
|
||||||
|
@ -364,7 +402,7 @@ class Fsck(object):
|
||||||
erow = tab[epos-1] # noqa: F841 unused
|
erow = tab[epos-1] # noqa: F841 unused
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.fix_bad_interval(sid, intv, tab, str(e))
|
self.fix_bad_interval(sid, intv, tab, str(e))
|
||||||
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
|
|
||||||
return len(ints)
|
return len(ints)
|
||||||
|
|
||||||
def fix_bad_interval(self, sid, intv, tab, msg):
|
def fix_bad_interval(self, sid, intv, tab, msg):
|
||||||
|
@ -401,8 +439,8 @@ class Fsck(object):
|
||||||
# Otherwise, the only hope is to delete the interval entirely.
|
# Otherwise, the only hope is to delete the interval entirely.
|
||||||
err("*** Deleting the entire interval from SQL.\n")
|
err("*** Deleting the entire interval from SQL.\n")
|
||||||
err("This may leave stale data on disk. To fix that, copy all\n")
|
err("This may leave stale data on disk. To fix that, copy all\n")
|
||||||
err("data from this stream to a new stream, then remove all data\n")
|
err("data from this stream to a new stream using nilm-copy, then\n")
|
||||||
err("from and destroy %s.\n", path)
|
err("remove all data from and destroy %s.\n", path)
|
||||||
with self.sql:
|
with self.sql:
|
||||||
cur = self.sql.cursor()
|
cur = self.sql.cursor()
|
||||||
cur.execute("DELETE FROM ranges WHERE "
|
cur.execute("DELETE FROM ranges WHERE "
|
||||||
|
|
|
@ -293,8 +293,8 @@ class Table():
|
||||||
"layout": layout,
|
"layout": layout,
|
||||||
"version": 3
|
"version": 3
|
||||||
}
|
}
|
||||||
with open(os.path.join(root, b"_format"), "wb") as f:
|
nilmdb.utils.atomic.replace_file(
|
||||||
pickle.dump(fmt, f, 2)
|
os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
|
||||||
|
|
||||||
# Normal methods
|
# Normal methods
|
||||||
def __init__(self, root, initial_nrows=0):
|
def __init__(self, root, initial_nrows=0):
|
||||||
|
|
|
@ -163,3 +163,8 @@ class TestFsck(object):
|
||||||
raise Exception("hi")
|
raise Exception("hi")
|
||||||
with assert_raises(Exception):
|
with assert_raises(Exception):
|
||||||
foo()
|
foo()
|
||||||
|
|
||||||
|
self.failmsg("test2v", "can't load _format, but data is also present")
|
||||||
|
self.failmsg("test2v1", "bad bulkdata table")
|
||||||
|
self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
|
||||||
|
self.okmsg("test2v2", "empty, with corrupted format file")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user