Compare commits: nilmdb-2.1...b6bba16505

7 commits:
  b6bba16505
  d4003d0d34
  759492298a
  b5f6fcc253
  905e325ded
  648b6f4b70
  7f8a2c7027
@@ -39,6 +39,10 @@ class RetryFsck(FsckError):
     pass
 
 
+class FsckFormatError(FsckError):
+    pass
+
+
 def log(format, *args):
     printf(format, *args)
 
@@ -238,16 +242,28 @@ class Fsck(object):
                             "set: %s\nnew: %s",
                             path, str(posiset), str(new))
 
-        # check bulkdata
-        self.check_bulkdata(sid, path, bulk)
-
-        # Check that we can open bulkdata
         try:
+            # Check bulkdata
+            self.check_bulkdata(sid, path, bulk)
+
+            # Check that we can open bulkdata
             tab = nilmdb.server.bulkdata.Table(bulk)
-        except Exception as e:   # pragma: no cover --
-            # No coverage here because, in the current code,
-            # everything that would cause the bulkdata to fail
-            # has been already checked.
+        except FsckFormatError as e:
+            # If there are no files except _format, try deleting
+            # the entire stream; this may remove metadata, but
+            # it's probably unimportant.
+            files = list(os.listdir(bulk))
+            if len(files) > 1:
+                raise FsckFormatError(f"{path}: can't load _format, "
+                                      f"but data is also present")
+
+            # Since the stream was empty, just remove it
+            self.fix_remove_stream(sid, path, bulk,
+                                   "empty, with corrupted format file")
+        except FsckError as e:
+            raise e
+        except Exception as e:  # pragma: no cover
+            # No coverage because this is an unknown/unexpected error
             raise FsckError("%s: can't open bulkdata: %s",
                             path, str(e))
         tab.close()
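A note on the handler ordering in the hunk above: FsckFormatError is declared as a subclass of FsckError (first hunk), so a plain `except FsckError` clause would also catch format problems. Putting `except FsckFormatError` first is what lets an empty stream with a corrupted _format be fixed instead of aborting the whole check. A minimal standalone sketch of that dispatch (illustrative only, not taken from nilmdb):

    class FsckError(Exception):
        pass

    class FsckFormatError(FsckError):
        pass

    def handle(exc):
        # The more specific handler must come first, as in the hunk above.
        try:
            raise exc
        except FsckFormatError:
            return "try to fix: remove the empty stream"
        except FsckError:
            return "report and abort"

    print(handle(FsckFormatError("bad _format")))   # try to fix: remove the empty stream
    print(handle(FsckError("other problem")))       # report and abort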
@@ -256,20 +272,24 @@ class Fsck(object):
 
     @retry_if_raised(RetryFsck)
     def check_bulkdata(self, sid, path, bulk):
-        with open(os.path.join(bulk, b"_format"), "rb") as f:
-            fmt = pickle.load(f)
+        try:
+            with open(os.path.join(bulk, b"_format"), "rb") as f:
+                fmt = pickle.load(f)
+        except Exception as e:
+            raise FsckFormatError(f"{path}: can't load _format file ({e})")
+
         if fmt["version"] != 3:
-            raise FsckError("%s: bad or unsupported bulkdata version %d",
-                            path, fmt["version"])
+            raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
+                                  path, fmt["version"])
         rows_per_file = int(fmt["rows_per_file"])
         if rows_per_file < 1:
-            raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
+            raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
         files_per_dir = int(fmt["files_per_dir"])
         if files_per_dir < 1:
-            raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
+            raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
         layout = fmt["layout"]
         if layout != self.stream_layout[sid]:
-            raise FsckError("%s: layout mismatch %s != %s", path,
-                            layout, self.stream_layout[sid])
+            raise FsckFormatError("%s: layout mismatch %s != %s", path,
+                                  layout, self.stream_layout[sid])
 
         # Every file should have a size that's the multiple of the row size
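For context on the new try/except around pickle.load: the new test fixtures (listed below) include zero-byte _format files, and unpickling an empty file fails immediately. That is the failure mode check_bulkdata now reports as FsckFormatError rather than letting a bare exception escape. A quick standalone illustration:

    import pickle
    import tempfile

    # A zero-byte _format file (like tests/fsck-data/test2v2/data/a/b/_format)
    # makes pickle.load() raise; check_bulkdata now wraps this in
    # FsckFormatError(f"{path}: can't load _format file (...)") instead.
    with tempfile.NamedTemporaryFile() as f:      # empty file
        try:
            pickle.load(f)
        except Exception as e:
            print(type(e).__name__, "-", e)       # EOFError - Ran out of input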
@@ -287,7 +307,7 @@ class Fsck(object):
             files = list(filter(regex.search, os.listdir(subpath)))
             if not files:
                 self.fix_empty_subdir(subpath)
-                raise RetryFsck
+                raise RetryFsck  # pragma: no cover; raised by fix_empty_subdir
             # Verify that their size is a multiple of the row size
             for filename in files:
                 filepath = os.path.join(subpath, filename)
@@ -328,6 +348,24 @@ class Fsck(object):
             f.truncate(newsize)
             raise RetryFsck
 
+    def fix_remove_stream(self, sid, path, bulk, reason):
+        msg = f"stream {path} is corrupted: {reason}"
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Remove the stream from disk and the database
+        err(f"\n{msg}\n")
+        err(f"Removing stream {path} from disk and database\n")
+        shutil.rmtree(bulk)
+        with self.sql:
+            cur = self.sql.cursor()
+            cur.execute("DELETE FROM streams WHERE id=?",
+                        (sid,))
+            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
+                raise FsckError("failed to remove stream")
+            cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
+            cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
+        raise RetryFsck
+
     ### Check interval endpoints
 
     def check_intervals(self):
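fix_remove_stream follows the same pattern as the other fixers: it mutates the database and the filesystem, then raises RetryFsck so that the decorated check runs again from the top against the now-consistent state. The @retry_if_raised(RetryFsck) decorator seen earlier presumably just re-runs the wrapped function while that exception keeps being raised; a rough sketch of that behavior (an assumption for illustration, not the project's actual implementation, which may log a message or cap the retry count):

    import functools

    def retry_if_raised(exc, message=None):
        # Illustrative sketch only; nilmdb's real decorator may differ.
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                while True:
                    try:
                        return func(*args, **kwargs)
                    except exc:
                        # A fixer changed something on disk or in SQL;
                        # run the whole check again.
                        continue
            return wrapper
        return decorator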
@@ -364,7 +402,7 @@ class Fsck(object):
                 erow = tab[epos-1]  # noqa: F841 unused
             except Exception as e:
                 self.fix_bad_interval(sid, intv, tab, str(e))
-                raise RetryFsck
+                raise RetryFsck  # pragma: no cover; raised by fix_bad_interval
         return len(ints)
 
     def fix_bad_interval(self, sid, intv, tab, msg):
@@ -401,8 +439,8 @@ class Fsck(object):
         # Otherwise, the only hope is to delete the interval entirely.
         err("*** Deleting the entire interval from SQL.\n")
         err("This may leave stale data on disk. To fix that, copy all\n")
-        err("data from this stream to a new stream, then remove all data\n")
-        err("from and destroy %s.\n", path)
+        err("data from this stream to a new stream using nilm-copy, then\n")
+        err("remove all data from and destroy %s.\n", path)
         with self.sql:
             cur = self.sql.cursor()
             cur.execute("DELETE FROM ranges WHERE "
@@ -473,18 +511,20 @@ class Fsck(object):
         # Verify that all timestamps are in range.
         match = (ts < stime) | (ts >= etime)
         if match.any():
-            row = start + numpy.argmax(match)
+            row = numpy.argmax(match)
             raise FsckError("%s: data timestamp %d at row %d "
                             "outside interval range [%d,%d)",
-                            path, data['timestamp'][row], row,
+                            path, ts[row], row + start,
                             stime, etime)
 
         # Verify that timestamps are monotonic
         match = numpy.diff(ts) <= 0
         if match.any():
-            row = start + numpy.argmax(match)
-            raise FsckError("%s: non-monotonic timestamp (%d -> %d) "
-                            "at row %d", path, ts[row], ts[row+1], row)
+            row = numpy.argmax(match)
+            raise FsckError("%s: non-monotonic timestamp (%d -> %d)"
+                            " at row %d", path, ts[row], ts[row+1],
+                            row + start)
 
         first_ts = ts[0]
         if last_ts is not None and first_ts <= last_ts:
             raise FsckError("%s: first interval timestamp %d is not "
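The hunk above fixes an indexing bug in the timestamp checks: ts holds only the current chunk of rows starting at absolute row `start`, so the old `row = start + numpy.argmax(match)` produced an index that was wrong (or past the end of the chunk) when used to read ts[row]. The new code indexes the chunk with the local offset and only adds `start` when reporting the absolute row. A small standalone example of the corrected arithmetic:

    import numpy

    start = 1000                          # absolute row number of this chunk
    ts = numpy.array([10, 20, 99, 40])    # timestamps for rows 1000..1003

    match = numpy.diff(ts) <= 0           # non-monotonic step at local index 2 -> 3
    if match.any():
        row = numpy.argmax(match)         # local index into ts (2), not start + 2
        print("timestamps %d -> %d at row %d" % (ts[row], ts[row+1], row + start))
        # timestamps 99 -> 40 at row 1002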
@@ -293,8 +293,8 @@ class Table():
             "layout": layout,
             "version": 3
         }
-        with open(os.path.join(root, b"_format"), "wb") as f:
-            pickle.dump(fmt, f, 2)
+        nilmdb.utils.atomic.replace_file(
+            os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
 
     # Normal methods
     def __init__(self, root, initial_nrows=0):
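Writing _format through nilmdb.utils.atomic.replace_file instead of an in-place open()/pickle.dump() means a crash mid-write can no longer leave a truncated _format behind, which is exactly the corruption the fsck changes above learn to detect and repair. The usual shape of such an atomic replace is write-to-temp-then-rename; a rough sketch of the idea (an illustration only, not nilmdb's actual implementation):

    import os
    import tempfile

    def replace_file(path, data):
        # Write the new contents to a temporary file in the same directory,
        # flush it to disk, then rename it over the destination so readers
        # see either the old or the new file, never a partial one.
        # (os.replace is atomic within one filesystem on POSIX.)
        dirname = os.path.dirname(path)
        fd, tmppath = tempfile.mkstemp(dir=dirname)
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmppath, path)
        except BaseException:
            os.unlink(tmppath)
            raise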
New test data files under tests/fsck-data/:

 BIN  tests/fsck-data/test2v/data.sql
 BIN  tests/fsck-data/test2v/data/a/b/0000/0000
   0  tests/fsck-data/test2v/data/a/b/_format        (empty file)
 BIN  tests/fsck-data/test2v1/data.sql
 BIN  tests/fsck-data/test2v1/data/a/b/0000/0000
 BIN  tests/fsck-data/test2v2/data.sql
   0  tests/fsck-data/test2v2/data/a/b/_format       (empty file)
@@ -163,3 +163,8 @@ class TestFsck(object):
             raise Exception("hi")
         with assert_raises(Exception):
             foo()
+
+        self.failmsg("test2v", "can't load _format, but data is also present")
+        self.failmsg("test2v1", "bad bulkdata table")
+        self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
+        self.okmsg("test2v2", "empty, with corrupted format file")