6 Commits

Author SHA1 Message Date
  Jim Paris e0559c2ed1 tests: fill out coverage for new fsck features 5 months ago
  Jim Paris 759492298a bulkdata: write _format file atomically, to reduce chance of corruption 5 months ago
  Jim Paris b5f6fcc253 fsck: detect problems with _format file, and remove stream if empty 5 months ago
  Jim Paris 905e325ded fsck: add fixer that fully removes a stream 5 months ago
  Jim Paris 648b6f4b70 fsck: improve instructions about removing leftover data 5 months ago
  Jim Paris 7f8a2c7027 fsck: remove unnecessary raises 5 months ago
3 changed files with 65 additions and 22 deletions
Split View
  1. +58
    -20
      nilmdb/fsck/fsck.py
  2. +2
    -2
      nilmdb/server/bulkdata.py
  3. +5
    -0
      tests/test_fsck.py

+ 58
- 20
nilmdb/fsck/fsck.py View File

@@ -39,6 +39,10 @@ class RetryFsck(FsckError):
pass


class FsckFormatError(FsckError):
pass


def log(format, *args):
printf(format, *args)

@@ -238,16 +242,28 @@ class Fsck(object):
"set: %s\nnew: %s",
path, str(posiset), str(new))

# check bulkdata
self.check_bulkdata(sid, path, bulk)

# Check that we can open bulkdata
try:
# Check bulkdata
self.check_bulkdata(sid, path, bulk)

# Check that we can open bulkdata
tab = nilmdb.server.bulkdata.Table(bulk)
except Exception as e: # pragma: no cover --
# No coverage here because, in the current code,
# everything that would cause the bulkdata to fail
# has been already checked.
except FsckFormatError as e:
# If there are no files except _format, try deleting
# the entire stream; this may remove metadata, but
# it's probably unimportant.
files = list(os.listdir(bulk))
if len(files) > 1:
raise FsckFormatError(f"{path}: can't load _format, "
f"but data is also present")

# Since the stream was empty, just remove it
self.fix_remove_stream(sid, path, bulk,
"empty, with corrupted format file")
except FsckError as e:
raise e
except Exception as e: # pragma: no cover
# No coverage because this is an unknown/unexpected error
raise FsckError("%s: can't open bulkdata: %s",
path, str(e))
tab.close()
@@ -256,21 +272,25 @@ class Fsck(object):

@retry_if_raised(RetryFsck)
def check_bulkdata(self, sid, path, bulk):
with open(os.path.join(bulk, b"_format"), "rb") as f:
fmt = pickle.load(f)
try:
with open(os.path.join(bulk, b"_format"), "rb") as f:
fmt = pickle.load(f)
except Exception as e:
raise FsckFormatError(f"{path}: can't load _format file ({e})")

if fmt["version"] != 3:
raise FsckError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"])
raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"])
rows_per_file = int(fmt["rows_per_file"])
if rows_per_file < 1:
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
files_per_dir = int(fmt["files_per_dir"])
if files_per_dir < 1:
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
layout = fmt["layout"]
if layout != self.stream_layout[sid]:
raise FsckError("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid])
raise FsckFormatError("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid])

# Every file should have a size that's the multiple of the row size
rkt = nilmdb.server.rocket.Rocket(layout, None)
@@ -287,7 +307,7 @@ class Fsck(object):
files = list(filter(regex.search, os.listdir(subpath)))
if not files:
self.fix_empty_subdir(subpath)
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
# Verify that their size is a multiple of the row size
for filename in files:
filepath = os.path.join(subpath, filename)
@@ -328,6 +348,24 @@ class Fsck(object):
f.truncate(newsize)
raise RetryFsck

def fix_remove_stream(self, sid, path, bulk, reason):
msg = f"stream {path} is corrupted: {reason}"
if not self.fix:
raise FixableFsckError(msg)
# Remove the stream from disk and the database
err(f"\n{msg}\n")
err(f"Removing stream {path} from disk and database\n")
shutil.rmtree(bulk)
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM streams WHERE id=?",
(sid,))
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to remove stream")
cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
raise RetryFsck

### Check interval endpoints

def check_intervals(self):
@@ -364,7 +402,7 @@ class Fsck(object):
erow = tab[epos-1] # noqa: F841 unused
except Exception as e:
self.fix_bad_interval(sid, intv, tab, str(e))
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
return len(ints)

def fix_bad_interval(self, sid, intv, tab, msg):
@@ -401,8 +439,8 @@ class Fsck(object):
# Otherwise, the only hope is to delete the interval entirely.
err("*** Deleting the entire interval from SQL.\n")
err("This may leave stale data on disk. To fix that, copy all\n")
err("data from this stream to a new stream, then remove all data\n")
err("from and destroy %s.\n", path)
err("data from this stream to a new stream using nilm-copy, then\n")
err("remove all data from and destroy %s.\n", path)
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM ranges WHERE "


+ 2
- 2
nilmdb/server/bulkdata.py View File

@@ -293,8 +293,8 @@ class Table():
"layout": layout,
"version": 3
}
with open(os.path.join(root, b"_format"), "wb") as f:
pickle.dump(fmt, f, 2)
nilmdb.utils.atomic.replace_file(
os.path.join(root, b"_format"), pickle.dumps(fmt, 2))

# Normal methods
def __init__(self, root, initial_nrows=0):


+ 5
- 0
tests/test_fsck.py View File

@@ -163,3 +163,8 @@ class TestFsck(object):
raise Exception("hi")
with assert_raises(Exception):
foo()

self.failmsg("test2v", "can't load _format, but data is also present")
self.failmsg("test2v1", "bad bulkdata table")
self.failmsg("test2v2", "empty, with corrupted format file", fix=False)
self.okmsg("test2v2", "empty, with corrupted format file")

Loading…
Cancel
Save