Compare commits

...

7 Commits

SHA1 Message Date
b6bba16505 fsck: fix error in reporting row number for timestamp errors
Since we process in chunks, we need to add "start" to the reported row
number; however, we must not add it when indexing data['timestamp']
(aka ts), which holds only the current chunk.
2020-08-18 00:31:40 -04:00
d4003d0d34 tests: fill out coverage for new fsck features 2020-08-17 23:27:32 -04:00
759492298a bulkdata: write _format file atomically, to reduce chance of corruption
Some databases were seeing _format truncated to 0 bytes after a crash
that occurred soon after a new stream was created (e.g. during
decimation).
2020-08-17 23:04:36 -04:00
b5f6fcc253 fsck: detect problems with _format file, and remove stream if empty
A bulkdata dir may get created for a new stream with an empty or
corrupted _format before any data actually gets written.  In that
case, we can just delete the new stream; worst case, we lose some
metadata.

Note: The info in _format should really get moved into the database.
The file dates from when bulkdata switched from PyTables to a custom
storage system, and was probably stored this way to avoid tying the
main DB to specific implementation details while they were still in
flux.
2020-08-17 22:55:44 -04:00
905e325ded fsck: add fixer that fully removes a stream 2020-08-17 22:55:32 -04:00
648b6f4b70 fsck: improve instructions about removing leftover data 2020-08-17 22:55:09 -04:00
7f8a2c7027 fsck: remove unnecessary raises
The called functions already raise RetryFsck.
2020-08-17 22:54:36 -04:00
10 changed files with 72 additions and 27 deletions

View File: nilmdb/fsck/fsck.py

@@ -39,6 +39,10 @@ class RetryFsck(FsckError):
     pass
 
 
+class FsckFormatError(FsckError):
+    pass
+
+
 def log(format, *args):
     printf(format, *args)
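
FsckFormatError joins RetryFsck as a subclass of FsckError; the fixers in the hunks below raise RetryFsck to tell the retry_if_raised decorator (seen on check_bulkdata further down) to run the whole check again after a repair. The decorator itself is not part of this diff; the following is a minimal sketch of the behavior assumed here, with an illustrative retry bound:

    import functools

    def retry_if_raised(exc, max_retries=100):
        # Sketch: rerun the wrapped check whenever it raises `exc`,
        # which fixers use to signal "something changed; check again".
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                for _ in range(max_retries):
                    try:
                        return func(*args, **kwargs)
                    except exc:
                        continue
                raise Exception("too many retries; a fixer may be looping")
            return wrapper
        return decorator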
@@ -238,16 +242,28 @@ class Fsck(object):
                             "set: %s\nnew: %s",
                             path, str(posiset), str(new))
 
-        # check bulkdata
-        self.check_bulkdata(sid, path, bulk)
-
-        # Check that we can open bulkdata
         try:
+            # Check bulkdata
+            self.check_bulkdata(sid, path, bulk)
+
+            # Check that we can open bulkdata
             tab = nilmdb.server.bulkdata.Table(bulk)
-        except Exception as e:    # pragma: no cover --
-            # No coverage here because, in the current code,
-            # everything that would cause the bulkdata to fail
-            # has been already checked.
+        except FsckFormatError as e:
+            # If there are no files except _format, try deleting
+            # the entire stream; this may remove metadata, but
+            # it's probably unimportant.
+            files = list(os.listdir(bulk))
+            if len(files) > 1:
+                raise FsckFormatError(f"{path}: can't load _format, "
+                                      f"but data is also present")
+
+            # Since the stream was empty, just remove it
+            self.fix_remove_stream(sid, path, bulk,
+                                   "empty, with corrupted format file")
+        except FsckError as e:
+            raise e
+        except Exception as e:    # pragma: no cover
+            # No coverage because this is an unknown/unexpected error
             raise FsckError("%s: can't open bulkdata: %s",
                             path, str(e))
         tab.close()
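
Per the commit messages, the state this branch handles is a stream directory whose creation was interrupted by a crash, leaving a zero-byte _format and nothing else. A quick reproduction of that state (paths are hypothetical; this is not a helper from the codebase):

    import os

    # Simulate a crash right after stream creation: a truncated
    # _format file with no data files alongside it.
    bulk = "/tmp/nilmdb/data/newstream"
    os.makedirs(bulk, exist_ok=True)
    open(os.path.join(bulk, "_format"), "wb").close()  # 0-byte file

    # pickle.load() fails on an empty file, so check_bulkdata raises
    # FsckFormatError; since _format is the only directory entry,
    # fsck can safely remove the whole stream.
    print(os.listdir(bulk))  # ['_format'] -> len(files) == 1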
@@ -256,20 +272,24 @@ class Fsck(object):
     @retry_if_raised(RetryFsck)
     def check_bulkdata(self, sid, path, bulk):
-        with open(os.path.join(bulk, b"_format"), "rb") as f:
-            fmt = pickle.load(f)
+        try:
+            with open(os.path.join(bulk, b"_format"), "rb") as f:
+                fmt = pickle.load(f)
+        except Exception as e:
+            raise FsckFormatError(f"{path}: can't load _format file ({e})")
+
         if fmt["version"] != 3:
-            raise FsckError("%s: bad or unsupported bulkdata version %d",
-                            path, fmt["version"])
+            raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
+                                  path, fmt["version"])
         rows_per_file = int(fmt["rows_per_file"])
         if rows_per_file < 1:
-            raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
+            raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
         files_per_dir = int(fmt["files_per_dir"])
         if files_per_dir < 1:
-            raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
+            raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
         layout = fmt["layout"]
         if layout != self.stream_layout[sid]:
-            raise FsckError("%s: layout mismatch %s != %s", path,
-                            layout, self.stream_layout[sid])
+            raise FsckFormatError("%s: layout mismatch %s != %s", path,
+                                  layout, self.stream_layout[sid])
 
         # Every file should have a size that's the multiple of the row size
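
For reference, the checks above imply a _format pickle shaped like the following. Only the keys, the version number, and the bounds come from the code; the values are illustrative:

    import pickle

    fmt = {
        "version": 3,            # only version 3 is accepted
        "rows_per_file": 16384,  # must be >= 1
        "files_per_dir": 32,     # must be >= 1
        "layout": "float32_8",   # must match the layout recorded in SQL
    }
    blob = pickle.dumps(fmt, 2)  # protocol 2, as written by bulkdata.py
    assert pickle.loads(blob)["version"] == 3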
@@ -287,7 +307,7 @@ class Fsck(object):
             files = list(filter(regex.search, os.listdir(subpath)))
             if not files:
                 self.fix_empty_subdir(subpath)
-                raise RetryFsck    # pragma: no cover; raised by fix_empty_subdir
 
             # Verify that their size is a multiple of the row size
             for filename in files:
                 filepath = os.path.join(subpath, filename)
@@ -328,6 +348,24 @@ class Fsck(object):
                 f.truncate(newsize)
         raise RetryFsck
 
+    def fix_remove_stream(self, sid, path, bulk, reason):
+        msg = f"stream {path} is corrupted: {reason}"
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Remove the stream from disk and the database
+        err(f"\n{msg}\n")
+        err(f"Removing stream {path} from disk and database\n")
+        shutil.rmtree(bulk)
+        with self.sql:
+            cur = self.sql.cursor()
+            cur.execute("DELETE FROM streams WHERE id=?",
+                        (sid,))
+            if cur.rowcount != 1:    # pragma: no cover (shouldn't fail)
+                raise FsckError("failed to remove stream")
+            cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
+            cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
+        raise RetryFsck
+
     ### Check interval endpoints
 
     def check_intervals(self):
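
fix_remove_stream has to delete from three tables because ranges and metadata rows reference the stream only through stream_id; removing just the streams row would leave orphans behind. A self-contained illustration against a simplified stand-in schema (the real nilmdb schema has more columns):

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE streams (id INTEGER PRIMARY KEY, path TEXT);
        CREATE TABLE ranges (stream_id INTEGER, start_time INTEGER,
                             end_time INTEGER);
        CREATE TABLE metadata (stream_id INTEGER, key TEXT, value TEXT);
    """)
    sid = 1
    db.execute("INSERT INTO streams VALUES (?, '/new/stream')", (sid,))
    db.execute("INSERT INTO ranges VALUES (?, 0, 100)", (sid,))
    db.execute("INSERT INTO metadata VALUES (?, 'k', 'v')", (sid,))

    # Mirror fix_remove_stream's cleanup: all three tables, in one
    # transaction, so a failure can't leave a half-removed stream.
    with db:
        db.execute("DELETE FROM streams WHERE id=?", (sid,))
        db.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
        db.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))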
@@ -364,7 +402,7 @@ class Fsck(object):
                 erow = tab[epos-1]  # noqa: F841 unused
             except Exception as e:
                 self.fix_bad_interval(sid, intv, tab, str(e))
-                raise RetryFsck    # pragma: no cover; raised by fix_bad_interval
 
         return len(ints)
 
     def fix_bad_interval(self, sid, intv, tab, msg):
@@ -401,8 +439,8 @@ class Fsck(object):
         # Otherwise, the only hope is to delete the interval entirely.
         err("*** Deleting the entire interval from SQL.\n")
         err("This may leave stale data on disk. To fix that, copy all\n")
-        err("data from this stream to a new stream, then remove all data\n")
-        err("from and destroy %s.\n", path)
+        err("data from this stream to a new stream using nilm-copy, then\n")
+        err("remove all data from and destroy %s.\n", path)
 
         with self.sql:
             cur = self.sql.cursor()
             cur.execute("DELETE FROM ranges WHERE "
@@ -473,18 +511,20 @@ class Fsck(object):
             # Verify that all timestamps are in range.
             match = (ts < stime) | (ts >= etime)
             if match.any():
-                row = start + numpy.argmax(match)
+                row = numpy.argmax(match)
                 raise FsckError("%s: data timestamp %d at row %d "
                                 "outside interval range [%d,%d)",
-                                path, data['timestamp'][row], row,
+                                path, ts[row], row + start,
                                 stime, etime)
 
             # Verify that timestamps are monotonic
             match = numpy.diff(ts) <= 0
             if match.any():
-                row = start + numpy.argmax(match)
-                raise FsckError("%s: non-monotonic timestamp (%d -> %d) "
-                                "at row %d", path, ts[row], ts[row+1], row)
+                row = numpy.argmax(match)
+                raise FsckError("%s: non-monotonic timestamp (%d -> %d)"
+                                " at row %d", path, ts[row], ts[row+1],
+                                row + start)
 
             first_ts = ts[0]
             if last_ts is not None and first_ts <= last_ts:
                 raise FsckError("%s: first interval timestamp %d is not "

View File: nilmdb/server/bulkdata.py

@@ -293,8 +293,8 @@ class Table():
             "layout": layout,
             "version": 3
         }
-        with open(os.path.join(root, b"_format"), "wb") as f:
-            pickle.dump(fmt, f, 2)
+        nilmdb.utils.atomic.replace_file(
+            os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
 
     # Normal methods
 
     def __init__(self, root, initial_nrows=0):
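
nilmdb.utils.atomic.replace_file itself is not shown in this compare; presumably it follows the usual write-temp-then-rename pattern, which works because os.replace() is atomic on POSIX filesystems. A minimal sketch under that assumption (str paths for simplicity; the caller above passes bytes):

    import os
    import tempfile

    def replace_file(filename, content):
        # Write a temporary file in the same directory, force it to
        # disk, then rename it over the target.  A crash leaves either
        # the old file or the new one, never a 0-byte _format.
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(filename) or ".")
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(content)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, filename)
        except Exception:
            os.unlink(tmp)
            raise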

(8 test fixture files changed: 6 binary files not shown, 2 with no visible diff)
View File: tests/test_fsck.py

@@ -163,3 +163,8 @@ class TestFsck(object):
             raise Exception("hi")
         with assert_raises(Exception):
             foo()
+
+        self.failmsg("test2v", "can't load _format, but data is also present")
+        self.failmsg("test2v1", "bad bulkdata table")
+        self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
+        self.okmsg("test2v2", "empty, with corrupted format file")