Compare commits
6 Commits
276fbc652a
...
e0559c2ed1
Author | SHA1 | Date | |
---|---|---|---|
e0559c2ed1 | |||
759492298a | |||
b5f6fcc253 | |||
905e325ded | |||
648b6f4b70 | |||
7f8a2c7027 |
|
@ -39,6 +39,10 @@ class RetryFsck(FsckError):
|
|||
pass
|
||||
|
||||
|
||||
class FsckFormatError(FsckError):
|
||||
pass
|
||||
|
||||
|
||||
def log(format, *args):
|
||||
printf(format, *args)
|
||||
|
||||
|
@ -238,16 +242,28 @@ class Fsck(object):
|
|||
"set: %s\nnew: %s",
|
||||
path, str(posiset), str(new))
|
||||
|
||||
# check bulkdata
|
||||
self.check_bulkdata(sid, path, bulk)
|
||||
|
||||
# Check that we can open bulkdata
|
||||
try:
|
||||
# Check bulkdata
|
||||
self.check_bulkdata(sid, path, bulk)
|
||||
|
||||
# Check that we can open bulkdata
|
||||
tab = nilmdb.server.bulkdata.Table(bulk)
|
||||
except Exception as e: # pragma: no cover --
|
||||
# No coverage here because, in the current code,
|
||||
# everything that would cause the bulkdata to fail
|
||||
# has been already checked.
|
||||
except FsckFormatError as e:
|
||||
# If there are no files except _format, try deleting
|
||||
# the entire stream; this may remove metadata, but
|
||||
# it's probably unimportant.
|
||||
files = list(os.listdir(bulk))
|
||||
if len(files) > 1:
|
||||
raise FsckFormatError(f"{path}: can't load _format, "
|
||||
f"but data is also present")
|
||||
|
||||
# Since the stream was empty, just remove it
|
||||
self.fix_remove_stream(sid, path, bulk,
|
||||
"empty, with corrupted format file")
|
||||
except FsckError as e:
|
||||
raise e
|
||||
except Exception as e: # pragma: no cover
|
||||
# No coverage because this is an unknown/unexpected error
|
||||
raise FsckError("%s: can't open bulkdata: %s",
|
||||
path, str(e))
|
||||
tab.close()
|
||||
|
@ -256,21 +272,25 @@ class Fsck(object):
|
|||
|
||||
@retry_if_raised(RetryFsck)
|
||||
def check_bulkdata(self, sid, path, bulk):
|
||||
with open(os.path.join(bulk, b"_format"), "rb") as f:
|
||||
fmt = pickle.load(f)
|
||||
try:
|
||||
with open(os.path.join(bulk, b"_format"), "rb") as f:
|
||||
fmt = pickle.load(f)
|
||||
except Exception as e:
|
||||
raise FsckFormatError(f"{path}: can't load _format file ({e})")
|
||||
|
||||
if fmt["version"] != 3:
|
||||
raise FsckError("%s: bad or unsupported bulkdata version %d",
|
||||
path, fmt["version"])
|
||||
raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
|
||||
path, fmt["version"])
|
||||
rows_per_file = int(fmt["rows_per_file"])
|
||||
if rows_per_file < 1:
|
||||
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
|
||||
raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
|
||||
files_per_dir = int(fmt["files_per_dir"])
|
||||
if files_per_dir < 1:
|
||||
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
|
||||
raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
|
||||
layout = fmt["layout"]
|
||||
if layout != self.stream_layout[sid]:
|
||||
raise FsckError("%s: layout mismatch %s != %s", path,
|
||||
layout, self.stream_layout[sid])
|
||||
raise FsckFormatError("%s: layout mismatch %s != %s", path,
|
||||
layout, self.stream_layout[sid])
|
||||
|
||||
# Every file should have a size that's the multiple of the row size
|
||||
rkt = nilmdb.server.rocket.Rocket(layout, None)
|
||||
|
@ -287,7 +307,7 @@ class Fsck(object):
|
|||
files = list(filter(regex.search, os.listdir(subpath)))
|
||||
if not files:
|
||||
self.fix_empty_subdir(subpath)
|
||||
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
|
||||
|
||||
# Verify that their size is a multiple of the row size
|
||||
for filename in files:
|
||||
filepath = os.path.join(subpath, filename)
|
||||
|
@ -328,6 +348,24 @@ class Fsck(object):
|
|||
f.truncate(newsize)
|
||||
raise RetryFsck
|
||||
|
||||
def fix_remove_stream(self, sid, path, bulk, reason):
|
||||
msg = f"stream {path} is corrupted: {reason}"
|
||||
if not self.fix:
|
||||
raise FixableFsckError(msg)
|
||||
# Remove the stream from disk and the database
|
||||
err(f"\n{msg}\n")
|
||||
err(f"Removing stream {path} from disk and database\n")
|
||||
shutil.rmtree(bulk)
|
||||
with self.sql:
|
||||
cur = self.sql.cursor()
|
||||
cur.execute("DELETE FROM streams WHERE id=?",
|
||||
(sid,))
|
||||
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
|
||||
raise FsckError("failed to remove stream")
|
||||
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
|
||||
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
|
||||
raise RetryFsck
|
||||
|
||||
### Check interval endpoints
|
||||
|
||||
def check_intervals(self):
|
||||
|
@ -364,7 +402,7 @@ class Fsck(object):
|
|||
erow = tab[epos-1] # noqa: F841 unused
|
||||
except Exception as e:
|
||||
self.fix_bad_interval(sid, intv, tab, str(e))
|
||||
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
|
||||
|
||||
return len(ints)
|
||||
|
||||
def fix_bad_interval(self, sid, intv, tab, msg):
|
||||
|
@ -401,8 +439,8 @@ class Fsck(object):
|
|||
# Otherwise, the only hope is to delete the interval entirely.
|
||||
err("*** Deleting the entire interval from SQL.\n")
|
||||
err("This may leave stale data on disk. To fix that, copy all\n")
|
||||
err("data from this stream to a new stream, then remove all data\n")
|
||||
err("from and destroy %s.\n", path)
|
||||
err("data from this stream to a new stream using nilm-copy, then\n")
|
||||
err("remove all data from and destroy %s.\n", path)
|
||||
with self.sql:
|
||||
cur = self.sql.cursor()
|
||||
cur.execute("DELETE FROM ranges WHERE "
|
||||
|
|
|
@ -293,8 +293,8 @@ class Table():
|
|||
"layout": layout,
|
||||
"version": 3
|
||||
}
|
||||
with open(os.path.join(root, b"_format"), "wb") as f:
|
||||
pickle.dump(fmt, f, 2)
|
||||
nilmdb.utils.atomic.replace_file(
|
||||
os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
|
||||
|
||||
# Normal methods
|
||||
def __init__(self, root, initial_nrows=0):
|
||||
|
|
|
@ -163,3 +163,8 @@ class TestFsck(object):
|
|||
raise Exception("hi")
|
||||
with assert_raises(Exception):
|
||||
foo()
|
||||
|
||||
self.failmsg("test2v", "can't load _format, but data is also present")
|
||||
self.failmsg("test2v1", "bad bulkdata table")
|
||||
self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
|
||||
self.okmsg("test2v2", "empty, with corrupted format file")
|
||||
|
|
Loading…
Reference in New Issue
Block a user