Compare commits

..

30 Commits

SHA1 Message Date
517b237636 tests: fix test databases 2020-08-18 10:23:56 -04:00
07f138e0f4 flake8: fix style 2020-08-18 10:22:50 -04:00
7538c6201b tests: new fsck tests for interval overlap 2020-08-18 10:18:54 -04:00
4d9a106ca1 fsck: add fix for file position (row) overlap in database
The following sequence could lead to this corruption:
  (1) Append new data to bulkdata
  (2) Update interval file positions in SQL
  (3) Flush (2)
  (4) Crash before flushing (1)
  (5) Reload database without running fsck
  (6) Start writing new data to end of bulkdata and introduce new interval
2020-08-18 10:04:33 -04:00
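To make the failure concrete, here is a minimal sketch (not nilmdb code; plain tuples stand in for nilmdb's IntervalSet, which raises IntervalError on overlap) of the condition this fix repairs:

    # Each interval row stores (start_time, end_time, start_pos, end_pos),
    # where the positions index rows in the bulkdata file.
    def find_offset_overlap(intervals):
        claimed = []  # (start_pos, end_pos) ranges already seen
        for (stime, etime, spos, epos) in intervals:
            for (cs, ce) in claimed:
                if spos < ce and epos > cs:  # standard range-overlap test
                    return (cs, ce), (spos, epos)
            claimed.append((spos, epos))
        return None

    # After the crash sequence above, the stale interval still points at
    # file rows that the newly written interval has reused:
    print(find_offset_overlap([(0, 10, 0, 100), (10, 20, 50, 150)]))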
e90a79ddad fsck: increase max restarts from 100 to 1000
This is effectively the number of problems with the database that can
be fixed, since we restart fsck after each one.
2020-08-18 10:03:57 -04:00
7056c5b4ec tests: new fsck tests 2020-08-18 10:03:34 -04:00
df4e7f0967 fsck: If data timestamps are unexpectedly zero, truncate data
This probably means that the file was written, and metadata was journaled,
but the system crashed before data was written.  If that happens, the
end of the file will be zeroed out.  We don't bother checking the entire
file here; if we see just one timestamp that is unexpectedly zero, let's
truncate the data there.
2020-08-18 00:32:51 -04:00
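A minimal sketch of this check, under the assumption that an interval claims timestamps in [stime, etime):

    import numpy

    # Zeros past the end of the written data are the signature of
    # journaled metadata with unwritten file contents.
    ts = numpy.array([500, 510, 520, 0, 0], dtype=numpy.int64)
    stime, etime = 500, 600

    bad = (ts < stime) | (ts >= etime)
    if bad.any():
        row = int(numpy.argmax(bad))  # first out-of-range row
        if ts[row] == 0:
            print(f"zeroed tail: truncate data at row {row}")
        else:
            print(f"row {row}: genuinely bad timestamp {ts[row]}")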
b6bba16505 fsck: fix error in reporting row number for timestamp errors
Since we process in chunks, we need to add "start" to the reported row
number; however, we don't need to add it when accessing data['timestamp']
(aka ts).
2020-08-18 00:31:40 -04:00
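The bug class in sketch form (illustrative, not the actual fsck loop):

    # `row` indexes into the current chunk, so reports need start + row,
    # while ts[row] is already the correct data access.
    def scan(ts_all, maxrows=2):
        for start in range(0, len(ts_all), maxrows):
            ts = ts_all[start:start + maxrows]
            for row in range(len(ts)):
                if ts[row] < 0:  # stand-in for a timestamp error
                    print(f"bad timestamp {ts[row]} at row {start + row}")

    scan([10, 20, -1, 40])  # reports row 2, not row 0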
d4003d0d34 tests: fill out coverage for new fsck features 2020-08-17 23:27:32 -04:00
759492298a bulkdata: write _format file atomically, to reduce chance of corruption
Some databases were seeing _format truncated to 0 bytes, after crashing
soon after a new stream was created (e.g. during decimation).
2020-08-17 23:04:36 -04:00
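The usual pattern behind such a helper is write-temp / fsync / rename; this sketch is an assumption about what nilmdb.utils.atomic.replace_file does, not its actual source:

    import os

    def replace_file(path, data):
        # `path` is a bytes filename, as in nilmdb's bulkdata layer.
        # A crash leaves either the old file or the new one on disk,
        # never a truncated mix of the two.
        tmp = path + b".tmp"
        with open(tmp, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # force contents to disk before renaming
        os.rename(tmp, path)      # atomic within a single filesystem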
b5f6fcc253 fsck: detect problems with _format file, and remove stream if empty
A bulkdata dir may get created for a new stream with an empty or
corrupted _format, before any data gets actually written.  In that
case, we can just delete the new stream; worst case, we lose some
metadata.

Note: The info in _format should really get moved into the database.
This was born when bulkdata switched from PyTables to a custom storage
system, and was probably stored this way to avoid tying the main DB
to specific implementation details while they were still in flux.
2020-08-17 22:55:44 -04:00
905e325ded fsck: add fixer that fully removes a stream 2020-08-17 22:55:32 -04:00
648b6f4b70 fsck: improve instructions about removing leftover data 2020-08-17 22:55:09 -04:00
7f8a2c7027 fsck: remove unnecessary raises
The called functions already raise RetryFsck.
2020-08-17 22:54:36 -04:00
276fbc652a Bump Python version requirement 2020-08-07 15:56:53 -04:00
10b34f5937 Fix issue with test suite and empty dirs on git
Git doesn't save empty dirs, so put a placeholder there that is
ignored when the test data is copied to its final location during
testing.
2020-08-07 10:49:29 -04:00
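For example, the copy step can drop placeholders with shutil.copytree's ignore hook (the names below are illustrative; the actual placeholder name is not shown in this diff):

    import shutil

    # Copy checked-in test data to a scratch location, skipping the
    # placeholder files that only exist so git keeps empty directories.
    shutil.copytree("tests/fsck-data", "/tmp/fsck-scratch",
                    ignore=shutil.ignore_patterns(".placeholder"))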
83daeb148a Add fsck scan for any data timestamps outside interval range 2020-08-07 10:25:22 -04:00
d65f00e8b2 Add fsck to default tests 2020-08-07 02:56:49 -04:00
71dc01c9a7 Replace deprecated numpy.fromstring usage 2020-08-07 02:55:20 -04:00
bcd21b3498 Improve fsck test coverage to 100% 2020-08-07 02:54:45 -04:00
a1dee0e6f2 Improve fsck test coverage to 85% 2020-08-07 01:26:30 -04:00
99ac47cf0d Start implementing fsck test and porting fsck to Python 3 2020-08-07 00:11:45 -04:00
4cdaef51c1 Fix flake8-reported issues with fsck 2020-08-06 23:10:51 -04:00
88466dcafe Add yappi dependency for "nilmdb-server -y", but don't require ipython 2020-08-06 22:55:27 -04:00
8dfb8da15c Freeze requirements to specific versions 2020-08-06 18:11:33 -04:00
6cc1f6b7b2 Fix #! at top of shell scripts for py3 and venvs 2020-08-05 17:02:30 -04:00
8dc36c2d37 Fix stream_insert_context docstring
This was never updated when timestamps were changed from floats to
ints.
2020-08-05 14:22:35 -04:00
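NilmDB timestamps are integer microseconds, so the old float-seconds examples become scaled integers, as in the corrected docstring further below:

    old_style = 1234567890.0          # float seconds
    new_style = int(old_style * 1e6)  # integer microseconds
    assert new_style == 1234567890000000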
3738430103 Fix flake8 warnings 2020-08-03 23:40:53 -04:00
a41111b045 Fix some Python 3.8 related issues 2020-08-03 17:48:51 -04:00
85f822e1c4 Decode non-JSON HTTP responses when possible
This doesn't affect anything in nilmdb, but is needed by nilmrun.
2020-08-03 17:31:11 -04:00
134 changed files with 510 additions and 126 deletions

View File

@@ -1,7 +1,7 @@
 # nilmdb: Non-Intrusive Load Monitor Database
 by Jim Paris <jim@jtan.com>
 
-NilmDB requires Python 3.7 or newer.
+NilmDB requires Python 3.8 or newer.
 
 ## Prerequisites:

View File

@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 
 import os
 import sys

View File

@@ -145,8 +145,8 @@ class Client():
           Example:
             with client.stream_insert_context('/path', start, end) as ctx:
-              ctx.insert('1234567890.0 1 2 3 4\\n')
-              ctx.insert('1234567891.0 1 2 3 4\\n')
+              ctx.insert('1234567890000000 1 2 3 4\\n')
+              ctx.insert('1234567891000000 1 2 3 4\\n')
 
           For more details, see help for nilmdb.client.client.StreamInserter

View File

@@ -109,7 +109,7 @@ class HTTPClient():
                              stream=False, headers=headers)
         if isjson:
             return json.loads(response.content)
-        return response.content
+        return response.text
 
     def get(self, url, params=None):
         """Simple GET (parameters in URL)"""

View File

@@ -59,7 +59,7 @@ class NumpyClient(nilmdb.client.client.Client):
         dtype = self._get_dtype(path, layout)
 
         def to_numpy(data):
-            a = numpy.fromstring(data, dtype)
+            a = numpy.frombuffer(data, dtype)
             if structured:
                 return a
             return numpy.c_[a['timestamp'], a['data']]
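Unlike fromstring (deprecated for binary input since NumPy 1.14), frombuffer returns a read-only view of the input buffer rather than a copy:

    import numpy

    raw = numpy.arange(4, dtype=numpy.int64).tobytes()
    a = numpy.frombuffer(raw, dtype=numpy.int64)  # view, not a copy
    print(a)     # [0 1 2 3]
    # a[0] = 99  # would raise ValueError: assignment destination is read-only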

View File

@@ -66,7 +66,7 @@ class Complete():
         layouts = []
         for i in range(1, 10):
             layouts.extend([(t + "_" + str(i)) for t in types])
-        return (l for l in layouts if l.startswith(prefix))
+        return (lay for lay in layouts if lay.startswith(prefix))
 
     def meta_key(self, prefix, parsed_args, **kwargs):
         return (kv.split('=')[0] for kv

View File

@@ -1,7 +1,5 @@
 # -*- coding: utf-8 -*-
 
-raise Exception("todo: fix path bytes issues")
-
 """Check database consistency, with some ability to fix problems.
 This should be able to fix cases where a database gets corrupted due
 to unexpected system shutdown, and detect other cases that may cause
@@ -13,7 +11,6 @@ import nilmdb.client.numpyclient
 from nilmdb.utils.interval import IntervalError
 from nilmdb.server.interval import Interval, IntervalSet
 from nilmdb.utils.printf import printf, fprintf, sprintf
-from nilmdb.utils.time import timestamp_to_string
 
 from collections import defaultdict
 import sqlite3
@@ -21,70 +18,83 @@ import os
 import sys
 import progressbar
 import re
-import time
 import shutil
 import pickle
 import numpy
 
+
 class FsckError(Exception):
-    def __init__(self, msg = "", *args):
+    def __init__(self, msg="", *args):
         if args:
             msg = sprintf(msg, *args)
         Exception.__init__(self, msg)
 
+
 class FixableFsckError(FsckError):
-    def __init__(self, msg = "", *args):
-        if args:
-            msg = sprintf(msg, *args)
-        FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)
+    def __init__(self, msg=""):
+        FsckError.__init__(self, f'{msg}\nThis may be fixable with "--fix".')
 
+
 class RetryFsck(FsckError):
     pass
 
+
+class FsckFormatError(FsckError):
+    pass
+
+
 def log(format, *args):
     printf(format, *args)
 
+
 def err(format, *args):
     fprintf(sys.stderr, format, *args)
 
+
 # Decorator that retries a function if it raises a specific exception
-def retry_if_raised(exc, message = None, max_retries = 100):
+def retry_if_raised(exc, message=None, max_retries=1000):
     def f1(func):
         def f2(*args, **kwargs):
             for n in range(max_retries):
                 try:
                     return func(*args, **kwargs)
-                except exc as e:
+                except exc:
                     if message:
-                        log("%s\n\n", message)
-            raise Exception("Max number of retries (%d) exceeded; giving up")
+                        log(f"{message} ({n+1})\n\n")
+            raise Exception("Max number of retries (%d) exceeded; giving up" %
+                            max_retries)
         return f2
     return f1
 
+
 class Progress(object):
     def __init__(self, maxval):
         if maxval == 0:
             maxval = 1
         self.bar = progressbar.ProgressBar(
-            maxval = maxval,
-            widgets = [ progressbar.Percentage(), ' ',
-                        progressbar.Bar(), ' ',
-                        progressbar.ETA() ])
-        if self.bar.term_width == 0:
-            self.bar.term_width = 75
+            maxval=maxval,
+            widgets=[progressbar.Percentage(), ' ',
+                     progressbar.Bar(), ' ',
+                     progressbar.ETA()])
+        self.bar.term_width = self.bar.term_width or 75
 
     def __enter__(self):
         self.bar.start()
         self.last_update = 0
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type is None:
             self.bar.finish()
         else:
             printf("\n")
 
     def update(self, val):
         self.bar.update(val)
 
+
 class Fsck(object):
-    def __init__(self, path, fix = False):
+    def __init__(self, path, fix=False):
         self.basepath = path
         self.sqlpath = os.path.join(path, "data.sql")
         self.bulkpath = os.path.join(path, "data")
@@ -94,7 +104,7 @@ class Fsck(object):
     ### Main checks
 
     @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
-    def check(self, skip_data = False):
+    def check(self, skip_data=False):
         self.bulk = None
         self.sql = None
         try:
@@ -109,7 +119,9 @@ class Fsck(object):
         finally:
             if self.bulk:
                 self.bulk.close()
-            if self.sql:
+            if self.sql:  # pragma: no cover
+                # (coverage doesn't handle finally clauses correctly;
+                # both branches here are tested)
                 self.sql.commit()
                 self.sql.close()
         log("ok\n")
@@ -164,7 +176,7 @@ class Fsck(object):
                              "ORDER BY start_time")
         for r in result:
             if r[0] not in self.stream_path:
-                raise FsckError("interval ID %d not in streams", k)
+                raise FsckError("interval ID %d not in streams", r[0])
             self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
 
         log(" loading metadata\n")
@@ -172,10 +184,11 @@ class Fsck(object):
         result = cur.execute("SELECT stream_id, key, value FROM metadata")
         for r in result:
             if r[0] not in self.stream_path:
-                raise FsckError("metadata ID %d not in streams", k)
+                raise FsckError("metadata ID %d not in streams", r[0])
             if r[1] in self.stream_meta[r[0]]:
-                raise FsckError("duplicate metadata key '%s' for stream %d",
-                                r[1], r[0])
+                raise FsckError(
+                    "duplicate metadata key '%s' for stream %d",
+                    r[1], r[0])
             self.stream_meta[r[0]][r[1]] = r[2]
 
     ### Check streams and basic interval overlap
@@ -202,6 +215,7 @@ class Fsck(object):
         # must exist in bulkdata
         bulk = self.bulkpath + path
+        bulk = bulk.encode('utf-8')
         if not os.path.isdir(bulk):
             raise FsckError("%s: missing bulkdata dir", path)
         if not nilmdb.server.bulkdata.Table.exists(bulk):
@@ -224,40 +238,98 @@ class Fsck(object):
             try:
                 posiset += new
             except IntervalError:
-                raise FsckError("%s: overlap in file offsets:\n"
-                                "set: %s\nnew: %s",
-                                path, str(posiset), str(new))
+                self.fix_row_overlap(sid, path, posiset, new)
 
-        # check bulkdata
-        self.check_bulkdata(sid, path, bulk)
-
-        # Check that we can open bulkdata
         try:
-            tab = None
-            try:
-                tab = nilmdb.server.bulkdata.Table(bulk)
-            except Exception as e:
-                raise FsckError("%s: can't open bulkdata: %s",
-                                path, str(e))
-        finally:
-            if tab:
-                tab.close()
+            # Check bulkdata
+            self.check_bulkdata(sid, path, bulk)
+
+            # Check that we can open bulkdata
+            tab = nilmdb.server.bulkdata.Table(bulk)
+        except FsckFormatError:
+            # If there are no files except _format, try deleting
+            # the entire stream; this may remove metadata, but
+            # it's probably unimportant.
+            files = list(os.listdir(bulk))
+            if len(files) > 1:
+                raise FsckFormatError(f"{path}: can't load _format, "
+                                      f"but data is also present")
+
+            # Since the stream was empty, just remove it
+            self.fix_remove_stream(sid, path, bulk,
+                                   "empty, with corrupted format file")
+        except FsckError as e:
+            raise e
+        except Exception as e:  # pragma: no cover
+            # No coverage because this is an unknown/unexpected error
+            raise FsckError("%s: can't open bulkdata: %s",
+                            path, str(e))
+        tab.close()
+
+    def fix_row_overlap(self, sid, path, existing, new):
+        # If the file rows (spos, epos) overlap in the interval table,
+        # and the overlapping ranges look like this:
+        #     A --------- C
+        #           B -------- D
+        # Then we can try changing the first interval to go from
+        # A to B instead.
+        msg = (f"{path}: overlap in file offsets:\n"
+               f"existing ranges: {existing}\n"
+               f"overlapping interval: {new}")
+        if not self.fix:
+            raise FixableFsckError(msg)
+        err(f"\n{msg}\nSeeing if we can truncate one of them...\n")
+
+        # See if there's exactly one interval that overlaps the
+        # conflicting one in the right way
+        match = None
+        for intv in self.stream_interval[sid]:
+            (stime, etime, spos, epos) = intv
+            if spos < new.start and epos > new.start:
+                if match:
+                    err(f"no, more than one interval matched:\n"
+                        f"{intv}\n{match}\n")
+                    raise FsckError(f"{path}: unfixable overlap")
+                match = intv
+        if match is None:
+            err("no intervals overlapped in the right way\n")
+            raise FsckError(f"{path}: unfixable overlap")
+
+        # Truncate the file position
+        err(f"truncating {match}\n")
+        with self.sql:
+            cur = self.sql.cursor()
+            cur.execute("UPDATE ranges SET end_pos=? "
+                        "WHERE stream_id=? AND start_time=? AND "
+                        "end_time=? AND start_pos=? AND end_pos=?",
+                        (new.start, sid, *match))
+            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
+                raise FsckError("failed to fix SQL database")
+        raise RetryFsck
 
     ### Check that bulkdata is good enough to be opened
 
     @retry_if_raised(RetryFsck)
     def check_bulkdata(self, sid, path, bulk):
-        with open(os.path.join(bulk, "_format"), "rb") as f:
-            fmt = pickle.load(f)
+        try:
+            with open(os.path.join(bulk, b"_format"), "rb") as f:
+                fmt = pickle.load(f)
+        except Exception as e:
+            raise FsckFormatError(f"{path}: can't load _format file ({e})")
+
         if fmt["version"] != 3:
-            raise FsckError("%s: bad or unsupported bulkdata version %d",
-                            path, fmt["version"])
-        row_per_file = int(fmt["rows_per_file"])
+            raise FsckFormatError("%s: bad or unsupported bulkdata version %d",
+                                  path, fmt["version"])
+        rows_per_file = int(fmt["rows_per_file"])
+        if rows_per_file < 1:
+            raise FsckFormatError(f"{path}: bad rows_per_file {rows_per_file}")
         files_per_dir = int(fmt["files_per_dir"])
+        if files_per_dir < 1:
+            raise FsckFormatError(f"{path}: bad files_per_dir {files_per_dir}")
         layout = fmt["layout"]
         if layout != self.stream_layout[sid]:
-            raise FsckError("%s: layout mismatch %s != %s", path,
-                            layout, self.stream_layout[sid])
+            raise FsckFormatError("%s: layout mismatch %s != %s", path,
+                                  layout, self.stream_layout[sid])
 
         # Every file should have a size that's a multiple of the row size
         rkt = nilmdb.server.rocket.Rocket(layout, None)
@@ -265,16 +337,16 @@ class Fsck(object):
         rkt.close()
 
         # Find all directories
-        regex = re.compile("^[0-9a-f]{4,}$")
+        regex = re.compile(b"^[0-9a-f]{4,}$")
         subdirs = sorted(filter(regex.search, os.listdir(bulk)),
-                         key = lambda x: int(x, 16), reverse = True)
+                         key=lambda x: int(x, 16), reverse=True)
         for subdir in subdirs:
             # Find all files in that dir
             subpath = os.path.join(bulk, subdir)
             files = list(filter(regex.search, os.listdir(subpath)))
             if not files:
                 self.fix_empty_subdir(subpath)
-                raise RetryFsck
             # Verify that their size is a multiple of the row size
             for filename in files:
                 filepath = os.path.join(subpath, filename)
@@ -290,10 +362,11 @@ class Fsck(object):
         # as long as it's only ".removed" files.
         err("\n%s\n", msg)
         for fn in os.listdir(subpath):
-            if not fn.endswith(".removed"):
+            if not fn.endswith(b".removed"):
                 raise FsckError("can't fix automatically: please manually "
-                                "remove the file %s and try again",
-                                os.path.join(subpath, fn))
+                                "remove the file '%s' and try again",
+                                os.path.join(subpath, fn).decode(
+                                    'utf-8', errors='backslashreplace'))
         # Remove the whole thing
         err("Removing empty subpath\n")
         shutil.rmtree(subpath)
@@ -314,6 +387,24 @@ class Fsck(object):
             f.truncate(newsize)
         raise RetryFsck
 
+    def fix_remove_stream(self, sid, path, bulk, reason):
+        msg = f"stream {path} is corrupted: {reason}"
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Remove the stream from disk and the database
+        err(f"\n{msg}\n")
+        err(f"Removing stream {path} from disk and database\n")
+        shutil.rmtree(bulk)
+        with self.sql:
+            cur = self.sql.cursor()
+            cur.execute("DELETE FROM streams WHERE id=?",
+                        (sid,))
+            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
+                raise FsckError("failed to remove stream")
+            cur.execute("DELETE FROM ranges WHERE stream_id=?", (sid,))
+            cur.execute("DELETE FROM metadata WHERE stream_id=?", (sid,))
+        raise RetryFsck
+
     ### Check interval endpoints
 
     def check_intervals(self):
@@ -324,9 +415,12 @@ class Fsck(object):
         for sid in self.stream_interval:
             try:
                 bulk = self.bulkpath + self.stream_path[sid]
+                bulk = bulk.encode('utf-8')
                 tab = nilmdb.server.bulkdata.Table(bulk)
+
                 def update(x):
                     pbar.update(done + x)
+
                 ints = self.stream_interval[sid]
                 done += self.check_table_intervals(sid, ints, tab, update)
             finally:
@@ -335,7 +429,7 @@ class Fsck(object):
     def check_table_intervals(self, sid, ints, tab, update):
         # look in the table to make sure we can pick out the interval's
        # endpoints
-        path = self.stream_path[sid]
+        path = self.stream_path[sid]  # noqa: F841 unused
         tab.file_open.cache_remove_all()
         for (i, intv) in enumerate(ints):
             update(i)
@@ -343,11 +437,11 @@ class Fsck(object):
             if spos == epos and spos >= 0 and spos <= tab.nrows:
                 continue
             try:
-                srow = tab[spos]
-                erow = tab[epos-1]
+                srow = tab[spos]  # noqa: F841 unused
+                erow = tab[epos-1]  # noqa: F841 unused
             except Exception as e:
                 self.fix_bad_interval(sid, intv, tab, str(e))
-                raise RetryFsck
         return len(ints)
 
     def fix_bad_interval(self, sid, intv, tab, msg):
@@ -376,23 +470,23 @@ class Fsck(object):
                             "end_time=? AND start_pos=? AND end_pos=?",
                             (new_etime, new_epos, sid, stime, etime,
                              spos, epos))
-                if cur.rowcount != 1:
+                if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                     raise FsckError("failed to fix SQL database")
             raise RetryFsck
 
-        err("actually it can't be truncated; times are bad too")
+        err("actually it can't be truncated; times are bad too\n")
 
         # Otherwise, the only hope is to delete the interval entirely.
         err("*** Deleting the entire interval from SQL.\n")
-        err("This may leave stale data on disk. To fix that, copy all\n")
-        err("data from this stream to a new stream, then remove all data\n")
-        err("from and destroy %s.\n", path)
+        err("This may leave stale data on disk. To fix that, copy all "
+            "data from this stream to a new stream using nilm-copy, then\n")
+        err("remove all data from and destroy %s.\n", path)
         with self.sql:
             cur = self.sql.cursor()
             cur.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (sid, stime, etime, spos, epos))
-            if cur.rowcount != 1:
+            if cur.rowcount != 1:  # pragma: no cover (shouldn't fail)
                 raise FsckError("failed to remove interval")
         raise RetryFsck
@@ -407,9 +501,12 @@ class Fsck(object):
         for sid in self.stream_interval:
             try:
                 bulk = self.bulkpath + self.stream_path[sid]
+                bulk = bulk.encode('utf-8')
                 tab = nilmdb.server.bulkdata.Table(bulk)
+
                 def update(x):
                     pbar.update(done + x)
+
                 ints = self.stream_interval[sid]
                 done += self.check_table_data(sid, ints, tab, update)
             finally:
@@ -418,7 +515,7 @@ class Fsck(object):
     def check_table_data(self, sid, ints, tab, update):
         # Pull out all of the interval's data and verify that it's
        # monotonic.
-        maxrows = 100000
+        maxrows = getattr(self, 'maxrows_override', 100000)
         path = self.stream_path[sid]
         layout = self.stream_layout[sid]
         dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
@@ -438,29 +535,76 @@ class Fsck(object):
                 # Get raw data, convert to NumPy array
                 try:
-                    raw = tab.get_data(start, stop, binary = True)
-                    data = numpy.fromstring(raw, dtype)
-                except Exception as e:
-                    raise FsckError("%s: failed to grab rows %d through %d: %s",
-                                    path, start, stop, repr(e))
+                    raw = tab.get_data(start, stop, binary=True)
+                    data = numpy.frombuffer(raw, dtype)
+                except Exception as e:  # pragma: no cover
+                    # No coverage because it's hard to trigger this -- earlier
+                    # checks check the ranges, so this would probably be a real
+                    # disk error, malloc failure, etc.
+                    raise FsckError(
+                        "%s: failed to grab rows %d through %d: %s",
+                        path, start, stop, repr(e))
+
+                ts = data['timestamp']
+
+                # Verify that all timestamps are in range.
+                match = (ts < stime) | (ts >= etime)
+                if match.any():
+                    row = numpy.argmax(match)
+                    if ts[row] != 0:
+                        raise FsckError("%s: data timestamp %d at row %d "
+                                        "outside interval range [%d,%d)",
+                                        path, ts[row], row + start,
+                                        stime, etime)
+
+                    # Timestamp is zero and out of the expected range;
+                    # assume file ends with zeroed data and just truncate it.
+                    self.fix_table_by_truncating(
+                        path, tab, row + start,
+                        "data timestamp is out of range, and zero")
 
                 # Verify that timestamps are monotonic
-                if (numpy.diff(data['timestamp']) <= 0).any():
-                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
-                                    "%d through %d", path, start, stop)
-                first_ts = data['timestamp'][0]
+                match = numpy.diff(ts) <= 0
+                if match.any():
+                    row = numpy.argmax(match)
+                    if ts[row+1] != 0:
+                        raise FsckError(
+                            "%s: non-monotonic timestamp (%d -> %d) "
+                            "at row %d", path, ts[row], ts[row+1],
+                            row + start)
+
+                    # Timestamp is zero and non-monotonic;
+                    # assume file ends with zeroed data and just truncate it.
+                    self.fix_table_by_truncating(
+                        path, tab, row + start + 1,
+                        "data timestamp is non-monotonic, and zero")
+
+                first_ts = ts[0]
                 if last_ts is not None and first_ts <= last_ts:
                     raise FsckError("%s: first interval timestamp %d is not "
                                     "greater than the previous last interval "
                                     "timestamp %d, at row %d",
                                     path, first_ts, last_ts, start)
-                last_ts = data['timestamp'][-1]
+                last_ts = ts[-1]
 
-                # These are probably fixable, by removing the offending
-                # intervals.  But I'm not going to bother implementing
-                # that yet.
+                # The previous errors are fixable, by removing the
+                # offending intervals, or changing the data
+                # timestamps.  But these are probably unlikely errors,
+                # so it's not worth implementing that yet.
 
                 # Done
                 done += count
                 update(done)
         return done
 
+    def fix_table_by_truncating(self, path, tab, row, reason):
+        # Simple fix for bad data: truncate the table at the given row.
+        # On retry, fix_bad_interval will correct the database and timestamps
+        # to account for this truncation.
+        msg = f"{path}: bad data in table, starting at row {row}: {reason}"
+        if not self.fix:
+            raise FixableFsckError(msg)
+        err(f"\n{msg}\nWill try truncating table\n")
+        (subdir, fname, offs, count) = tab._offset_from_row(row)
+        tab._remove_or_truncate_file(subdir, fname, offs)
+        raise RetryFsck

View File

@@ -1,27 +1,27 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 
 import nilmdb.fsck
 import argparse
-import os
-import sys
+
 
 def main():
     """Main entry point for the 'nilmdb-fsck' command line script"""
     parser = argparse.ArgumentParser(
-        description = 'Check database consistency',
-        formatter_class = argparse.ArgumentDefaultsHelpFormatter)
+        description='Check database consistency',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument("-v", "--version", action="version",
-                        version = nilmdb.__version__)
+                        version=nilmdb.__version__)
     parser.add_argument("-f", "--fix", action="store_true",
-                        default=False, help = 'Fix errors when possible '
+                        default=False, help='Fix errors when possible '
                         '(which may involve removing data)')
     parser.add_argument("-n", "--no-data", action="store_true",
-                        default=False, help = 'Skip the slow full-data check')
-    parser.add_argument('database', help = 'Database directory')
+                        default=False, help='Skip the slow full-data check')
+    parser.add_argument('database', help='Database directory')
     args = parser.parse_args()
 
-    nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data)
+    nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data=args.no_data)
+
 
 if __name__ == "__main__":
     main()
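Equivalent programmatic use, following the call in the script above:

    import nilmdb.fsck

    # Same as "nilmdb-fsck --fix --no-data /path/to/db":
    nilmdb.fsck.Fsck("/path/to/db", fix=True).check(skip_data=True)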

View File

@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 
 import os
 import sys
@@ -78,9 +78,12 @@ def main():
                 stats = yappi.get_func_stats()
                 stats.sort("ttot")
                 stats.print_all()
-                from IPython import embed
-                embed(header="Use the `yappi` or `stats` object to explore "
-                      "further, quit to exit")
+                try:
+                    from IPython import embed
+                    embed(header="Use the `yappi` or `stats` object to "
+                          "explore further, `quit` to exit")
+                except ModuleNotFoundError:
+                    print("\nInstall ipython to explore further")
         else:
             server.start(blocking=True)
     except nilmdb.server.serverutil.CherryPyExit:

View File

@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 
 import nilmdb.cmdline

View File

@@ -293,8 +293,8 @@ class Table():
             "layout": layout,
             "version": 3
         }
-        with open(os.path.join(root, b"_format"), "wb") as f:
-            pickle.dump(fmt, f, 2)
+        nilmdb.utils.atomic.replace_file(
+            os.path.join(root, b"_format"), pickle.dumps(fmt, 2))
 
     # Normal methods
 
     def __init__(self, root, initial_nrows=0):

View File

@@ -6,6 +6,7 @@ import sys
 import json
 import decorator
 import functools
+import threading
 
 import cherrypy
 
@@ -178,6 +179,19 @@ def cherrypy_patch_exit():
         os._exit = real_exit
     bus.exit = functools.partial(patched_exit, bus.exit)
 
+    # A behavior change in Python 3.8 means that some thread exceptions,
+    # derived from SystemExit, now print tracebacks where they didn't
+    # used to: https://bugs.python.org/issue1230540
+    # Install a thread exception hook that ignores CherryPyExit;
+    # to make this match the behavior where we didn't set
+    # threading.excepthook, we also need to ignore SystemExit.
+    def hook(args):
+        if args.exc_type == CherryPyExit or args.exc_type == SystemExit:
+            return
+        sys.excepthook(args.exc_type, args.exc_value,
+                       args.exc_traceback)  # pragma: no cover
+    threading.excepthook = hook
+
 
 # Start/stop CherryPy standalone server
 
 def cherrypy_start(blocking=False, event=False):
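A standalone demonstration of the Python 3.8 behavior being worked around (an assumed demo, not nilmdb code): the default threading.excepthook silences only SystemExit itself, so a subclass like CherryPyExit escaping a thread prints a traceback unless a custom hook ignores it.

    import threading

    class DemoExit(SystemExit):  # stand-in for CherryPyExit
        pass

    def quiet_hook(args):
        if issubclass(args.exc_type, SystemExit):
            return  # silence it, as the hook above does for CherryPyExit
        threading.__excepthook__(args)  # default handling for anything else

    threading.excepthook = quiet_hook

    def worker():
        raise DemoExit()

    t = threading.Thread(target=worker)
    t.start()
    t.join()  # no traceback is printed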

View File

@@ -26,7 +26,9 @@ class Timestamper():
                     return b""
                 if line[0:1] == b'#':
                     continue
-                break
+                # For some reason, coverage on python 3.8 reports that
+                # we never hit this break, even though we definitely do.
+                break  # pragma: no cover
             try:
                 return next(self.ts_iter) + line
             except StopIteration:

View File

@ -1,16 +1,41 @@
argcomplete>=1.10.0 argcomplete==1.12.0
CherryPy>=18.1.2 CherryPy==18.6.0
coverage>=4.5.4 coverage==5.2.1
cython>=0.29.13 Cython==0.29.21
decorator>=4.4.0 decorator==4.4.2
fallocate>=1.6.4 fallocate==1.6.4
flake8>=3.7.8 flake8==3.8.3
nose>=1.3.7 nose==1.3.7
numpy>=1.17.0 numpy==1.19.1
progressbar>=2.5 progressbar==2.5
psutil>=5.6.3 psutil==5.7.2
python-datetime-tz>=0.5.4 python-datetime-tz==0.5.4
python-dateutil>=2.8.0 python-dateutil==2.8.1
requests>=2.22.0 requests==2.24.0
tz>=0.2.2 tz==0.2.2
WebTest>=2.0.33 yappi==1.2.5
## The following requirements were added by pip freeze:
beautifulsoup4==4.9.1
certifi==2020.6.20
chardet==3.0.4
cheroot==8.4.2
idna==2.10
jaraco.classes==3.1.0
jaraco.collections==3.0.0
jaraco.functools==3.0.1
jaraco.text==3.2.0
mccabe==0.6.1
more-itertools==8.4.0
portend==2.6
pycodestyle==2.6.0
pyflakes==2.2.0
pytz==2020.1
six==1.15.0
soupsieve==2.0.1
tempora==4.0.0
urllib3==1.25.10
waitress==1.4.4
WebOb==1.8.6
WebTest==2.0.35
zc.lockfile==2.0

View File

@@ -47,10 +47,13 @@ tag_prefix=nilmdb-
 parentdir_prefix=nilmdb-
 
 [flake8]
-exclude=_version.py,fsck.py,nilmdb_fsck.py
+exclude=_version.py
 extend-ignore=E731
-per-file-ignores=__init__.py:F401,E402 serializer.py:E722 mustclose.py:E722
+per-file-ignores=__init__.py:F401,E402 \
+    serializer.py:E722 \
+    mustclose.py:E722 \
+    fsck.py:E266
 
 [pylint]
-ignore=_version.py,fsck.py,nilmdb_fsck.py
+ignore=_version.py
 disable=C0103,C0111,R0913,R0914

View File

@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 
 # To release a new version, tag it:
 #   git tag -a nilmdb-1.1 -m "Version 1.1"

(The remaining changed files are binary test databases whose contents are not shown in this view, plus a handful of one-line placeholder files containing "hi" or "world", added so that git preserves otherwise-empty test-data directories. Some further files were omitted from this view because too many files changed in this diff.)