Compare commits

..

14 Commits

Author SHA1 Message Date
517b237636 tests: fix test databases 2020-08-18 10:23:56 -04:00
07f138e0f4 flake8: fix style 2020-08-18 10:22:50 -04:00
7538c6201b tests: new fsck tests for interval overlap 2020-08-18 10:18:54 -04:00
4d9a106ca1 fsck: add fix for file position (row) overlap in database
The following sequence could lead to this corruption:
  (1) Append new data to bulkdata
  (2) Update interval file positions in SQL
  (3) Flush (2)
  (4) Crash before flushing (1)
  (5) Reload database without running fsck
  (6) Start writing new data to end of bulkdata and introduce new interval
2020-08-18 10:04:33 -04:00
e90a79ddad fsck: increase max restarts from 100 to 1000
This is effectively the number of problems with the database that can
be fixed, since we restart fsck after each one.
2020-08-18 10:03:57 -04:00
7056c5b4ec tests: new fsck tests 2020-08-18 10:03:34 -04:00
df4e7f0967 fsck: If data timestamps are unexpectedly zero, truncate data
This probably means that the file was written, and metadata was journaled,
but the system crashed before data was written.  If that happens, the
end of the file will be zeroed out.  We don't bother checking the entire
file here; if we see just one timestamp that is unexpectedly zero, let's
truncate the data there.
2020-08-18 00:32:51 -04:00
b6bba16505 fsck: fix error in reporting row number for timestamp errors
Since we process in chunks, we need to add "start" to the row number;
however, we don't need to use this when accessing data['timestamp']
(aka ts)
2020-08-18 00:31:40 -04:00
d4003d0d34 tests: fill out coverage for new fsck features 2020-08-17 23:27:32 -04:00
759492298a bulkdata: write _format file atomically, to reduce chance of corruption
Some databases were seeing _format truncated to 0 bytes, after crashing
soon after a new stream was created (e.g. during decimation).
2020-08-17 23:04:36 -04:00
b5f6fcc253 fsck: detect problems with _format file, and remove stream if empty
A bulkdata dir may get created for a new stream with an empty or
corrupted _format, before any data gets actually written.  In that
case, we can just delete the new stream; worst case, we lose some
metadata.

Note: The info in _format should really get moved into the database.
This was born when bulkdata switched from PyTables to a custom storage
system, and was probably stored this way to avoid tying the main DB
to specific implementation details while they were still in flux.
2020-08-17 22:55:44 -04:00
905e325ded fsck: add fixer that fully removes a stream 2020-08-17 22:55:32 -04:00
648b6f4b70 fsck: improve instructions about removing leftover data 2020-08-17 22:55:09 -04:00
7f8a2c7027 fsck: remove unnecessary raises
The called functions already raise RetryFsck.
2020-08-17 22:54:36 -04:00
25 changed files with 162 additions and 37 deletions

View File

@ -39,6 +39,10 @@ class RetryFsck(FsckError):
pass pass
class FsckFormatError(FsckError):
pass
def log(format, *args): def log(format, *args):
printf(format, *args) printf(format, *args)
@ -48,7 +52,7 @@ def err(format, *args):
# Decorator that retries a function if it returns a specific value # Decorator that retries a function if it returns a specific value
def retry_if_raised(exc, message=None, max_retries=100): def retry_if_raised(exc, message=None, max_retries=1000):
def f1(func): def f1(func):
def f2(*args, **kwargs): def f2(*args, **kwargs):
for n in range(max_retries): for n in range(max_retries):
@ -56,7 +60,7 @@ def retry_if_raised(exc, message=None, max_retries=100):
return func(*args, **kwargs) return func(*args, **kwargs)
except exc: except exc:
if message: if message:
log("%s\n\n", message) log(f"{message} ({n+1})\n\n")
raise Exception("Max number of retries (%d) exceeded; giving up" % raise Exception("Max number of retries (%d) exceeded; giving up" %
max_retries) max_retries)
return f2 return f2
@ -234,43 +238,98 @@ class Fsck(object):
try: try:
posiset += new posiset += new
except IntervalError: except IntervalError:
raise FsckError("%s: overlap in file offsets:\n" self.fix_row_overlap(sid, path, posiset, new)
"set: %s\nnew: %s",
path, str(posiset), str(new))
# check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
try: try:
# Check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
tab = nilmdb.server.bulkdata.Table(bulk) tab = nilmdb.server.bulkdata.Table(bulk)
except Exception as e: # pragma: no cover -- except FsckFormatError:
# No coverage here because, in the current code, # If there are no files except _format, try deleting
# everything that would cause the bulkdata to fail # the entire stream; this may remove metadata, but
# has been already checked. # it's probably unimportant.
files = list(os.listdir(bulk))
if len(files) > 1:
raise FsckFormatError(f"{path}: can't load _format, "
f"but data is also present")
# Since the stream was empty, just remove it
self.fix_remove_stream(sid, path, bulk,
"empty, with corrupted format file")
except FsckError as e:
raise e
except Exception as e: # pragma: no cover
# No coverage because this is an unknown/unexpected error
raise FsckError("%s: can't open bulkdata: %s", raise FsckError("%s: can't open bulkdata: %s",
path, str(e)) path, str(e))
tab.close() tab.close()
def fix_row_overlap(self, sid, path, existing, new):
# If the file rows (spos, epos) overlap in the interval table,
# and the overlapping ranges look like this:
# A --------- C
# B -------- D
# Then we can try changing the first interval to go from
# A to B instead.
msg = (f"{path}: overlap in file offsets:\n"
f"existing ranges: {existing}\n"
f"overlapping interval: {new}")
if not self.fix:
raise FixableFsckError(msg)
err(f"\n{msg}\nSeeing if we can truncate one of them...\n")
# See if there'e exactly one interval that overlaps the
# conflicting one in the right way
match = None
for intv in self.stream_interval[sid]:
(stime, etime, spos, epos) = intv
if spos < new.start and epos > new.start:
if match:
err(f"no, more than one interval matched:\n"
f"{intv}\n{match}\n")
raise FsckError(f"{path}: unfixable overlap")
match = intv
if match is None:
err("no intervals overlapped in the right way\n")
raise FsckError(f"{path}: unfixable overlap")
# Truncate the file position
err(f"truncating {match}\n")
with self.sql:
cur = self.sql.cursor()
cur.execute("UPDATE ranges SET end_pos=? "
"WHERE stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND end_pos=?",
(new.start, sid, *match))
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to fix SQL database")
raise RetryFsck
### Check that bulkdata is good enough to be opened ### Check that bulkdata is good enough to be opened
@retry_if_raised(RetryFsck) @retry_if_raised(RetryFsck)
def check_bulkdata(self, sid, path, bulk): def check_bulkdata(self, sid, path, bulk):
with open(os.path.join(bulk, b"_format"), "rb") as f: try:
fmt = pickle.load(f) with open(os.path.join(bulk, b"_format"), "rb") as f:
fmt = pickle.load(f)
except Exception as e:
raise FsckFormatError(f"{path}: can't load _format file ({e})")
if fmt["version"] != 3: if fmt["version"] != 3:
raise FsckError("%s: bad or unsupported bulkdata version %d", raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"]) path, fmt["version"])
rows_per_file = int(fmt["rows_per_file"]) rows_per_file = int(fmt["rows_per_file"])
if rows_per_file < 1: if rows_per_file < 1:
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}") raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
files_per_dir = int(fmt["files_per_dir"]) files_per_dir = int(fmt["files_per_dir"])
if files_per_dir < 1: if files_per_dir < 1:
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}") raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
layout = fmt["layout"] layout = fmt["layout"]
if layout != self.stream_layout[sid]: if layout != self.stream_layout[sid]:
raise FsckError("%s: layout mismatch %s != %s", path, raise FsckFormatError("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid]) layout, self.stream_layout[sid])
# Every file should have a size that's the multiple of the row size # Every file should have a size that's the multiple of the row size
rkt = nilmdb.server.rocket.Rocket(layout, None) rkt = nilmdb.server.rocket.Rocket(layout, None)
@ -287,7 +346,7 @@ class Fsck(object):
files = list(filter(regex.search, os.listdir(subpath))) files = list(filter(regex.search, os.listdir(subpath)))
if not files: if not files:
self.fix_empty_subdir(subpath) self.fix_empty_subdir(subpath)
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
# Verify that their size is a multiple of the row size # Verify that their size is a multiple of the row size
for filename in files: for filename in files:
filepath = os.path.join(subpath, filename) filepath = os.path.join(subpath, filename)
@ -328,6 +387,24 @@ class Fsck(object):
f.truncate(newsize) f.truncate(newsize)
raise RetryFsck raise RetryFsck
def fix_remove_stream(self, sid, path, bulk, reason):
msg = f"stream {path} is corrupted: {reason}"
if not self.fix:
raise FixableFsckError(msg)
# Remove the stream from disk and the database
err(f"\n{msg}\n")
err(f"Removing stream {path} from disk and database\n")
shutil.rmtree(bulk)
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM streams WHERE id=?",
(sid,))
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to remove stream")
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
raise RetryFsck
### Check interval endpoints ### Check interval endpoints
def check_intervals(self): def check_intervals(self):
@ -364,7 +441,7 @@ class Fsck(object):
erow = tab[epos-1] # noqa: F841 unused erow = tab[epos-1] # noqa: F841 unused
except Exception as e: except Exception as e:
self.fix_bad_interval(sid, intv, tab, str(e)) self.fix_bad_interval(sid, intv, tab, str(e))
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
return len(ints) return len(ints)
def fix_bad_interval(self, sid, intv, tab, msg): def fix_bad_interval(self, sid, intv, tab, msg):
@ -400,9 +477,9 @@ class Fsck(object):
# Otherwise, the only hope is to delete the interval entirely. # Otherwise, the only hope is to delete the interval entirely.
err("*** Deleting the entire interval from SQL.\n") err("*** Deleting the entire interval from SQL.\n")
err("This may leave stale data on disk. To fix that, copy all\n") err("This may leave stale data on disk. To fix that, copy all "
err("data from this stream to a new stream, then remove all data\n") "data from this stream to a new stream using nilm-copy, then\n")
err("from and destroy %s.\n", path) err("remove all data from and destroy %s.\n", path)
with self.sql: with self.sql:
cur = self.sql.cursor() cur = self.sql.cursor()
cur.execute("DELETE FROM ranges WHERE " cur.execute("DELETE FROM ranges WHERE "
@ -473,18 +550,35 @@ class Fsck(object):
# Verify that all timestamps are in range. # Verify that all timestamps are in range.
match = (ts < stime) | (ts >= etime) match = (ts < stime) | (ts >= etime)
if match.any(): if match.any():
row = start + numpy.argmax(match) row = numpy.argmax(match)
raise FsckError("%s: data timestamp %d at row %d " if ts[row] != 0:
"outside interval range [%d,%d)", raise FsckError("%s: data timestamp %d at row %d "
path, data['timestamp'][row], row, "outside interval range [%d,%d)",
stime, etime) path, ts[row], row + start,
stime, etime)
# Timestamp is zero and out of the expected range;
# assume file ends with zeroed data and just truncate it.
self.fix_table_by_truncating(
path, tab, row + start,
"data timestamp is out of range, and zero")
# Verify that timestamps are monotonic # Verify that timestamps are monotonic
match = numpy.diff(ts) <= 0 match = numpy.diff(ts) <= 0
if match.any(): if match.any():
row = start + numpy.argmax(match) row = numpy.argmax(match)
raise FsckError("%s: non-monotonic timestamp (%d -> %d) " if ts[row+1] != 0:
"at row %d", path, ts[row], ts[row+1], row) raise FsckError(
"%s: non-monotonic timestamp (%d -> %d) "
"at row %d", path, ts[row], ts[row+1],
row + start)
# Timestamp is zero and non-monotonic;
# assume file ends with zeroed data and just truncate it.
self.fix_table_by_truncating(
path, tab, row + start + 1,
"data timestamp is non-monotonic, and zero")
first_ts = ts[0] first_ts = ts[0]
if last_ts is not None and first_ts <= last_ts: if last_ts is not None and first_ts <= last_ts:
raise FsckError("%s: first interval timestamp %d is not " raise FsckError("%s: first interval timestamp %d is not "
@ -502,3 +596,15 @@ class Fsck(object):
done += count done += count
update(done) update(done)
return done return done
def fix_table_by_truncating(self, path, tab, row, reason):
# Simple fix for bad data: truncate the table at the given row.
# On retry, fix_bad_interval will correct the database and timestamps
# to account for this truncation.
msg = f"{path}: bad data in table, starting at row {row}: {reason}"
if not self.fix:
raise FixableFsckError(msg)
err(f"\n{msg}\nWill try truncating table\n")
(subdir, fname, offs, count) = tab._offset_from_row(row)
tab._remove_or_truncate_file(subdir, fname, offs)
raise RetryFsck

View File

@ -293,8 +293,8 @@ class Table():
"layout": layout, "layout": layout,
"version": 3 "version": 3
} }
with open(os.path.join(root, b"_format"), "wb") as f: nilmdb.utils.atomic.replace_file(
pickle.dump(fmt, f, 2) os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
# Normal methods # Normal methods
def __init__(self, root, initial_nrows=0): def __init__(self, root, initial_nrows=0):

Binary file not shown.

Binary file not shown.

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -123,7 +123,7 @@ class TestFsck(object):
self.failmsg("test2h", "missing bulkdata dir") self.failmsg("test2h", "missing bulkdata dir")
self.failmsg("test2i", "bad bulkdata table") self.failmsg("test2i", "bad bulkdata table")
self.failmsg("test2j", "overlap in intervals") self.failmsg("test2j", "overlap in intervals")
self.failmsg("test2k", "overlap in file offsets") self.failmsg("test2k", "overlap in file offsets", fix=False)
self.ok("test2k1") self.ok("test2k1")
self.failmsg("test2l", "unsupported bulkdata version") self.failmsg("test2l", "unsupported bulkdata version")
self.failmsg("test2m", "bad rows_per_file") self.failmsg("test2m", "bad rows_per_file")
@ -163,3 +163,22 @@ class TestFsck(object):
raise Exception("hi") raise Exception("hi")
with assert_raises(Exception): with assert_raises(Exception):
foo() foo()
self.failmsg("test2v", "can't load _format, but data is also present")
self.failmsg("test2v1", "bad bulkdata table")
self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
self.okmsg("test2v2", "empty, with corrupted format file")
self.failmsg("test2w1", "out of range, and zero", fix=False)
self.okmsg("test2w1", "Will try truncating table")
self.contain("Deleting the entire interval")
self.failmsg("test2w2", "non-monotonic, and zero", fix=False)
self.okmsg("test2w2", "Will try truncating table")
self.contain("new end: time 237000001, pos 238")
self.failmsg("test2x1", "overlap in file offsets", fix=False)
self.okmsg("test2x1", "truncating")
self.failmsg("test2x2", "unfixable overlap")
self.failmsg("test2x3", "unfixable overlap")