@@ -39,6 +39,10 @@ class RetryFsck(FsckError):
pass
class FsckFormatError(FsckError):
pass
def log(format, *args):
printf(format, *args)
@@ -238,16 +242,28 @@ class Fsck(object):
"set: %s\nnew: %s",
path, str(posiset), str(new))
# check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
try:
# Check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
tab = nilmdb.server.bulkdata.Table(bulk)
except Exception as e: # pragma: no cover --
# No coverage here because, in the current code,
# everything that would cause the bulkdata to fail
# has been already checked.
except FsckFormatError as e:
# If there are no files except _format, try deleting
# the entire stream; this may remove metadata, but
# it's probably unimportant.
files = list(os.listdir(bulk))
if len(files) > 1:
raise FsckFormatError(f"{path}: can't load _format, "
f"but data is also present")
# Since the stream was empty, just remove it
self.fix_remove_stream(sid, path, bulk,
"empty, with corrupted format file")
except FsckError as e:
raise e
except Exception as e: # pragma: no cover
# No coverage because this is an unknown/unexpected error
raise FsckError("%s: can't open bulkdata: %s",
path, str(e))
tab.close()
@@ -256,21 +272,25 @@ class Fsck(object):
@retry_if_raised(RetryFsck)
def check_bulkdata(self, sid, path, bulk):
with open(os.path.join(bulk, b"_format"), "rb") as f:
fmt = pickle.load(f)
try:
with open(os.path.join(bulk, b"_format"), "rb") as f:
fmt = pickle.load(f)
except Exception as e:
raise FsckFormatError(f"{path}: can't load _format file ({e})")
if fmt["version"] != 3:
raise FsckError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"])
raise FsckFormat Error("%s: bad or unsupported bulkdata version %d",
path, fmt["version"])
rows_per_file = int(fmt["rows_per_file"])
if rows_per_file < 1:
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
raise FsckFormat Error(f"{path}: bad rows_per_file {rows_per_file}")
files_per_dir = int(fmt["files_per_dir"])
if files_per_dir < 1:
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
raise FsckFormat Error(f"{path}: bad files_per_dir {files_per_dir}")
layout = fmt["layout"]
if layout != self.stream_layout[sid]:
raise FsckError("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid])
raise FsckFormat Error("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid])
# Every file should have a size that's the multiple of the row size
rkt = nilmdb.server.rocket.Rocket(layout, None)
@@ -287,7 +307,7 @@ class Fsck(object):
files = list(filter(regex.search, os.listdir(subpath)))
if not files:
self.fix_empty_subdir(subpath)
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
# Verify that their size is a multiple of the row size
for filename in files:
filepath = os.path.join(subpath, filename)
@@ -328,6 +348,24 @@ class Fsck(object):
f.truncate(newsize)
raise RetryFsck
def fix_remove_stream(self, sid, path, bulk, reason):
msg = f"stream {path} is corrupted: {reason}"
if not self.fix:
raise FixableFsckError(msg)
# Remove the stream from disk and the database
err(f"\n{msg}\n")
err(f"Removing stream {path} from disk and database\n")
shutil.rmtree(bulk)
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM streams WHERE id=?",
(sid,))
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to remove stream")
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
raise RetryFsck
### Check interval endpoints
def check_intervals(self):
@@ -364,7 +402,7 @@ class Fsck(object):
erow = tab[epos-1] # noqa: F841 unused
except Exception as e:
self.fix_bad_interval(sid, intv, tab, str(e))
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
return len(ints)
def fix_bad_interval(self, sid, intv, tab, msg):
@@ -401,8 +439,8 @@ class Fsck(object):
# Otherwise, the only hope is to delete the interval entirely.
err("*** Deleting the entire interval from SQL.\n")
err("This may leave stale data on disk. To fix that, copy all\n")
err("data from this stream to a new stream, then remove all data \n")
err("from and destroy %s.\n", path)
err("data from this stream to a new stream using nilm-copy, then \n")
err("remove all data from and destroy %s.\n", path)
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM ranges WHERE "