Compare commits

...

10 Commits

SHA1 Message Date
276fbc652a Bump Python version requirement 2020-08-07 15:56:53 -04:00
10b34f5937 Fix issue with test suite and empty dirs on git: Git doesn't save empty dirs, so put a placeholder there that is ignored when the test data is copied to its final location during testing (a sketch of this appears below the commit list). 2020-08-07 10:49:29 -04:00
83daeb148a Add fsck scan for any data timestamps outside interval range 2020-08-07 10:25:22 -04:00
d65f00e8b2 Add fsck to default tests 2020-08-07 02:56:49 -04:00
71dc01c9a7 Replace deprecated numpy.fromstring usage 2020-08-07 02:55:20 -04:00
bcd21b3498 Improve fsck test coverage to 100% 2020-08-07 02:54:45 -04:00
a1dee0e6f2 Improve fsck test coverage to 85% 2020-08-07 01:26:30 -04:00
99ac47cf0d Start implementing fsck test and porting fsck to Python 3 2020-08-07 00:11:45 -04:00
4cdaef51c1 Fix flake8-reported issues with fsck 2020-08-06 23:10:51 -04:00
88466dcafe Add yappi dependency for "nilmdb-server -y", but don't require ipython 2020-08-06 22:55:27 -04:00
102 changed files with 301 additions and 82 deletions
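
A note on commit 10b34f5937: since git cannot track empty directories, the test data carries a placeholder file that is filtered out when the data is staged for a test run. A minimal sketch of that idea, assuming a marker name such as ".placeholder" and illustrative paths (neither the real filename nor the real paths are visible in this diff):

import shutil

# Copy versioned test data to the scratch location used during testing,
# dropping placeholder files that exist only so git keeps the otherwise
# empty directories.  ".placeholder" and both paths are assumptions.
shutil.copytree("tests/fsck-data", "/tmp/nilmdb-test-data",
                ignore=shutil.ignore_patterns(".placeholder"))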


@@ -1,7 +1,7 @@
# nilmdb: Non-Intrusive Load Monitor Database
by Jim Paris <jim@jtan.com>
NilmDB requires Python 3.7 or newer.
NilmDB requires Python 3.8 or newer.
## Prerequisites:


@@ -59,7 +59,7 @@ class NumpyClient(nilmdb.client.client.Client):
dtype = self._get_dtype(path, layout)
def to_numpy(data):
a = numpy.fromstring(data, dtype)
a = numpy.frombuffer(data, dtype)
if structured:
return a
return numpy.c_[a['timestamp'], a['data']]
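
For context on the change above: numpy.fromstring is deprecated for binary input, and numpy.frombuffer reads the same bytes without copying (the result is a read-only view of the buffer). A small standalone sketch, using an assumed structured dtype rather than the one layout_to_dtype() would actually build:

import numpy

# Illustrative dtype only; the field names mirror the code above, but the
# exact layout strings nilmdb uses are not reproduced here.
dtype = numpy.dtype([('timestamp', '<i8'), ('data', '<f4', (2,))])

raw = bytes(dtype.itemsize * 3)              # stand-in for data from the server
a = numpy.frombuffer(raw, dtype)             # zero-copy, read-only view
flat = numpy.c_[a['timestamp'], a['data']]   # unstructured form, as in to_numpy()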


@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
raise Exception("todo: fix path bytes issues")
"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
@@ -13,7 +11,6 @@ import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import printf, fprintf, sprintf
from nilmdb.utils.time import timestamp_to_string
from collections import defaultdict
import sqlite3
@@ -21,30 +18,35 @@ import os
import sys
import progressbar
import re
import time
import shutil
import pickle
import numpy
class FsckError(Exception):
def __init__(self, msg="", *args):
if args:
msg = sprintf(msg, *args)
Exception.__init__(self, msg)
class FixableFsckError(FsckError):
def __init__(self, msg = "", *args):
if args:
msg = sprintf(msg, *args)
FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)
def __init__(self, msg=""):
FsckError.__init__(self, f'{msg}\nThis may be fixable with "--fix".')
class RetryFsck(FsckError):
pass
def log(format, *args):
printf(format, *args)
def err(format, *args):
fprintf(sys.stderr, format, *args)
# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message=None, max_retries=100):
def f1(func):
@@ -52,13 +54,15 @@ def retry_if_raised(exc, message = None, max_retries = 100):
for n in range(max_retries):
try:
return func(*args, **kwargs)
except exc as e:
except exc:
if message:
log("%s\n\n", message)
raise Exception("Max number of retries (%d) exceeded; giving up")
raise Exception("Max number of retries (%d) exceeded; giving up" %
max_retries)
return f2
return f1
class Progress(object):
def __init__(self, maxval):
if maxval == 0:
@@ -68,22 +72,24 @@ class Progress(object):
widgets=[progressbar.Percentage(), ' ',
progressbar.Bar(), ' ',
progressbar.ETA()])
if self.bar.term_width == 0:
self.bar.term_width = 75
self.bar.term_width = self.bar.term_width or 75
def __enter__(self):
self.bar.start()
self.last_update = 0
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.bar.finish()
else:
printf("\n")
def update(self, val):
self.bar.update(val)
class Fsck(object):
def __init__(self, path, fix=False):
self.basepath = path
self.sqlpath = os.path.join(path, "data.sql")
@@ -109,7 +115,9 @@ class Fsck(object):
finally:
if self.bulk:
self.bulk.close()
if self.sql:
if self.sql: # pragma: no cover
# (coverage doesn't handle finally clauses correctly;
# both branches here are tested)
self.sql.commit()
self.sql.close()
log("ok\n")
@@ -164,7 +172,7 @@ class Fsck(object):
"ORDER BY start_time")
for r in result:
if r[0] not in self.stream_path:
raise FsckError("interval ID %d not in streams", k)
raise FsckError("interval ID %d not in streams", r[0])
self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
log(" loading metadata\n")
@@ -172,9 +180,10 @@ class Fsck(object):
result = cur.execute("SELECT stream_id, key, value FROM metadata")
for r in result:
if r[0] not in self.stream_path:
raise FsckError("metadata ID %d not in streams", k)
raise FsckError("metadata ID %d not in streams", r[0])
if r[1] in self.stream_meta[r[0]]:
raise FsckError("duplicate metadata key '%s' for stream %d",
raise FsckError(
"duplicate metadata key '%s' for stream %d",
r[1], r[0])
self.stream_meta[r[0]][r[1]] = r[2]
@@ -202,6 +211,7 @@ class Fsck(object):
# must exist in bulkdata
bulk = self.bulkpath + path
bulk = bulk.encode('utf-8')
if not os.path.isdir(bulk):
raise FsckError("%s: missing bulkdata dir", path)
if not nilmdb.server.bulkdata.Table.exists(bulk):
@@ -232,28 +242,31 @@ class Fsck(object):
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
try:
tab = None
try:
tab = nilmdb.server.bulkdata.Table(bulk)
except Exception as e:
except Exception as e: # pragma: no cover --
# No coverage here because, in the current code,
# everything that would cause the bulkdata to fail
# has been already checked.
raise FsckError("%s: can't open bulkdata: %s",
path, str(e))
finally:
if tab:
tab.close()
### Check that bulkdata is good enough to be opened
@retry_if_raised(RetryFsck)
def check_bulkdata(self, sid, path, bulk):
with open(os.path.join(bulk, "_format"), "rb") as f:
with open(os.path.join(bulk, b"_format"), "rb") as f:
fmt = pickle.load(f)
if fmt["version"] != 3:
raise FsckError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"])
row_per_file = int(fmt["rows_per_file"])
rows_per_file = int(fmt["rows_per_file"])
if rows_per_file < 1:
raise FsckError(f"{path}: bad rows_per_file {rows_per_file}")
files_per_dir = int(fmt["files_per_dir"])
if files_per_dir < 1:
raise FsckError(f"{path}: bad files_per_dir {files_per_dir}")
layout = fmt["layout"]
if layout != self.stream_layout[sid]:
raise FsckError("%s: layout mismatch %s != %s", path,
@@ -265,7 +278,7 @@ class Fsck(object):
rkt.close()
# Find all directories
regex = re.compile("^[0-9a-f]{4,}$")
regex = re.compile(b"^[0-9a-f]{4,}$")
subdirs = sorted(filter(regex.search, os.listdir(bulk)),
key=lambda x: int(x, 16), reverse=True)
for subdir in subdirs:
@@ -274,7 +287,7 @@ class Fsck(object):
files = list(filter(regex.search, os.listdir(subpath)))
if not files:
self.fix_empty_subdir(subpath)
raise RetryFsck
raise RetryFsck # pragma: no cover; raised by fix_empty_subdir
# Verify that their size is a multiple of the row size
for filename in files:
filepath = os.path.join(subpath, filename)
@@ -290,10 +303,11 @@ class Fsck(object):
# as long as it's only ".removed" files.
err("\n%s\n", msg)
for fn in os.listdir(subpath):
if not fn.endswith(".removed"):
if not fn.endswith(b".removed"):
raise FsckError("can't fix automatically: please manually "
"remove the file %s and try again",
os.path.join(subpath, fn))
"remove the file '%s' and try again",
os.path.join(subpath, fn).decode(
'utf-8', errors='backslashreplace'))
# Remove the whole thing
err("Removing empty subpath\n")
shutil.rmtree(subpath)
@@ -324,9 +338,12 @@ class Fsck(object):
for sid in self.stream_interval:
try:
bulk = self.bulkpath + self.stream_path[sid]
bulk = bulk.encode('utf-8')
tab = nilmdb.server.bulkdata.Table(bulk)
def update(x):
pbar.update(done + x)
ints = self.stream_interval[sid]
done += self.check_table_intervals(sid, ints, tab, update)
finally:
@@ -335,7 +352,7 @@ class Fsck(object):
def check_table_intervals(self, sid, ints, tab, update):
# look in the table to make sure we can pick out the interval's
# endpoints
path = self.stream_path[sid]
path = self.stream_path[sid] # noqa: F841 unused
tab.file_open.cache_remove_all()
for (i, intv) in enumerate(ints):
update(i)
@@ -343,11 +360,11 @@ class Fsck(object):
if spos == epos and spos >= 0 and spos <= tab.nrows:
continue
try:
srow = tab[spos]
erow = tab[epos-1]
srow = tab[spos] # noqa: F841 unused
erow = tab[epos-1] # noqa: F841 unused
except Exception as e:
self.fix_bad_interval(sid, intv, tab, str(e))
raise RetryFsck
raise RetryFsck # pragma: no cover; raised by fix_bad_interval
return len(ints)
def fix_bad_interval(self, sid, intv, tab, msg):
@@ -376,10 +393,10 @@ class Fsck(object):
"end_time=? AND start_pos=? AND end_pos=?",
(new_etime, new_epos, sid, stime, etime,
spos, epos))
if cur.rowcount != 1:
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to fix SQL database")
raise RetryFsck
err("actually it can't be truncated; times are bad too")
err("actually it can't be truncated; times are bad too\n")
# Otherwise, the only hope is to delete the interval entirely.
err("*** Deleting the entire interval from SQL.\n")
@@ -392,7 +409,7 @@ class Fsck(object):
"stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND end_pos=?",
(sid, stime, etime, spos, epos))
if cur.rowcount != 1:
if cur.rowcount != 1: # pragma: no cover (shouldn't fail)
raise FsckError("failed to remove interval")
raise RetryFsck
@@ -407,9 +424,12 @@ class Fsck(object):
for sid in self.stream_interval:
try:
bulk = self.bulkpath + self.stream_path[sid]
bulk = bulk.encode('utf-8')
tab = nilmdb.server.bulkdata.Table(bulk)
def update(x):
pbar.update(done + x)
ints = self.stream_interval[sid]
done += self.check_table_data(sid, ints, tab, update)
finally:
@@ -418,7 +438,7 @@ class Fsck(object):
def check_table_data(self, sid, ints, tab, update):
# Pull out all of the interval's data and verify that it's
# monotonic.
maxrows = 100000
maxrows = getattr(self, 'maxrows_override', 100000)
path = self.stream_path[sid]
layout = self.stream_layout[sid]
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
@@ -439,26 +459,44 @@ class Fsck(object):
# Get raw data, convert to NumPy array
try:
raw = tab.get_data(start, stop, binary=True)
data = numpy.fromstring(raw, dtype)
except Exception as e:
raise FsckError("%s: failed to grab rows %d through %d: %s",
data = numpy.frombuffer(raw, dtype)
except Exception as e: # pragma: no cover
# No coverage because it's hard to trigger this -- earlier
# checks check the ranges, so this would probably be a real
# disk error, malloc failure, etc.
raise FsckError(
"%s: failed to grab rows %d through %d: %s",
path, start, stop, repr(e))
ts = data['timestamp']
# Verify that all timestamps are in range.
match = (ts < stime) | (ts >= etime)
if match.any():
row = start + numpy.argmax(match)
raise FsckError("%s: data timestamp %d at row %d "
"outside interval range [%d,%d)",
path, data['timestamp'][row], row,
stime, etime)
# Verify that timestamps are monotonic
if (numpy.diff(data['timestamp']) <= 0).any():
raise FsckError("%s: non-monotonic timestamp(s) in rows "
"%d through %d", path, start, stop)
first_ts = data['timestamp'][0]
match = numpy.diff(ts) <= 0
if match.any():
row = start + numpy.argmax(match)
raise FsckError("%s: non-monotonic timestamp (%d -> %d) "
"at row %d", path, ts[row], ts[row+1], row)
first_ts = ts[0]
if last_ts is not None and first_ts <= last_ts:
raise FsckError("%s: first interval timestamp %d is not "
"greater than the previous last interval "
"timestamp %d, at row %d",
path, first_ts, last_ts, start)
last_ts = data['timestamp'][-1]
last_ts = ts[-1]
# These are probably fixable, by removing the offending
# intervals. But I'm not going to bother implementing
# that yet.
# The previous errors are fixable, by removing the
# offending intervals, or changing the data
# timestamps. But these are probably unlikely errors,
# so it's not worth implementing that yet.
# Done
done += count
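
The out-of-range timestamp check added above reports the first offending row by applying numpy.argmax to a boolean mask, since argmax returns the index of the first True element. A standalone sketch of that technique with made-up timestamps and interval bounds:

import numpy

ts = numpy.array([100, 105, 250, 110], dtype='<i8')  # sample timestamps
stime, etime = 100, 200                               # assumed interval [stime, etime)

match = (ts < stime) | (ts >= etime)   # True where a timestamp falls outside
if match.any():
    row = int(numpy.argmax(match))     # index of the first out-of-range row
    print("data timestamp %d at row %d outside [%d,%d)"
          % (ts[row], row, stime, etime))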


@@ -2,8 +2,7 @@
import nilmdb.fsck
import argparse
import os
import sys
def main():
"""Main entry point for the 'nilmdb-fsck' command line script"""
@@ -23,5 +22,6 @@ def main():
nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data=args.no_data)
if __name__ == "__main__":
main()
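
Based on the entry point above, the same check can also be driven directly from Python. A hedged sketch (the database path is made up; the arguments mirror the call in main() above):

import nilmdb.fsck

# fix=False only reports problems; fix=True lets fsck attempt repairs.
# skip_data=True would skip the slower per-row data checks.
nilmdb.fsck.Fsck("/path/to/db", fix=False).check(skip_data=False)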


@@ -78,9 +78,12 @@ def main():
stats = yappi.get_func_stats()
stats.sort("ttot")
stats.print_all()
try:
from IPython import embed
embed(header="Use the `yappi` or `stats` object to explore "
"further, quit to exit")
embed(header="Use the `yappi` or `stats` object to "
"explore further, `quit` to exit")
except ModuleNotFoundError:
print("\nInstall ipython to explore further")
else:
server.start(blocking=True)
except nilmdb.server.serverutil.CherryPyExit:
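
The hunk above makes the post-profiling IPython session optional. For reference, the yappi calls it relies on can be exercised on their own; a minimal sketch using the same API seen in the diff:

import yappi

yappi.start()                       # begin collecting per-function stats
sum(i * i for i in range(100000))   # stand-in for the server doing work
yappi.stop()

stats = yappi.get_func_stats()
stats.sort("ttot")                  # sort by total time, as above
stats.print_all()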


@@ -13,7 +13,7 @@ python-datetime-tz==0.5.4
python-dateutil==2.8.1
requests==2.24.0
tz==0.2.2
WebTest==2.0.35
yappi==1.2.5
## The following requirements were added by pip freeze:
beautifulsoup4==4.9.1
@@ -37,4 +37,5 @@ tempora==4.0.0
urllib3==1.25.10
waitress==1.4.4
WebOb==1.8.6
WebTest==2.0.35
zc.lockfile==2.0


@@ -47,10 +47,13 @@ tag_prefix=nilmdb-
parentdir_prefix=nilmdb-
[flake8]
exclude=_version.py,fsck.py,nilmdb_fsck.py
exclude=_version.py
extend-ignore=E731
per-file-ignores=__init__.py:F401,E402 serializer.py:E722 mustclose.py:E722
per-file-ignores=__init__.py:F401,E402 \
serializer.py:E722 \
mustclose.py:E722 \
fsck.py:E266
[pylint]
ignore=_version.py,fsck.py,nilmdb_fsck.py
ignore=_version.py
disable=C0103,C0111,R0913,R0914

@@ -0,0 +1 @@
hi

@@ -0,0 +1 @@
hi

@@ -0,0 +1 @@
hi

@@ -0,0 +1 @@
hi

@@ -0,0 +1 @@
hi

@@ -0,0 +1 @@
world

@@ -0,0 +1 @@
world

Some files were not shown because too many files have changed in this diff.