
Verify that data timestamps are monotonic

commit 8e6341ae5d (tags/nilmdb-1.9.1)
Jim Paris, 10 years ago
2 changed files with 126 additions and 76 deletions
  1. Makefile (+2, -2)
  2. nilmdb/fsck/fsck.py (+124, -74)
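
Throughout the fsck.py changes below, repairs follow a fix-and-restart pattern: when something is fixed on disk, the code raises RetryFsck, and the retry_if_raised decorator reruns the whole check from the top. The decorator itself is defined outside this diff; the following is only a minimal sketch of the pattern its usage implies, not nilmdb's actual implementation:

    # Illustrative sketch only; nilmdb's real retry_if_raised may differ.
    import functools

    class RetryFsck(Exception):
        pass

    def retry_if_raised(exc, message=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                # Keep rerunning the wrapped check until it finishes
                # without raising the retry exception.
                while True:
                    try:
                        return func(*args, **kwargs)
                    except exc:
                        if message is not None:
                            print(message)
        return wrapper
        return decorator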

Makefile (+2, -2)

@@ -24,8 +24,8 @@ lint:
 	pylint --rcfile=.pylintrc nilmdb
 
 fscktest:
-#	python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/wsgi/db').check()"
-	python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/mnt/bucket/mnt/sharon/data/db', True).check()"
+	python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/wsgi/db').check()"
+#	python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/mnt/bucket/mnt/sharon/data/db', True).check()"
 
 test:
 ifeq ($(INSIDE_EMACS), t)
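
This change flips the fscktest target back from the fix-enabled run against the sharon database to a plain check of the wsgi database. Per the constructor calls above, the checker can be run by hand like this (the path here is a placeholder for a real NilmDB database directory):

    # Placeholder path; the optional second argument enables fixing.
    import nilmdb.fsck

    nilmdb.fsck.Fsck("/path/to/db").check()        # report problems only
    nilmdb.fsck.Fsck("/path/to/db", True).check()  # also attempt repairs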


nilmdb/fsck/fsck.py (+124, -74)

@@ -1,9 +1,13 @@
 # -*- coding: utf-8 -*-
 
-"""Check database consistency"""
+"""Check database consistency, with some ability to fix problems.
+This should be able to fix cases where a database gets corrupted due
+to unexpected system shutdown, and detect other cases that may cause
+NilmDB to return errors when trying to manipulate the database."""
 
 import nilmdb.utils
 import nilmdb.server
+import nilmdb.client.numpyclient
 from nilmdb.utils.interval import IntervalError
 from nilmdb.server.interval import Interval, IntervalSet
 from nilmdb.utils.printf import *
@@ -18,6 +22,7 @@ import re
 import time
 import shutil
 import cPickle as pickle
+import numpy
 
 class FsckError(Exception):
     def __init__(self, msg = "", *args):
@@ -81,6 +86,8 @@ class Fsck(object):
         self.bulklock = os.path.join(path, "data.lock")
         self.fix = fix
 
+    ### Main checks
+
     @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
     def check(self):
         self.bulk = None
@@ -99,6 +106,8 @@ class Fsck(object):
         self.sql.close()
         log("ok\n")
 
+    ### Check basic path structure
+
     def check_paths(self):
         log("checking paths\n")
         if self.bulk:
@@ -113,6 +122,8 @@ class Fsck(object):
         # unlocked immediately
         self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
 
+    ### Check SQL database health
+
     def check_sql(self):
         log("checking sqlite database\n")
 
@@ -137,7 +148,8 @@ class Fsck(object):
         log(" loading intervals\n")
         self.stream_interval = defaultdict(list)
         result = cur.execute("SELECT stream_id, start_time, end_time, "
-                             "start_pos, end_pos FROM ranges")
+                             "start_pos, end_pos FROM ranges "
+                             "ORDER BY start_time")
         for r in result:
             if r[0] not in self.stream_path:
                 raise FsckError("interval ID %d not in streams", k)
@@ -154,6 +166,8 @@ class Fsck(object):
                                 r[1], r[0])
             self.stream_meta[r[0]][r[1]] = r[2]
 
+    ### Check streams and basic interval overlap
+
     def check_streams(self):
         ids = self.stream_path.keys()
         log("checking %d streams\n", len(ids))
@@ -217,37 +231,7 @@ class Fsck(object):
             if tab:
                 tab.close()
 
-    def fix_empty_subdir(self, subpath):
-        msg = sprintf("bulkdata path %s is missing data files", subpath)
-        if not self.fix:
-            raise FixableFsckError(msg)
-        # Try to fix it by just deleting whatever is present,
-        # as long as it's only ".removed" files.
-        err("\n%s\n", msg)
-        for fn in os.listdir(subpath):
-            if not fn.endswith(".removed"):
-                raise FsckError("can't fix automatically: please manually "
-                                "remove the file %s and try again",
-                                os.path.join(subpath, fn))
-        # Remove the whole thing
-        err("Removing empty subpath\n")
-        shutil.rmtree(subpath)
-        raise RetryFsck
-
-    def fix_bad_filesize(self, path, filepath, offset, row_size):
-        extra = offset % row_size
-        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
-                      " of row size (%d): %d extra bytes present",
-                      path, filepath, offset, row_size, extra)
-        if not self.fix:
-            raise FixableFsckError(msg)
-        # Try to fix it by just truncating the file
-        err("\n%s\n", msg)
-        newsize = offset - extra
-        err("Truncating file to %d bytes and retrying\n", newsize)
-        with open(filepath, "r+b") as f:
-            f.truncate(newsize)
-        raise RetryFsck
+    ### Check that bulkdata is good enough to be opened
 
     @retry_if_raised(RetryFsck)
     def check_bulkdata(self, sid, path, bulk):
@@ -286,26 +270,73 @@ class Fsck(object):
             if offset % row_size:
                 self.fix_bad_filesize(path, filepath, offset, row_size)
 
-    def _check_for_each_interval(self, checkfunc):
+    def fix_empty_subdir(self, subpath):
+        msg = sprintf("bulkdata path %s is missing data files", subpath)
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Try to fix it by just deleting whatever is present,
+        # as long as it's only ".removed" files.
+        err("\n%s\n", msg)
+        for fn in os.listdir(subpath):
+            if not fn.endswith(".removed"):
+                raise FsckError("can't fix automatically: please manually "
+                                "remove the file %s and try again",
+                                os.path.join(subpath, fn))
+        # Remove the whole thing
+        err("Removing empty subpath\n")
+        shutil.rmtree(subpath)
+        raise RetryFsck
+
+    def fix_bad_filesize(self, path, filepath, offset, row_size):
+        extra = offset % row_size
+        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
+                      " of row size (%d): %d extra bytes present",
+                      path, filepath, offset, row_size, extra)
+        if not self.fix:
+            raise FixableFsckError(msg)
+        # Try to fix it by just truncating the file
+        err("\n%s\n", msg)
+        newsize = offset - extra
+        err("Truncating file to %d bytes and retrying\n", newsize)
+        with open(filepath, "r+b") as f:
+            f.truncate(newsize)
+        raise RetryFsck
+
+    ### Check interval endpoints
+
+    def check_intervals(self):
         total_ints = sum(len(x) for x in self.stream_interval.values())
-        checked = 0
+        log("checking %d intervals\n", total_ints)
+        done = 0
         with Progress(total_ints) as pbar:
             for sid in self.stream_interval:
                 try:
                     bulk = self.bulkpath + self.stream_path[sid]
                     tab = nilmdb.server.bulkdata.Table(bulk)
                     def update(x):
-                        pbar.update(checked + x)
+                        pbar.update(done + x)
                     ints = self.stream_interval[sid]
-                    checkfunc(sid, ints, tab, update)
-                    checked += len(ints)
+                    done += self.check_table_intervals(sid, ints, tab, update)
                 finally:
                     tab.close()
 
-    def check_intervals(self):
-        total_ints = sum(len(x) for x in self.stream_interval.values())
-        log("checking %d intervals\n", total_ints)
-        self._check_for_each_interval(self.check_table_intervals)
+    def check_table_intervals(self, sid, ints, tab, update):
+        # look in the table to make sure we can pick out the interval's
+        # endpoints
+        path = self.stream_path[sid]
+        tab.file_open.cache_remove_all()
+        for (i, intv) in enumerate(ints):
+            update(i)
+            (stime, etime, spos, epos) = intv
+            if spos == epos and spos >= 0 and spos <= tab.nrows:
+                continue
+            try:
+                srow = tab[spos]
+                erow = tab[epos-1]
+            except Exception as e:
+                self.fix_bad_interval(sid, intv, tab, str(e))
+                raise RetryFsck
+        return len(ints)
 
     def fix_bad_interval(self, sid, intv, tab, msg):
         path = self.stream_path[sid]
@@ -342,7 +373,7 @@ class Fsck(object):
             err("*** Deleting the entire interval from SQL.\n")
             err("This may leave stale data on disk. To fix that, copy all\n")
             err("data from this stream to a new stream, then remove all data\n")
-            err("and destroy %s.\n")
+            err("from and destroy %s.\n")
             with self.sql:
                 cur = self.sql.cursor()
                 cur.execute("DELETE FROM ranges WHERE "
@@ -353,44 +384,63 @@ class Fsck(object):
                 raise FsckError("failed to remove interval")
             raise RetryFsck
 
-    def check_table_intervals(self, sid, ints, tab, update):
-        # look in the table to make sure we can pick out the interval's
-        # endpoints
-        path = self.stream_path[sid]
-        tab.file_open.cache_remove_all()
-        for (i, intv) in enumerate(ints):
-            (stime, etime, spos, epos) = intv
-            update(i)
-            if spos == epos and spos >= 0 and spos <= tab.nrows:
-                continue
-            try:
-                srow = tab[spos]
-                erow = tab[epos-1]
-            except Exception as e:
-                self.fix_bad_interval(sid, intv, tab, str(e))
-                raise RetryFsck
+    ### Check data in each interval
 
     def check_data(self):
         total_rows = sum(sum((y[3] - y[2]) for y in x)
                          for x in self.stream_interval.values())
         log("checking %d rows of data\n", total_rows)
-        self._check_for_each_interval(self.check_table_data)
+        done = 0
+        with Progress(total_rows) as pbar:
+            for sid in self.stream_interval:
+                try:
+                    bulk = self.bulkpath + self.stream_path[sid]
+                    tab = nilmdb.server.bulkdata.Table(bulk)
+                    def update(x):
+                        pbar.update(done + x)
+                    ints = self.stream_interval[sid]
+                    done += self.check_table_data(sid, ints, tab, update)
+                finally:
+                    tab.close()
 
     def check_table_data(self, sid, ints, tab, update):
-        # look in the table to make sure we can pick out all of
-        # the interval's data, and that the data is monotonic
+        # Pull out all of the interval's data and verify that it's
+        # monotonic.
+        maxrows = 100000
         path = self.stream_path[sid]
+        layout = self.stream_layout[sid]
+        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
         tab.file_open.cache_remove_all()
-        for (i, intv) in enumerate(ints):
-            (stime, etime, spos, epos) = intv
-            update(i)
+        done = 0
+        for intv in ints:
             last_ts = None
-            for row in xrange(spos, epos):
-                ts = tab[row]
-                if ts <= last_ts:
-                    raise FsckError("%s: interval %s has non-monotonic "
-                                    "timestamps: %d and then %d\n",
-                                    path, intv, last_ts, ts)
-                if ts < stime or ts >= etime:
-                    raise FsckError("%s: interval %s has out-of-bound "
-                                    "timestamp %d\n", ps, intv, ts)
+            (stime, etime, spos, epos) = intv
+            if spos == epos:
+                continue
+            for start in xrange(*slice(spos, epos, maxrows).indices(epos)):
+                stop = min(start + maxrows, epos)
+                count = stop - start
+                # Get raw data, convert to NumPy array
+                try:
+                    raw = tab.get_data(start, stop, binary = True)
+                    data = numpy.fromstring(raw, dtype)
+                except Exception as e:
+                    raise FsckError("%s: failed to grab rows %d through %d: %s",
+                                    path, start, stop, repr(e))
+
+                # Verify that timestamps are monotonic
+                if (numpy.diff(data['timestamp']) <= 0).any():
+                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
+                                    "%d through %d", path, start, stop)
+                first_ts = data['timestamp'][0]
+                if last_ts is not None and first_ts <= last_ts:
+                    raise FsckError("%s: first interval timestamp %d is not "
+                                    "greater than the previous last interval "
+                                    "timestamp %d, at row %d",
+                                    path, first_ts, last_ts, start)
+                last_ts = data['timestamp'][-1]
+
+                # Done
+                done += count
+                update(done)
+        return done

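The heart of the new check_table_data() is the chunked scan: xrange(*slice(spos, epos, maxrows).indices(epos)) is effectively a clipped spelling of range(spos, epos, maxrows), so each interval is read maxrows rows at a time, numpy.diff() confirms that timestamps strictly increase within a chunk, and last_ts carries the comparison across chunk boundaries. A self-contained sketch of that logic, with a made-up dtype and in-memory rows standing in for tab.get_data():

    import numpy

    # Made-up layout: 64-bit timestamp plus one float32 data column.
    dtype = [('timestamp', 'i8'), ('value', 'f4')]
    rows = numpy.array([(t, 0.0) for t in (10, 20, 30, 40, 50, 60, 70)],
                       dtype=dtype)
    maxrows = 3  # tiny chunk size so the example actually chunks

    last_ts = None
    for start in range(0, len(rows), maxrows):
        stop = min(start + maxrows, len(rows))
        data = rows[start:stop]

        # Within a chunk, every successive difference must be positive.
        if (numpy.diff(data['timestamp']) <= 0).any():
            raise ValueError("non-monotonic timestamps in rows "
                             "%d through %d" % (start, stop))

        # Across chunks, the first timestamp must be strictly greater
        # than the previous chunk's last timestamp.
        first_ts = data['timestamp'][0]
        if last_ts is not None and first_ts <= last_ts:
            raise ValueError("timestamp %d at row %d does not increase"
                             % (first_ts, start))
        last_ts = data['timestamp'][-1]

    print("timestamps are monotonic")

With strictly increasing sample timestamps, the loop completes and prints the success message; duplicating or reordering any timestamp in rows triggers the corresponding ValueError.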