Compare commits

..

No commits in common. "e0559c2ed1bd3833cc42bc831f9248f6d763a521" and "276fbc652a3efb30c897d2775e6eae95ebd7ad96" have entirely different histories.

3 changed files with 22 additions and 65 deletions

View File

@ -39,10 +39,6 @@ class RetryFsck(FsckError):
pass pass
class FsckFormatError(FsckError):
pass
def log(format, *args): def log(format, *args):
printf(format, *args) printf(format, *args)
@ -242,28 +238,16 @@ class Fsck(object):
"set: %s\nnew: %s", "set: %s\nnew: %s",
path, str(posiset), str(new)) path, str(posiset), str(new))
# check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
try: try:
# Check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
tab = nilmdb.server.bulkdata.Table(bulk) tab = nilmdb.server.bulkdata.Table(bulk)
except FsckFormatError as e: except Exception as e: # pragma: no cover --
# If there are no files except _format, try deleting # No coverage here because, in the current code,
# the entire stream; this may remove metadata, but # everything that would cause the bulkdata to fail
# it's probably unimportant. # has been already checked.
files = list(os.listdir(bulk))
if len(files) > 1:
raise FsckFormatError(f"{path}: can't load _format, "
f"but data is also present")
# Since the stream was empty, just remove it
self.fix_remove_stream(sid, path, bulk,
"empty, with corrupted format file")
except FsckError as e:
raise e
except Exception as e: # pragma: no cover
# No coverage because this is an unknown/unexpected error
raise FsckError("%s: can't open bulkdata: %s", raise FsckError("%s: can't open bulkdata: %s",
path, str(e)) path, str(e))
tab.close() tab.close()
@ -272,25 +256,21 @@ class Fsck(object):
@retry_if_raised(RetryFsck) @retry_if_raised(RetryFsck)
def check_bulkdata(self, sid, path, bulk): def check_bulkdata(self, sid, path, bulk):
try: with open(os.path.join(bulk, b"_format"), "rb") as f:
with open(os.path.join(bulk, b"_format"), "rb") as f: fmt = pickle.load(f)
fmt = pickle.load(f)
except Exception as e:
raise FsckFormatError(f"{path}: can't load _format file ({e})")
if fmt["version"] != 3: if fmt["version"] != 3:
raise FsckFormatError("%s: bad or unsupported bulkdata version %d", raise FsckError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"]) path, fmt["version"])
rows_per_file = int(fmt["rows_per_file"]) rows_per_file = int(fmt["rows_per_file"])
if rows_per_file < 1: if rows_per_file < 1:
raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}") raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
files_per_dir = int(fmt["files_per_dir"]) files_per_dir = int(fmt["files_per_dir"])
if files_per_dir < 1: if files_per_dir < 1:
raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}") raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
layout = fmt["layout"] layout = fmt["layout"]
if layout != self.stream_layout[sid]: if layout != self.stream_layout[sid]:
raise FsckFormatError("%s: layout mismatch %s != %s", path, raise FsckError("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid]) layout, self.stream_layout[sid])
# Every file should have a size that's the multiple of the row size # Every file should have a size that's the multiple of the row size
rkt = nilmdb.server.rocket.Rocket(layout, None) rkt = nilmdb.server.rocket.Rocket(layout, None)
@ -307,7 +287,7 @@ class Fsck(object):
files = list(filter(regex.search, os.listdir(subpath))) files = list(filter(regex.search, os.listdir(subpath)))
if not files: if not files:
self.fix_empty_subdir(subpath) self.fix_empty_subdir(subpath)
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
# Verify that their size is a multiple of the row size # Verify that their size is a multiple of the row size
for filename in files: for filename in files:
filepath = os.path.join(subpath, filename) filepath = os.path.join(subpath, filename)
@ -348,24 +328,6 @@ class Fsck(object):
f.truncate(newsize) f.truncate(newsize)
raise RetryFsck raise RetryFsck
def fix_remove_stream(self, sid, path, bulk, reason):
msg = f"stream {path} is corrupted: {reason}"
if not self.fix:
raise FixableFsckError(msg)
# Remove the stream from disk and the database
err(f"\n{msg}\n")
err(f"Removing stream {path} from disk and database\n")
shutil.rmtree(bulk)
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM streams WHERE id=?",
(sid,))
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to remove stream")
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
raise RetryFsck
### Check interval endpoints ### Check interval endpoints
def check_intervals(self): def check_intervals(self):
@ -402,7 +364,7 @@ class Fsck(object):
erow = tab[epos-1] # noqa: F841 unused erow = tab[epos-1] # noqa: F841 unused
except Exception as e: except Exception as e:
self.fix_bad_interval(sid, intv, tab, str(e)) self.fix_bad_interval(sid, intv, tab, str(e))
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
return len(ints) return len(ints)
def fix_bad_interval(self, sid, intv, tab, msg): def fix_bad_interval(self, sid, intv, tab, msg):
@ -439,8 +401,8 @@ class Fsck(object):
# Otherwise, the only hope is to delete the interval entirely. # Otherwise, the only hope is to delete the interval entirely.
err("*** Deleting the entire interval from SQL.\n") err("*** Deleting the entire interval from SQL.\n")
err("This may leave stale data on disk. To fix that, copy all\n") err("This may leave stale data on disk. To fix that, copy all\n")
err("data from this stream to a new stream using nilm-copy, then\n") err("data from this stream to a new stream, then remove all data\n")
err("remove all data from and destroy %s.\n", path) err("from and destroy %s.\n", path)
with self.sql: with self.sql:
cur = self.sql.cursor() cur = self.sql.cursor()
cur.execute("DELETE FROM ranges WHERE " cur.execute("DELETE FROM ranges WHERE "

View File

@ -293,8 +293,8 @@ class Table():
"layout": layout, "layout": layout,
"version": 3 "version": 3
} }
nilmdb.utils.atomic.replace_file( with open(os.path.join(root, b"_format"), "wb") as f:
os.path.join(root, b"_format"), pickle.dumps(fmt, 2)) pickle.dump(fmt, f, 2)
# Normal methods # Normal methods
def __init__(self, root, initial_nrows=0): def __init__(self, root, initial_nrows=0):

View File

@ -163,8 +163,3 @@ class TestFsck(object):
raise Exception("hi") raise Exception("hi")
with assert_raises(Exception): with assert_raises(Exception):
foo() foo()
self.failmsg("test2v", "can't load _format, but data is also present")
self.failmsg("test2v1", "bad bulkdata table")
self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
self.okmsg("test2v2", "empty, with corrupted format file")