Compare commits

...

19 Commits

Author SHA1 Message Date
6868f5f126 fsck: limit max retries so we don't get stuck in a loop forever 2013-08-03 22:34:30 -04:00
ca0943ec19 fsck: add --no-data option to do a quicker fsck
This makes it fast enough to run at startup with -f, if it's expected
that a system will frequently need to be fixed.
2013-08-03 22:31:45 -04:00
68addb4e4a Clarify output when fsck database is locked 2013-08-03 21:58:24 -04:00
68c33b1f14 fsck: add comma separator on big numbers 2013-08-03 21:50:33 -04:00
8dd8741100 Tweak options, dependencies, documentation 2013-08-03 21:42:49 -04:00
8e6341ae5d Verify that data timestamps are monotonic 2013-08-03 21:32:05 -04:00
422b1e2df2 More fsck improvements. Fixed two problems on sharon so far. 2013-08-03 17:50:46 -04:00
0f745b3047 More fsck tools, including fixes 2013-08-03 16:43:20 -04:00
71cd7ed9b7 Add nilmdb-fsck tool to check database consistency 2013-08-03 14:23:14 -04:00
a79d6104d5 Documentation fixups 2013-08-01 16:24:51 -04:00
8e8ec59e30 Support "nilmtool cmd --version" 2013-08-01 15:14:34 -04:00
b89b945a0f Better responses to invalid HTTP times 2013-07-31 13:37:04 -04:00
bd7bdb2eb8 Add --optimize option to nilmtool intervals 2013-07-30 15:31:51 -04:00
840cd2fd13 Remove stray print 2013-07-30 15:21:09 -04:00
bbd59c8b50 Add nilmdb.utils.interval.intersection by generalizing set_difference 2013-07-30 14:48:19 -04:00
405c110fd7 Doc updates 2013-07-29 15:36:43 -04:00
274adcd856 Documentation updates 2013-07-27 19:51:09 -04:00
a1850c9c2c Misc documentation 2013-07-25 16:08:35 -04:00
6cd28b67b1 Support iterator protocol in Serializer 2013-07-24 14:52:26 -04:00
19 changed files with 682 additions and 45 deletions

View File

@@ -7,4 +7,4 @@
exclude_lines =
pragma: no cover
if 0:
omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py
omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py,nilmdb/fsck

View File

@@ -1,5 +1,5 @@
# By default, run the tests.
all: test
all: fscktest
version:
python setup.py version
@@ -23,6 +23,10 @@ docs:
lint:
pylint --rcfile=.pylintrc nilmdb
fscktest:
python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/wsgi/db').check()"
# python -c "import nilmdb.fsck; nilmdb.fsck.Fsck('/home/jim/mnt/bucket/mnt/sharon/data/db', True).check()"
test:
ifeq ($(INSIDE_EMACS), t)
# Use the slightly more flexible script

View File

@@ -8,7 +8,8 @@ Prerequisites:
# Base NilmDB dependencies
sudo apt-get install python-cherrypy3 python-decorator python-simplejson
sudo apt-get install python-requests python-dateutil python-tz python-psutil
sudo apt-get install python-requests python-dateutil python-tz
sudo apt-get install python-progressbar python-psutil
# Other dependencies (required by some modules)
sudo apt-get install python-numpy
@@ -26,6 +27,7 @@ Install:
Usage:
nilmdb-server --help
nilmdb-fsck --help
nilmtool --help
See docs/wsgi.md for info on setting up a WSGI application in Apache.

View File

@@ -58,6 +58,11 @@ class Client(object):
return self.http.get("dbinfo")
def stream_list(self, path = None, layout = None, extended = False):
"""Return a sorted list of [path, layout] lists. If 'path' or
'layout' are specified, only return streams that match those
exact values. If 'extended' is True, the returned lists have
extended info, e.g.: [path, layout, extent_min, extent_max,
total_rows, total_seconds."""
params = {}
if path is not None:
params["path"] = path
@@ -69,6 +74,7 @@ class Client(object):
return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
def stream_get_metadata(self, path, keys = None):
"""Get stream metadata"""
params = { "path": path }
if keys is not None:
params["key"] = keys

View File

@@ -29,6 +29,14 @@ for cmd in subcommands:
subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ])
class JimArgumentParser(argparse.ArgumentParser):
def parse_args(self, args=None, namespace=None):
# Look for --version anywhere and change it to just "nilmtool
# --version". This makes "nilmtool cmd --version" work, which
# is needed by help2man.
if "--version" in (args or sys.argv[1:]):
args = [ "--version" ]
return argparse.ArgumentParser.parse_args(self, args, namespace)
def error(self, message):
self.print_usage(sys.stderr)
self.exit(2, sprintf("error: %s\n", message))

View File

@@ -1,5 +1,6 @@
from nilmdb.utils.printf import *
import nilmdb.utils.time
from nilmdb.utils.interval import Interval
import fnmatch
import argparse
@@ -42,6 +43,8 @@ def setup(self, sub):
group = cmd.add_argument_group("Misc options")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps when printing times")
group.add_argument("-o", "--optimize", action="store_true",
help="Optimize (merge adjacent) intervals")
return cmd
@@ -58,9 +61,16 @@ def cmd_intervals(self):
time_string = nilmdb.utils.time.timestamp_to_human
try:
for (start, end) in self.client.stream_intervals(
self.args.path, self.args.start, self.args.end, self.args.diff):
printf("[ %s -> %s ]\n", time_string(start), time_string(end))
intervals = ( Interval(start, end) for (start, end) in
self.client.stream_intervals(self.args.path,
self.args.start,
self.args.end,
self.args.diff) )
if self.args.optimize:
intervals = nilmdb.utils.interval.optimize(intervals)
for i in intervals:
printf("[ %s -> %s ]\n", time_string(i.start), time_string(i.end))
except nilmdb.client.ClientError as e:
self.die("error listing intervals: %s", str(e))

5
nilmdb/fsck/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""nilmdb.fsck"""
from __future__ import absolute_import
from nilmdb.fsck.fsck import Fsck

458
nilmdb/fsck/fsck.py Normal file
View File

@@ -0,0 +1,458 @@
# -*- coding: utf-8 -*-
"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""
import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import time
import shutil
import cPickle as pickle
import numpy
class FsckError(Exception):
def __init__(self, msg = "", *args):
if args:
msg = sprintf(msg, *args)
Exception.__init__(self, msg)
class FixableFsckError(FsckError):
def __init__(self, msg = "", *args):
if args:
msg = sprintf(msg, *args)
FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)
class RetryFsck(FsckError):
pass
def log(format, *args):
printf(format, *args)
def err(format, *args):
fprintf(sys.stderr, format, *args)
# Decorator that retries a function if it returns a specific value
def retry_if_raised(exc, message = None, max_retries = 100):
def f1(func):
def f2(*args, **kwargs):
for n in range(max_retries):
try:
return func(*args, **kwargs)
except exc as e:
if message:
log("%s\n\n", message)
raise Exception("Max number of retries (%d) exceeded; giving up")
return f2
return f1
class Progress(object):
def __init__(self, maxval):
self.bar = progressbar.ProgressBar(
maxval = maxval,
widgets = [ progressbar.Percentage(), ' ',
progressbar.Bar(), ' ',
progressbar.ETA() ])
if self.bar.term_width == 0:
self.bar.term_width = 75
def __enter__(self):
self.bar.start()
self.last_update = 0
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.bar.finish()
else:
printf("\n")
def update(self, val):
self.bar.update(val)
class Fsck(object):
def __init__(self, path, fix = False):
self.basepath = path
self.sqlpath = os.path.join(path, "data.sql")
self.bulkpath = os.path.join(path, "data")
self.bulklock = os.path.join(path, "data.lock")
self.fix = fix
### Main checks
@retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
def check(self, skip_data = False):
self.bulk = None
self.sql = None
try:
self.check_paths()
self.check_sql()
self.check_streams()
self.check_intervals()
if skip_data:
log("skipped data check\n")
else:
self.check_data()
finally:
if self.bulk:
self.bulk.close()
if self.sql:
self.sql.commit()
self.sql.close()
log("ok\n")
### Check basic path structure
def check_paths(self):
log("checking paths\n")
if self.bulk:
self.bulk.close()
if not os.path.isfile(self.sqlpath):
raise FsckError("SQL database missing (%s)", self.sqlpath)
if not os.path.isdir(self.bulkpath):
raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
with open(self.bulklock, "w") as lockfile:
if not nilmdb.utils.lock.exclusive_lock(lockfile):
raise FsckError('Database already locked by another process\n'
'Make sure all other processes that might be '
'using the database are stopped.\n'
'Restarting apache will cause it to unlock '
'the db until a request is received.')
# unlocked immediately
self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)
### Check SQL database health
def check_sql(self):
log("checking sqlite database\n")
self.sql = sqlite3.connect(self.sqlpath)
with self.sql:
cur = self.sql.cursor()
ver = cur.execute("PRAGMA user_version").fetchone()[0]
good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
if ver != good:
raise FsckError("database version %d too old, should be %d",
ver, good)
self.stream_path = {}
self.stream_layout = {}
log(" loading paths\n")
result = cur.execute("SELECT id, path, layout FROM streams")
for r in result:
if r[0] in self.stream_path:
raise FsckError("duplicated ID %d in stream IDs", r[0])
self.stream_path[r[0]] = r[1]
self.stream_layout[r[0]] = r[2]
log(" loading intervals\n")
self.stream_interval = defaultdict(list)
result = cur.execute("SELECT stream_id, start_time, end_time, "
"start_pos, end_pos FROM ranges "
"ORDER BY start_time")
for r in result:
if r[0] not in self.stream_path:
raise FsckError("interval ID %d not in streams", k)
self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))
log(" loading metadata\n")
self.stream_meta = defaultdict(dict)
result = cur.execute("SELECT stream_id, key, value FROM metadata")
for r in result:
if r[0] not in self.stream_path:
raise FsckError("metadata ID %d not in streams", k)
if r[1] in self.stream_meta[r[0]]:
raise FsckError("duplicate metadata key '%s' for stream %d",
r[1], r[0])
self.stream_meta[r[0]][r[1]] = r[2]
### Check streams and basic interval overlap
def check_streams(self):
ids = self.stream_path.keys()
log("checking %s streams\n", "{:,d}".format(len(ids)))
with Progress(len(ids)) as pbar:
for i, sid in enumerate(ids):
pbar.update(i)
path = self.stream_path[sid]
# unique path, valid layout
if self.stream_path.values().count(path) != 1:
raise FsckError("duplicated path %s", path)
layout = self.stream_layout[sid].split('_')[0]
if layout not in ('int8', 'int16', 'int32', 'int64',
'uint8', 'uint16', 'uint32', 'uint64',
'float32', 'float64'):
raise FsckError("bad layout %s for %s", layout, path)
count = int(self.stream_layout[sid].split('_')[1])
if count < 1 or count > 1024:
raise FsckError("bad count %d for %s", count, path)
# must exist in bulkdata
bulk = self.bulkpath + path
if not os.path.isdir(bulk):
raise FsckError("%s: missing bulkdata dir", path)
if not nilmdb.server.bulkdata.Table.exists(bulk):
raise FsckError("%s: bad bulkdata table", path)
# intervals don't overlap. Abuse IntervalSet to check
# for intervals in file positions, too.
timeiset = IntervalSet()
posiset = IntervalSet()
for (stime, etime, spos, epos) in self.stream_interval[sid]:
new = Interval(stime, etime)
try:
timeiset += new
except IntervalError:
raise FsckError("%s: overlap in intervals:\n"
"set: %s\nnew: %s",
path, str(timeiset), str(new))
if spos != epos:
new = Interval(spos, epos)
try:
posiset += new
except IntervalError:
raise FsckError("%s: overlap in file offsets:\n"
"set: %s\nnew: %s",
path, str(posiset), str(new))
# check bulkdata
self.check_bulkdata(sid, path, bulk)
# Check that we can open bulkdata
try:
tab = None
try:
tab = nilmdb.server.bulkdata.Table(bulk)
except Exception as e:
raise FsckError("%s: can't open bulkdata: %s",
path, str(e))
finally:
if tab:
tab.close()
### Check that bulkdata is good enough to be opened
@retry_if_raised(RetryFsck)
def check_bulkdata(self, sid, path, bulk):
with open(os.path.join(bulk, "_format"), "rb") as f:
fmt = pickle.load(f)
if fmt["version"] != 3:
raise FsckError("%s: bad or unsupported bulkdata version %d",
path, fmt["version"])
row_per_file = int(fmt["rows_per_file"])
files_per_dir = int(fmt["files_per_dir"])
layout = fmt["layout"]
if layout != self.stream_layout[sid]:
raise FsckError("%s: layout mismatch %s != %s", path,
layout, self.stream_layout[sid])
# Every file should have a size that's the multiple of the row size
rkt = nilmdb.server.rocket.Rocket(layout, None)
row_size = rkt.binary_size
rkt.close()
# Find all directories
regex = re.compile("^[0-9a-f]{4,}$")
subdirs = sorted(filter(regex.search, os.listdir(bulk)),
key = lambda x: int(x, 16), reverse = True)
for subdir in subdirs:
# Find all files in that dir
subpath = os.path.join(bulk, subdir)
files = filter(regex.search, os.listdir(subpath))
if not files:
self.fix_empty_subdir(subpath)
raise RetryFsck
# Verify that their size is a multiple of the row size
for filename in files:
filepath = os.path.join(subpath, filename)
offset = os.path.getsize(filepath)
if offset % row_size:
self.fix_bad_filesize(path, filepath, offset, row_size)
def fix_empty_subdir(self, subpath):
msg = sprintf("bulkdata path %s is missing data files", subpath)
if not self.fix:
raise FixableFsckError(msg)
# Try to fix it by just deleting whatever is present,
# as long as it's only ".removed" files.
err("\n%s\n", msg)
for fn in os.listdir(subpath):
if not fn.endswith(".removed"):
raise FsckError("can't fix automatically: please manually "
"remove the file %s and try again",
os.path.join(subpath, fn))
# Remove the whole thing
err("Removing empty subpath\n")
shutil.rmtree(subpath)
raise RetryFsck
def fix_bad_filesize(self, path, filepath, offset, row_size):
extra = offset % row_size
msg = sprintf("%s: size of file %s (%d) is not a multiple" +
" of row size (%d): %d extra bytes present",
path, filepath, offset, row_size, extra)
if not self.fix:
raise FixableFsckError(msg)
# Try to fix it by just truncating the file
err("\n%s\n", msg)
newsize = offset - extra
err("Truncating file to %d bytes and retrying\n", newsize)
with open(filepath, "r+b") as f:
f.truncate(newsize)
raise RetryFsck
### Check interval endpoints
def check_intervals(self):
total_ints = sum(len(x) for x in self.stream_interval.values())
log("checking %s intervals\n", "{:,d}".format(total_ints))
done = 0
with Progress(total_ints) as pbar:
for sid in self.stream_interval:
try:
bulk = self.bulkpath + self.stream_path[sid]
tab = nilmdb.server.bulkdata.Table(bulk)
def update(x):
pbar.update(done + x)
ints = self.stream_interval[sid]
done += self.check_table_intervals(sid, ints, tab, update)
finally:
tab.close()
def check_table_intervals(self, sid, ints, tab, update):
# look in the table to make sure we can pick out the interval's
# endpoints
path = self.stream_path[sid]
tab.file_open.cache_remove_all()
for (i, intv) in enumerate(ints):
update(i)
(stime, etime, spos, epos) = intv
if spos == epos and spos >= 0 and spos <= tab.nrows:
continue
try:
srow = tab[spos]
erow = tab[epos-1]
except Exception as e:
self.fix_bad_interval(sid, intv, tab, str(e))
raise RetryFsck
return len(ints)
def fix_bad_interval(self, sid, intv, tab, msg):
path = self.stream_path[sid]
msg = sprintf("%s: interval %s error accessing rows: %s",
path, str(intv), str(msg))
if not self.fix:
raise FixableFsckError(msg)
err("\n%s\n", msg)
(stime, etime, spos, epos) = intv
# If it's just that the end pos is more than the number of rows
# in the table, lower end pos and truncate interval time too.
if spos < tab.nrows and epos >= tab.nrows:
err("end position is past endrows, but it can be truncated\n")
err("old end: time %d, pos %d\n", etime, epos)
new_epos = tab.nrows
new_etime = tab[new_epos-1] + 1
err("new end: time %d, pos %d\n", new_etime, new_epos)
if stime < new_etime:
# Change it in SQL
with self.sql:
cur = self.sql.cursor()
cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
"WHERE stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND end_pos=?",
(new_etime, new_epos, sid, stime, etime,
spos, epos))
if cur.rowcount != 1:
raise FsckError("failed to fix SQL database")
raise RetryFsck
err("actually it can't be truncated; times are bad too")
# Otherwise, the only hope is to delete the interval entirely.
err("*** Deleting the entire interval from SQL.\n")
err("This may leave stale data on disk. To fix that, copy all\n")
err("data from this stream to a new stream, then remove all data\n")
err("from and destroy %s.\n")
with self.sql:
cur = self.sql.cursor()
cur.execute("DELETE FROM ranges WHERE "
"stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND end_pos=?",
(sid, stime, etime, spos, epos))
if cur.rowcount != 1:
raise FsckError("failed to remove interval")
raise RetryFsck
### Check data in each interval
def check_data(self):
total_rows = sum(sum((y[3] - y[2]) for y in x)
for x in self.stream_interval.values())
log("checking %s rows of data\n", "{:,d}".format(total_rows))
done = 0
with Progress(total_rows) as pbar:
for sid in self.stream_interval:
try:
bulk = self.bulkpath + self.stream_path[sid]
tab = nilmdb.server.bulkdata.Table(bulk)
def update(x):
pbar.update(done + x)
ints = self.stream_interval[sid]
done += self.check_table_data(sid, ints, tab, update)
finally:
tab.close()
def check_table_data(self, sid, ints, tab, update):
# Pull out all of the interval's data and verify that it's
# monotonic.
maxrows = 100000
path = self.stream_path[sid]
layout = self.stream_layout[sid]
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
tab.file_open.cache_remove_all()
done = 0
for intv in ints:
last_ts = None
(stime, etime, spos, epos) = intv
if spos == epos:
continue
for start in xrange(*slice(spos, epos, maxrows).indices(epos)):
stop = min(start + maxrows, epos)
count = stop - start
# Get raw data, convert to NumPy arary
try:
raw = tab.get_data(start, stop, binary = True)
data = numpy.fromstring(raw, dtype)
except Exception as e:
raise FsckError("%s: failed to grab rows %d through %d: %s",
path, start, stop, repr(e))
# Verify that timestamps are monotonic
if (numpy.diff(data['timestamp']) <= 0).any():
raise FsckError("%s: non-monotonic timestamp(s) in rows "
"%d through %d", path, start, stop)
first_ts = data['timestamp'][0]
if last_ts is not None and first_ts <= last_ts:
raise FsckError("%s: first interval timestamp %d is not "
"greater than the previous last interval "
"timestamp %d, at row %d",
path, first_ts, last_ts, start)
last_ts = data['timestamp'][-1]
# These are probably fixable, by removing the offending
# intervals. But I'm not going to bother implementing
# that yet.
# Done
done += count
update(done)
return done

27
nilmdb/scripts/nilmdb_fsck.py Executable file
View File

@@ -0,0 +1,27 @@
#!/usr/bin/python
import nilmdb.fsck
import argparse
import os
import sys
def main():
"""Main entry point for the 'nilmdb-fsck' command line script"""
parser = argparse.ArgumentParser(
description = 'Check database consistency',
formatter_class = argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-V", "--version", action="version",
version = nilmdb.__version__)
parser.add_argument("-f", "--fix", action="store_true",
default=False, help = 'Fix errors when possible '
'(which may involve removing data)')
parser.add_argument("-n", "--no-data", action="store_true",
default=False, help = 'Skip the slow full-data check')
parser.add_argument('database', help = 'Database directory')
args = parser.parse_args()
nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data)
if __name__ == "__main__":
main()

View File

@@ -330,7 +330,8 @@ class Table(object):
# Find the last directory. We sort and loop through all of them,
# starting with the numerically greatest, because the dirs could be
# empty if something was deleted.
# empty if something was deleted but the directory was unexpectedly
# not deleted.
subdirs = sorted(filter(regex.search, os.listdir(self.root)),
key = lambda x: int(x, 16), reverse = True)

View File

@@ -84,10 +84,18 @@ class Stream(NilmApp):
# Helpers
def _get_times(self, start_param, end_param):
(start, end) = (None, None)
try:
if start_param is not None:
start = string_to_timestamp(start_param)
except Exception:
raise cherrypy.HTTPError("400 Bad Request", sprintf(
"invalid start (%s): must be a numeric timestamp", start_param))
try:
if end_param is not None:
end = string_to_timestamp(end_param)
except Exception:
raise cherrypy.HTTPError("400 Bad Request", sprintf(
"invalid end (%s): must be a numeric timestamp", end_param))
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError(

View File

@@ -58,18 +58,11 @@ class Interval:
raise IntervalError("not a subset")
return Interval(start, end)
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
def _interval_math_helper(a, b, op, subset = True):
"""Helper for set_difference, intersection functions,
to compute interval subsets based on a math operator on ranges
present in A and B. Subsets are computed from A, or new intervals
are generated if subset = False."""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
@@ -84,31 +77,57 @@ def set_difference(a, b):
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
a_interval = None
b_interval = None
in_a = False
in_b = False
out_start = None
for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
if k == 0:
# start a interval
a_interval = i
if b_interval is None:
out_start = ts
in_a = True
elif k == 1:
# start b interval
b_interval = i
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
in_b = True
elif k == 2:
# end a interval
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
a_interval = None
in_a = False
elif k == 3:
# end b interval
b_interval = None
if a_interval:
in_b = False
include = op(in_a, in_b)
if include and out_start is None:
out_start = ts
elif not include:
if out_start is not None and out_start != ts:
if subset:
yield a_interval.subset(out_start, ts)
else:
yield Interval(out_start, ts)
out_start = None
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
return _interval_math_helper(a, b, (lambda a, b: a and not b))
def intersection(a, b):
"""
Compute the intersection between the intervals in 'a' and the
intervals in 'b'; i.e., the ranges that are present in both 'a'
and 'b'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
return _interval_math_helper(a, b, (lambda a, b: a and b))
def optimize(it):
"""

View File

@@ -91,6 +91,20 @@ def serializer_proxy(obj_or_type):
r = SerializerCallProxy(self.__call_queue, attr, self)
return r
# For an interable object, on __iter__(), save the object's
# iterator and return this proxy. On next(), call the object's
# iterator through this proxy.
def __iter__(self):
attr = getattr(self.__object, "__iter__")
self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
return self
def next(self):
return SerializerCallProxy(self.__call_queue,
self.__iter.next, self)()
def __getitem__(self, key):
return self.__getattr__("__getitem__")(key)
def __call__(self, *args, **kwargs):
"""Call this to instantiate the type, if a type was passed
to serializer_proxy. Otherwise, pass the call through."""

View File

@@ -60,7 +60,7 @@ def rate_to_period(hz, cycles = 1):
def parse_time(toparse):
"""
Parse a free-form time string and return a nilmdb timestamp
(integer seconds since epoch). If the string doesn't contain a
(integer microseconds since epoch). If the string doesn't contain a
timestamp, the current local timezone is assumed (e.g. from the TZ
env var).
"""

View File

@@ -118,6 +118,7 @@ setup(name='nilmdb',
'pytz',
'psutil >= 0.3.0',
'requests >= 1.1.0, < 2.0.0',
'progressbar >= 2.2',
],
packages = [ 'nilmdb',
'nilmdb.utils',
@@ -126,11 +127,13 @@ setup(name='nilmdb',
'nilmdb.client',
'nilmdb.cmdline',
'nilmdb.scripts',
'nilmdb.fsck',
],
entry_points = {
'console_scripts': [
'nilmtool = nilmdb.scripts.nilmtool:main',
'nilmdb-server = nilmdb.scripts.nilmdb_server:main',
'nilmdb-fsck = nilmdb.scripts.nilmdb_fsck:main',
],
},
ext_modules = ext_modules,

View File

@@ -242,6 +242,19 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("start must precede end", str(e.exception))
# Invalid times in HTTP request
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": "asdf", "end": 0 })
in_("400 Bad Request", str(e.exception))
in_("invalid start", str(e.exception))
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 0, "end": "asdf" })
in_("400 Bad Request", str(e.exception))
in_("invalid end", str(e.exception))
# Good content type
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "",

View File

@@ -59,8 +59,7 @@ class TestCmdline(object):
def run(self, arg_string, infile=None, outfile=None):
"""Run a cmdline client with the specified argument string,
passing the given input. Returns a tuple with the output and
exit code"""
passing the given input. Save the output and exit code."""
# printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
os.environ['NILMDB_URL'] = "http://localhost:32180/"
class stdio_wrapper:
@@ -160,6 +159,12 @@ class TestCmdline(object):
self.ok("--help")
self.contain("usage:")
# help
self.ok("--version")
ver = self.captured
self.ok("list --version")
eq_(self.captured, ver)
# fail for no args
self.fail("")
@@ -1011,6 +1016,18 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
# optimize
self.ok("insert -s 01-01-2002 -e 01-01-2004 /diff/1 /dev/null")
self.ok("intervals /diff/1")
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
"> Thu, 01 Jan 2004 00:00:00.000000 +0000 ]\n"
"[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("intervals /diff/1 --optimize")
self.ok("intervals /diff/1 -o")
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy -R /diff/1")
self.ok("destroy -R /diff/2")

View File

@@ -234,13 +234,16 @@ class TestInterval:
x = makeset("[--)") & 1234
def do_test(a, b, c, d):
# a & b == c
# a & b == c (using nilmdb.server.interval)
ab = IntervalSet()
for x in b:
for i in (a & x):
ab += i
eq_(ab,c)
# a & b == c (using nilmdb.utils.interval)
eq_(IntervalSet(nilmdb.utils.interval.intersection(a,b)), c)
# a \ b == d
eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)
@@ -310,6 +313,17 @@ class TestInterval:
eq_(nilmdb.utils.interval.set_difference(
a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)
# Fill out test coverage for non-subsets
def diff2(a,b, subset):
return nilmdb.utils.interval._interval_math_helper(
a, b, (lambda a, b: b and not a), subset=subset)
with assert_raises(nilmdb.utils.interval.IntervalError):
list(diff2(a,b,True))
list(diff2(a,b,False))
# Empty second set
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
# Empty second set
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)

View File

@@ -62,6 +62,28 @@ class Base(object):
eq_(self.foo.val, 20)
eq_(self.foo.init_thread, self.foo.test_thread)
class ListLike(object):
def __init__(self):
self.thread = threading.current_thread().name
self.foo = 0
def __iter__(self):
eq_(threading.current_thread().name, self.thread)
self.foo = 0
return self
def __getitem__(self, key):
eq_(threading.current_thread().name, self.thread)
return key
def next(self):
eq_(threading.current_thread().name, self.thread)
if self.foo < 5:
self.foo += 1
return self.foo
else:
raise StopIteration
class TestUnserialized(Base):
def setUp(self):
self.foo = Foo()
@@ -84,3 +106,9 @@ class TestSerializer(Base):
sp(sp(Foo("x"))).t()
sp(sp(Foo)("x")).t()
sp(sp(Foo))("x").t()
def test_iter(self):
sp = nilmdb.utils.serializer_proxy
i = sp(ListLike)()
eq_(list(i), [1,2,3,4,5])
eq_(i[3], 3)