Compare commits
14 Commits
nilmdb-2.1
...
master
Author | SHA1 | Date | |
---|---|---|---|
517b237636 | |||
07f138e0f4 | |||
7538c6201b | |||
4d9a106ca1 | |||
e90a79ddad | |||
7056c5b4ec | |||
df4e7f0967 | |||
b6bba16505 | |||
d4003d0d34 | |||
759492298a | |||
b5f6fcc253 | |||
905e325ded | |||
648b6f4b70 | |||
7f8a2c7027 |
|
@ -39,6 +39,10 @@ class RetryFsck(FsckError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FsckFormatError(FsckError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def log(format, *args):
|
def log(format, *args):
|
||||||
printf(format, *args)
|
printf(format, *args)
|
||||||
|
|
||||||
|
@ -48,7 +52,7 @@ def err(format, *args):
|
||||||
|
|
||||||
|
|
||||||
# Decorator that retries a function if it raises a specific exception
|
# Decorator that retries a function if it raises a specific exception
|
||||||
def retry_if_raised(exc, message=None, max_retries=100):
|
def retry_if_raised(exc, message=None, max_retries=1000):
|
||||||
def f1(func):
|
def f1(func):
|
||||||
def f2(*args, **kwargs):
|
def f2(*args, **kwargs):
|
||||||
for n in range(max_retries):
|
for n in range(max_retries):
|
||||||
|
@ -56,7 +60,7 @@ def retry_if_raised(exc, message=None, max_retries=100):
|
||||||
return func(*args, **kwargs)
|
return func(*args, **kwargs)
|
||||||
except exc:
|
except exc:
|
||||||
if message:
|
if message:
|
||||||
log("%s\n\n", message)
|
log(f"{message} ({n+1})\n\n")
|
||||||
raise Exception("Max number of retries (%d) exceeded; giving up" %
|
raise Exception("Max number of retries (%d) exceeded; giving up" %
|
||||||
max_retries)
|
max_retries)
|
||||||
return f2
|
return f2
|
||||||
|
@ -234,42 +238,97 @@ class Fsck(object):
|
||||||
try:
|
try:
|
||||||
posiset += new
|
posiset += new
|
||||||
except IntervalError:
|
except IntervalError:
|
||||||
raise FsckError("%s: overlap in file offsets:\n"
|
self.fix_row_overlap(sid, path, posiset, new)
|
||||||
"set: %s\nnew: %s",
|
|
||||||
path, str(posiset), str(new))
|
|
||||||
|
|
||||||
# check bulkdata
|
try:
|
||||||
|
# Check bulkdata
|
||||||
self.check_bulkdata(sid, path, bulk)
|
self.check_bulkdata(sid, path, bulk)
|
||||||
|
|
||||||
# Check that we can open bulkdata
|
# Check that we can open bulkdata
|
||||||
try:
|
|
||||||
tab = nilmdb.server.bulkdata.Table(bulk)
|
tab = nilmdb.server.bulkdata.Table(bulk)
|
||||||
except Exception as e: # pragma: no cover --
|
except FsckFormatError:
|
||||||
# No coverage here because, in the current code,
|
# If there are no files except _format, try deleting
|
||||||
# everything that would cause the bulkdata to fail
|
# the entire stream; this may remove metadata, but
|
||||||
# has been already checked.
|
# it's probably unimportant.
|
||||||
|
files = list(os.listdir(bulk))
|
||||||
|
if len(files) > 1:
|
||||||
|
raise FsckFormatError(f"{path}: can't load _format, "
|
||||||
|
f"but data is also present")
|
||||||
|
|
||||||
|
# Since the stream was empty, just remove it
|
||||||
|
self.fix_remove_stream(sid, path, bulk,
|
||||||
|
"empty, with corrupted format file")
|
||||||
|
except FsckError as e:
|
||||||
|
raise e
|
||||||
|
except Exception as e: # pragma: no cover
|
||||||
|
# No coverage because this is an unknown/unexpected error
|
||||||
raise FsckError("%s: can't open bulkdata: %s",
|
raise FsckError("%s: can't open bulkdata: %s",
|
||||||
path, str(e))
|
path, str(e))
|
||||||
tab.close()
|
tab.close()
|
||||||
|
|
||||||
|
def fix_row_overlap(self, sid, path, existing, new):
|
||||||
|
# If the file rows (spos, epos) overlap in the interval table,
|
||||||
|
# and the overlapping ranges look like this:
|
||||||
|
# A --------- C
|
||||||
|
# B -------- D
|
||||||
|
# Then we can try changing the first interval to go from
|
||||||
|
# A to B instead.
|
||||||
|
msg = (f"{path}: overlap in file offsets:\n"
|
||||||
|
f"existing ranges: {existing}\n"
|
||||||
|
f"overlapping interval: {new}")
|
||||||
|
if not self.fix:
|
||||||
|
raise FixableFsckError(msg)
|
||||||
|
err(f"\n{msg}\nSeeing if we can truncate one of them...\n")
|
||||||
|
|
||||||
|
# See if there's exactly one interval that overlaps the
|
||||||
|
# conflicting one in the right way
|
||||||
|
match = None
|
||||||
|
for intv in self.stream_interval[sid]:
|
||||||
|
(stime, etime, spos, epos) = intv
|
||||||
|
if spos < new.start and epos > new.start:
|
||||||
|
if match:
|
||||||
|
err(f"no, more than one interval matched:\n"
|
||||||
|
f"{intv}\n{match}\n")
|
||||||
|
raise FsckError(f"{path}: unfixable overlap")
|
||||||
|
match = intv
|
||||||
|
if match is None:
|
||||||
|
err("no intervals overlapped in the right way\n")
|
||||||
|
raise FsckError(f"{path}: unfixable overlap")
|
||||||
|
|
||||||
|
# Truncate the file position
|
||||||
|
err(f"truncating {match}\n")
|
||||||
|
with self.sql:
|
||||||
|
cur = self.sql.cursor()
|
||||||
|
cur.execute("UPDATE ranges SET end_pos=? "
|
||||||
|
"WHERE stream_id=? AND start_time=? AND "
|
||||||
|
"end_time=? AND start_pos=? AND end_pos=?",
|
||||||
|
(new.start, sid, *match))
|
||||||
|
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
|
||||||
|
raise FsckError("failed to fix SQL database")
|
||||||
|
raise RetryFsck
|
||||||
|
|
||||||
### Check that bulkdata is good enough to be opened
|
### Check that bulkdata is good enough to be opened
|
||||||
|
|
||||||
@retry_if_raised(RetryFsck)
|
@retry_if_raised(RetryFsck)
|
||||||
def check_bulkdata(self, sid, path, bulk):
|
def check_bulkdata(self, sid, path, bulk):
|
||||||
|
try:
|
||||||
with open(os.path.join(bulk, b"_format"), "rb") as f:
|
with open(os.path.join(bulk, b"_format"), "rb") as f:
|
||||||
fmt = pickle.load(f)
|
fmt = pickle.load(f)
|
||||||
|
except Exception as e:
|
||||||
|
raise FsckFormatError(f"{path}: can't load _format file ({e})")
|
||||||
|
|
||||||
if fmt["version"] != 3:
|
if fmt["version"] != 3:
|
||||||
raise FsckError("%s: bad or unsupported bulkdata version %d",
|
raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
|
||||||
path, fmt["version"])
|
path, fmt["version"])
|
||||||
rows_per_file = int(fmt["rows_per_file"])
|
rows_per_file = int(fmt["rows_per_file"])
|
||||||
if rows_per_file < 1:
|
if rows_per_file < 1:
|
||||||
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
|
raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
|
||||||
files_per_dir = int(fmt["files_per_dir"])
|
files_per_dir = int(fmt["files_per_dir"])
|
||||||
if files_per_dir < 1:
|
if files_per_dir < 1:
|
||||||
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
|
raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
|
||||||
layout = fmt["layout"]
|
layout = fmt["layout"]
|
||||||
if layout != self.stream_layout[sid]:
|
if layout != self.stream_layout[sid]:
|
||||||
raise FsckError("%s: layout mismatch %s != %s", path,
|
raise FsckFormatError("%s: layout mismatch %s != %s", path,
|
||||||
layout, self.stream_layout[sid])
|
layout, self.stream_layout[sid])
|
||||||
|
|
||||||
# Every file should have a size that's the multiple of the row size
|
# Every file should have a size that's the multiple of the row size
|
||||||
|
@ -287,7 +346,7 @@ class Fsck(object):
|
||||||
files = list(filter(regex.search, os.listdir(subpath)))
|
files = list(filter(regex.search, os.listdir(subpath)))
|
||||||
if not files:
|
if not files:
|
||||||
self.fix_empty_subdir(subpath)
|
self.fix_empty_subdir(subpath)
|
||||||
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
|
|
||||||
# Verify that their size is a multiple of the row size
|
# Verify that their size is a multiple of the row size
|
||||||
for filename in files:
|
for filename in files:
|
||||||
filepath = os.path.join(subpath, filename)
|
filepath = os.path.join(subpath, filename)
|
||||||
|
@ -328,6 +387,24 @@ class Fsck(object):
|
||||||
f.truncate(newsize)
|
f.truncate(newsize)
|
||||||
raise RetryFsck
|
raise RetryFsck
|
||||||
|
|
||||||
|
def fix_remove_stream(self, sid, path, bulk, reason):
|
||||||
|
msg = f"stream {path} is corrupted: {reason}"
|
||||||
|
if not self.fix:
|
||||||
|
raise FixableFsckError(msg)
|
||||||
|
# Remove the stream from disk and the database
|
||||||
|
err(f"\n{msg}\n")
|
||||||
|
err(f"Removing stream {path} from disk and database\n")
|
||||||
|
shutil.rmtree(bulk)
|
||||||
|
with self.sql:
|
||||||
|
cur = self.sql.cursor()
|
||||||
|
cur.execute("DELETE FROM streams WHERE id=?",
|
||||||
|
(sid,))
|
||||||
|
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
|
||||||
|
raise FsckError("failed to remove stream")
|
||||||
|
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
|
||||||
|
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
|
||||||
|
raise RetryFsck
|
||||||
|
|
||||||
### Check interval endpoints
|
### Check interval endpoints
|
||||||
|
|
||||||
def check_intervals(self):
|
def check_intervals(self):
|
||||||
|
@ -364,7 +441,7 @@ class Fsck(object):
|
||||||
erow = tab[epos-1] # noqa: F841 unused
|
erow = tab[epos-1] # noqa: F841 unused
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.fix_bad_interval(sid, intv, tab, str(e))
|
self.fix_bad_interval(sid, intv, tab, str(e))
|
||||||
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
|
|
||||||
return len(ints)
|
return len(ints)
|
||||||
|
|
||||||
def fix_bad_interval(self, sid, intv, tab, msg):
|
def fix_bad_interval(self, sid, intv, tab, msg):
|
||||||
|
@ -400,9 +477,9 @@ class Fsck(object):
|
||||||
|
|
||||||
# Otherwise, the only hope is to delete the interval entirely.
|
# Otherwise, the only hope is to delete the interval entirely.
|
||||||
err("*** Deleting the entire interval from SQL.\n")
|
err("*** Deleting the entire interval from SQL.\n")
|
||||||
err("This may leave stale data on disk. To fix that, copy all\n")
|
err("This may leave stale data on disk. To fix that, copy all "
|
||||||
err("data from this stream to a new stream, then remove all data\n")
|
"data from this stream to a new stream using nilm-copy, then\n")
|
||||||
err("from and destroy %s.\n", path)
|
err("remove all data from and destroy %s.\n", path)
|
||||||
with self.sql:
|
with self.sql:
|
||||||
cur = self.sql.cursor()
|
cur = self.sql.cursor()
|
||||||
cur.execute("DELETE FROM ranges WHERE "
|
cur.execute("DELETE FROM ranges WHERE "
|
||||||
|
@ -473,18 +550,35 @@ class Fsck(object):
|
||||||
# Verify that all timestamps are in range.
|
# Verify that all timestamps are in range.
|
||||||
match = (ts < stime) | (ts >= etime)
|
match = (ts < stime) | (ts >= etime)
|
||||||
if match.any():
|
if match.any():
|
||||||
row = start + numpy.argmax(match)
|
row = numpy.argmax(match)
|
||||||
|
if ts[row] != 0:
|
||||||
raise FsckError("%s: data timestamp %d at row %d "
|
raise FsckError("%s: data timestamp %d at row %d "
|
||||||
"outside interval range [%d,%d)",
|
"outside interval range [%d,%d)",
|
||||||
path, data['timestamp'][row], row,
|
path, ts[row], row + start,
|
||||||
stime, etime)
|
stime, etime)
|
||||||
|
|
||||||
|
# Timestamp is zero and out of the expected range;
|
||||||
|
# assume file ends with zeroed data and just truncate it.
|
||||||
|
self.fix_table_by_truncating(
|
||||||
|
path, tab, row + start,
|
||||||
|
"data timestamp is out of range, and zero")
|
||||||
|
|
||||||
# Verify that timestamps are monotonic
|
# Verify that timestamps are monotonic
|
||||||
match = numpy.diff(ts) <= 0
|
match = numpy.diff(ts) <= 0
|
||||||
if match.any():
|
if match.any():
|
||||||
row = start + numpy.argmax(match)
|
row = numpy.argmax(match)
|
||||||
raise FsckError("%s: non-monotonic timestamp (%d -> %d) "
|
if ts[row+1] != 0:
|
||||||
"at row %d", path, ts[row], ts[row+1], row)
|
raise FsckError(
|
||||||
|
"%s: non-monotonic timestamp (%d -> %d) "
|
||||||
|
"at row %d", path, ts[row], ts[row+1],
|
||||||
|
row + start)
|
||||||
|
|
||||||
|
# Timestamp is zero and non-monotonic;
|
||||||
|
# assume file ends with zeroed data and just truncate it.
|
||||||
|
self.fix_table_by_truncating(
|
||||||
|
path, tab, row + start + 1,
|
||||||
|
"data timestamp is non-monotonic, and zero")
|
||||||
|
|
||||||
first_ts = ts[0]
|
first_ts = ts[0]
|
||||||
if last_ts is not None and first_ts <= last_ts:
|
if last_ts is not None and first_ts <= last_ts:
|
||||||
raise FsckError("%s: first interval timestamp %d is not "
|
raise FsckError("%s: first interval timestamp %d is not "
|
||||||
|
@ -502,3 +596,15 @@ class Fsck(object):
|
||||||
done += count
|
done += count
|
||||||
update(done)
|
update(done)
|
||||||
return done
|
return done
|
||||||
|
|
||||||
|
def fix_table_by_truncating(self, path, tab, row, reason):
|
||||||
|
# Simple fix for bad data: truncate the table at the given row.
|
||||||
|
# On retry, fix_bad_interval will correct the database and timestamps
|
||||||
|
# to account for this truncation.
|
||||||
|
msg = f"{path}: bad data in table, starting at row {row}: {reason}"
|
||||||
|
if not self.fix:
|
||||||
|
raise FixableFsckError(msg)
|
||||||
|
err(f"\n{msg}\nWill try truncating table\n")
|
||||||
|
(subdir, fname, offs, count) = tab._offset_from_row(row)
|
||||||
|
tab._remove_or_truncate_file(subdir, fname, offs)
|
||||||
|
raise RetryFsck
|
||||||
|
|
|
@ -293,8 +293,8 @@ class Table():
|
||||||
"layout": layout,
|
"layout": layout,
|
||||||
"version": 3
|
"version": 3
|
||||||
}
|
}
|
||||||
with open(os.path.join(root, b"_format"), "wb") as f:
|
nilmdb.utils.atomic.replace_file(
|
||||||
pickle.dump(fmt, f, 2)
|
os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
|
||||||
|
|
||||||
# Normal methods
|
# Normal methods
|
||||||
def __init__(self, root, initial_nrows=0):
|
def __init__(self, root, initial_nrows=0):
|
||||||
|
|
BIN
tests/fsck-data/test2v/data.sql
Normal file
BIN
tests/fsck-data/test2v/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2v/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2v/data/a/b/0000/0000
Normal file
Binary file not shown.
0
tests/fsck-data/test2v/data/a/b/_format
Normal file
0
tests/fsck-data/test2v/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2v1/data.sql
Normal file
BIN
tests/fsck-data/test2v1/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2v1/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2v1/data/a/b/0000/0000
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2v2/data.sql
Normal file
BIN
tests/fsck-data/test2v2/data.sql
Normal file
Binary file not shown.
0
tests/fsck-data/test2v2/data/a/b/_format
Normal file
0
tests/fsck-data/test2v2/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2w1/data.sql
Normal file
BIN
tests/fsck-data/test2w1/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2w1/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2w1/data/a/b/0000/0000
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2w1/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2w1/data/a/b/_format
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2w2/data.sql
Normal file
BIN
tests/fsck-data/test2w2/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2w2/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2w2/data/a/b/0000/0000
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2w2/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2w2/data/a/b/_format
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x1/data.sql
Normal file
BIN
tests/fsck-data/test2x1/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x1/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2x1/data/a/b/0000/0000
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x1/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2x1/data/a/b/_format
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x2/data.sql
Normal file
BIN
tests/fsck-data/test2x2/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x2/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2x2/data/a/b/0000/0000
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x2/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2x2/data/a/b/_format
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x3/data.sql
Normal file
BIN
tests/fsck-data/test2x3/data.sql
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x3/data/a/b/0000/0000
Normal file
BIN
tests/fsck-data/test2x3/data/a/b/0000/0000
Normal file
Binary file not shown.
BIN
tests/fsck-data/test2x3/data/a/b/_format
Normal file
BIN
tests/fsck-data/test2x3/data/a/b/_format
Normal file
Binary file not shown.
|
@ -123,7 +123,7 @@ class TestFsck(object):
|
||||||
self.failmsg("test2h", "missing bulkdata dir")
|
self.failmsg("test2h", "missing bulkdata dir")
|
||||||
self.failmsg("test2i", "bad bulkdata table")
|
self.failmsg("test2i", "bad bulkdata table")
|
||||||
self.failmsg("test2j", "overlap in intervals")
|
self.failmsg("test2j", "overlap in intervals")
|
||||||
self.failmsg("test2k", "overlap in file offsets")
|
self.failmsg("test2k", "overlap in file offsets", fix=False)
|
||||||
self.ok("test2k1")
|
self.ok("test2k1")
|
||||||
self.failmsg("test2l", "unsupported bulkdata version")
|
self.failmsg("test2l", "unsupported bulkdata version")
|
||||||
self.failmsg("test2m", "bad rows_per_file")
|
self.failmsg("test2m", "bad rows_per_file")
|
||||||
|
@ -163,3 +163,22 @@ class TestFsck(object):
|
||||||
raise Exception("hi")
|
raise Exception("hi")
|
||||||
with assert_raises(Exception):
|
with assert_raises(Exception):
|
||||||
foo()
|
foo()
|
||||||
|
|
||||||
|
self.failmsg("test2v", "can't load _format, but data is also present")
|
||||||
|
self.failmsg("test2v1", "bad bulkdata table")
|
||||||
|
self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
|
||||||
|
self.okmsg("test2v2", "empty, with corrupted format file")
|
||||||
|
|
||||||
|
self.failmsg("test2w1", "out of range, and zero", fix=False)
|
||||||
|
self.okmsg("test2w1", "Will try truncating table")
|
||||||
|
self.contain("Deleting the entire interval")
|
||||||
|
|
||||||
|
self.failmsg("test2w2", "non-monotonic, and zero", fix=False)
|
||||||
|
self.okmsg("test2w2", "Will try truncating table")
|
||||||
|
self.contain("new end: time 237000001, pos 238")
|
||||||
|
|
||||||
|
self.failmsg("test2x1", "overlap in file offsets", fix=False)
|
||||||
|
self.okmsg("test2x1", "truncating")
|
||||||
|
|
||||||
|
self.failmsg("test2x2", "unfixable overlap")
|
||||||
|
self.failmsg("test2x3", "unfixable overlap")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user