Compare commits


59 Commits

Author SHA1 Message Date
ba55ad82f0 Use a pure-python version of bisect_left, to fix 32-bit issues
The default bisect module includes a fast C implementation, which
requires that array indices fit within the system "long" type.  For
32-bit systems, that's not acceptable, as the table indices for raw
data can exceed 2^32 very quickly.  A pure python version works fine.
2015-01-20 18:31:58 -05:00
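
The workaround, reduced to a sketch (the actual change appears further down in the nilmdb.py diff as NilmDB._bisect_left):

    def bisect_left(a, x, lo, hi):
        # Pure Python: index math uses arbitrary-precision integers,
        # so positions past 2^32 work even where a C long is 32 bits.
        while lo < hi:
            mid = (lo + hi) // 2
            if a[mid] < x:
                lo = mid + 1
            else:
                hi = mid
        return lo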
45c81d2019 Fix test that would fail if reordered, or in a different timezone 2015-01-18 17:50:54 -05:00
78cfda32e3 Handle another exception from some versions of dateutil.parser 2015-01-18 17:50:54 -05:00
3658d3876b Rename deprecated config option
The new version works in CherryPy 3.2.
2015-01-18 17:50:54 -05:00
022b50950f Support using a higher initial nrows in bulkdata, for tests
This gives an easy way to get large values in the database start_pos
and end_pos fields, which is necessary for testing failure modes when
those get too large (e.g. on 32-bit systems).  Adjust tests to make
use of this knob.
2015-01-18 17:49:52 -05:00
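
The knob works by starting a table's row count at a large value instead of zero; bulkdata rounds it up to a file boundary, as the bulkdata.py diff below shows. A worked example of that rounding (values are illustrative):

    rows_per_file = 4096
    initial_nrows = 3 * 2**32          # push start_pos/end_pos past 32 bits
    nrows = ((initial_nrows + (rows_per_file - 1)) //
             rows_per_file) * rows_per_file
    # nrows lands on a file boundary, so the next insert begins a new file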
e5efbadc8e fsck: row indices are too big for slice.indices, so calculate manually
Normally, indexes for an array are expected to fit in a platform's
native long (32 or 64-bit).  In nilmdb, tables aren't real arrays and
we need to handle unbounded indices.
2015-01-18 16:36:56 -05:00
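
For reference, a sketch of the failure this sidesteps on a 32-bit build (the exact exception text varies by Python version):

    s = slice(0, 2**40)
    s.indices(2**40)    # OverflowError: cannot fit 'long' into an
                        # index-sized integer
    # fsck instead derives the start/stop rows with plain integer
    # arithmetic, which Python handles at arbitrary precision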
74f633c9da Distribute was merged back into setuptools, so use setuptools 2015-01-18 16:33:58 -05:00
ab9a327130 Remove upper limit on requests library version 2014-02-18 16:36:34 -05:00
da72fc9777 Explicitly avoid HTTP/1.1 persistent connections (keep-alive)
We do this by creating a new requests.Session object for each request,
sending a "Connection: close" request header, and then explicitly
marking the connection for close after the response is read.

This is to avoid a longstanding race condition with HTTP keepalive
and server timeouts.  Due to data processing, capture, etc., requests
may be separated by an arbitrary delay.  If this delay is shorter
than the server's KeepAliveTimeout, the same connection is used.
If the delay is longer, a new connection is used.  If the delay is
the same, however, the request may be sent on the old connection at
the exact same time that the server closes it.  Typically, the
client sees the connection as closing between the request and the
response, which leads to "httplib.BadStatusLine" errors.

This patch avoids the race condition entirely by not using persistent
connections.

Another solution may be to detect those errors and retry the
connection, resending the request.  However, the race condition could
potentially show up in other places, like a closed connection during
the request body, not after.  Such an error could also be a legitimate
network condition or problem.  This solution should be more reliable,
and the overhead of each new connection will hopefully be minimal for
typical workloads.
2014-02-18 14:36:58 -05:00
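
The resulting pattern, as a condensed sketch (it mirrors the httpclient.py change further down):

    import requests

    def do_req(method, url, **kwargs):
        # One Session per request: no reused sockets, no keepalive race
        session = requests.Session()
        headers = kwargs.pop("headers", None) or {}
        headers["Connection"] = "close"   # ask the server not to persist
        response = session.request(method, url, headers=headers, **kwargs)
        session.close()                   # explicitly drop the connection
        return response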
a01cb4132d Add test for limited interval removal 2014-02-14 15:53:02 -05:00
7c3da2fe44 Limit the max number of intervals we remove in one stream_remove call 2014-02-14 15:52:53 -05:00
f0e06dc436 Allow newer versions of Requests library 2014-02-14 15:13:34 -05:00
ddc0eb4264 Coalesce calls to table.remove during stream_remove; significant speedup for degenerate cases 2014-02-14 15:13:17 -05:00
0a22db3965 Ignore exceptions during __del__ handlers, which may get called during shutdown 2014-02-14 15:07:30 -05:00
8bb8f068de Catch harmless error seen in apache logs during shutdown 2014-02-04 19:50:46 -05:00
416902097d Fix crash in nilmdb-fsck if there are zero intervals, etc. 2014-02-04 19:38:01 -05:00
f5276e9fc8 Test --no-decim 2013-08-16 15:34:35 -04:00
c47f28f93a Fix cache issue in stream_rename
We saw a bug where renamed streams had missing data at the end.  I
think what happened is:

- Write data to /old/path
- Rename to /new/path
- Write data to /new/path
- Cache entry for /old/path gets evicted, file gets truncated

Instead, make sure we evict /old/path right away when renaming.
2013-08-16 15:30:56 -04:00
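
A condensed sketch of the fix; the real change is the one-line getnode.cache_remove() call in the bulkdata.py diff below (the do_rename helper here is illustrative):

    def rename(self, oldpath, newpath):
        # Evict the cached Table for the old path *before* moving it,
        # so a later cache eviction can't truncate the renamed files
        self.getnode.cache_remove(self, oldpath)
        do_rename(oldpath, newpath)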
63b5f99b90 Fix fsck 2013-08-16 15:06:12 -04:00
7d7b89b52f Add --no-decim option to nilmtool list 2013-08-12 13:04:25 -04:00
8d249273c6 Change -V option to -v everywhere 2013-08-06 21:38:00 -04:00
abe431c663 Add verify_ssl option to HTTPClient 2013-08-06 12:39:32 -04:00
ccf1f695af Prevent negative numbers in dbinfo output.
This might occur if things change while we're calculating the sizes.
2013-08-05 12:25:36 -04:00
06f7390c9e Fix disk usage block size 2013-08-05 12:25:10 -04:00
6de77a08f1 Report actual disk size, not apparent size 2013-08-05 12:16:56 -04:00
8db9771c20 Remove leftover fsck test 2013-08-05 12:16:47 -04:00
04f815a24b Reorder nilmtool commands 2013-08-04 19:51:13 -04:00
6868f5f126 fsck: limit max retries so we don't get stuck in a loop forever 2013-08-03 22:34:30 -04:00
ca0943ec19 fsck: add --no-data option to do a quicker fsck
This makes it fast enough to run at startup with -f, if it's expected
that a system will frequently need to be fixed.
2013-08-03 22:31:45 -04:00
68addb4e4a Clarify output when fsck database is locked 2013-08-03 21:58:24 -04:00
68c33b1f14 fsck: add comma separator on big numbers 2013-08-03 21:50:33 -04:00
8dd8741100 Tweak options, dependencies, documentation 2013-08-03 21:42:49 -04:00
8e6341ae5d Verify that data timestamps are monotonic 2013-08-03 21:32:05 -04:00
422b1e2df2 More fsck improvements. Fixed two problems on sharon so far. 2013-08-03 17:50:46 -04:00
0f745b3047 More fsck tools, including fixes 2013-08-03 16:43:20 -04:00
71cd7ed9b7 Add nilmdb-fsck tool to check database consistency 2013-08-03 14:23:14 -04:00
a79d6104d5 Documentation fixups 2013-08-01 16:24:51 -04:00
8e8ec59e30 Support "nilmtool cmd --version" 2013-08-01 15:14:34 -04:00
b89b945a0f Better responses to invalid HTTP times 2013-07-31 13:37:04 -04:00
bd7bdb2eb8 Add --optimize option to nilmtool intervals 2013-07-30 15:31:51 -04:00
840cd2fd13 Remove stray print 2013-07-30 15:21:09 -04:00
bbd59c8b50 Add nilmdb.utils.interval.intersection by generalizing set_difference 2013-07-30 14:48:19 -04:00
405c110fd7 Doc updates 2013-07-29 15:36:43 -04:00
274adcd856 Documentation updates 2013-07-27 19:51:09 -04:00
a1850c9c2c Misc documentation 2013-07-25 16:08:35 -04:00
6cd28b67b1 Support iterator protocol in Serializer 2013-07-24 14:52:26 -04:00
d6d215d53d Improve boolean HTTP parameter handling 2013-07-15 14:38:28 -04:00
e02143ddb2 Remove duplicated test 2013-07-14 15:30:53 -04:00
e275384d03 Fix WSGI docs again 2013-07-11 16:36:32 -04:00
a6a67ec15c Update WSGI docs 2013-07-10 14:16:25 -04:00
fc43107307 Fill out test coverage 2013-07-09 19:06:26 -04:00
90633413bb Add nilmdb.utils.interval.human_string function 2013-07-09 19:01:53 -04:00
c7c3aff0fb Add nilmdb.utils.interval.optimize function 2013-07-09 17:50:21 -04:00
e2347c954e Split more CherryPy stuff into serverutil 2013-07-02 11:44:08 -04:00
222a5c6c53 Move server decorators and other utilities to a separate file
This will help with implementing nilmrun.
2013-07-02 11:32:19 -04:00
1ca2c143e5 Fix typo 2013-06-29 12:39:00 -04:00
b5df575c79 Fix tests 2013-05-09 22:27:10 -04:00
2768a5ad15 Show FQDN rather than hostname. 2013-05-09 13:33:05 -04:00
a105543c38 Show a more helpful message at the root nilmdb path 2013-05-09 13:30:10 -04:00
30 changed files with 1193 additions and 341 deletions


@@ -7,4 +7,4 @@
 exclude_lines =
     pragma: no cover
     if 0:
-omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py
+omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py,nilmdb/fsck


@@ -8,7 +8,8 @@ Prerequisites:
# Base NilmDB dependencies
 sudo apt-get install python-cherrypy3 python-decorator python-simplejson
-sudo apt-get install python-requests python-dateutil python-tz python-psutil
+sudo apt-get install python-requests python-dateutil python-tz
+sudo apt-get install python-progressbar python-psutil

 # Other dependencies (required by some modules)
 sudo apt-get install python-numpy

@@ -26,6 +27,7 @@ Install:
 Usage:

     nilmdb-server --help
+    nilmdb-fsck --help
     nilmtool --help

 See docs/wsgi.md for info on setting up a WSGI application in Apache.


@@ -19,12 +19,12 @@ Then, set up Apache with a configuration like:
 <VirtualHost>
     WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
-    WSGIApplicationGroup nilmdb-appgroup
-    WSGIProcessGroup nilmdb-procgroup
     WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
-    # Access control example:
     <Location /nilmdb>
+        WSGIProcessGroup nilmdb-procgroup
+        WSGIApplicationGroup nilmdb-appgroup
+        # Access control example:
         Order deny,allow
         Deny from all
         Allow from 1.2.3.4


@@ -58,6 +58,11 @@ class Client(object):
         return self.http.get("dbinfo")

     def stream_list(self, path = None, layout = None, extended = False):
+        """Return a sorted list of [path, layout] lists.  If 'path' or
+        'layout' are specified, only return streams that match those
+        exact values.  If 'extended' is True, the returned lists have
+        extended info, e.g.: [path, layout, extent_min, extent_max,
+        total_rows, total_seconds]."""
         params = {}
         if path is not None:
             params["path"] = path
@@ -69,6 +74,7 @@ class Client(object):
         return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])

     def stream_get_metadata(self, path, keys = None):
+        """Get stream metadata"""
         params = { "path": path }
         if keys is not None:
             params["key"] = keys


@@ -9,7 +9,7 @@ import requests
 class HTTPClient(object):
     """Class to manage and perform HTTP requests from the client"""
-    def __init__(self, baseurl = "", post_json = False):
+    def __init__(self, baseurl = "", post_json = False, verify_ssl = True):
         """If baseurl is supplied, all other functions that take
         a URL can be given a relative URL instead."""
         # Verify / clean up URL
@@ -18,9 +18,8 @@ class HTTPClient(object):
             reparsed = urlparse.urlparse("http://" + baseurl).geturl()
         self.baseurl = reparsed.rstrip('/') + '/'

-        # Build Requests session object, enable SSL verification
-        self.session = requests.Session()
-        self.session.verify = True
+        # Note whether we want SSL verification
+        self.verify_ssl = verify_ssl

         # Saved response, so that tests can verify a few things.
         self._last_response = {}
@@ -58,16 +57,34 @@ class HTTPClient(object):
             raise Error(**args)

     def close(self):
-        self.session.close()
+        pass

     def _do_req(self, method, url, query_data, body_data, stream, headers):
         url = urlparse.urljoin(self.baseurl, url)
         try:
-            response = self.session.request(method, url,
-                                            params = query_data,
-                                            data = body_data,
-                                            stream = stream,
-                                            headers = headers)
+            # Create a new session, ensure we send "Connection: close",
+            # and explicitly close connection after the transfer.
+            # This is to avoid HTTP/1.1 persistent connections
+            # (keepalive), because they have fundamental race
+            # conditions when there are delays between requests:
+            # a new request may be sent at the same instant that the
+            # server decides to timeout the connection.
+            session = requests.Session()
+            if headers is None:
+                headers = {}
+            headers["Connection"] = "close"
+            response = session.request(method, url,
                                        params = query_data,
                                        data = body_data,
                                        stream = stream,
                                        headers = headers,
                                        verify = self.verify_ssl)
+            # Close the connection.  If it's a generator (stream =
+            # True), the requests library shouldn't actually close the
+            # HTTP connection until all data has been read from the
+            # response.
+            session.close()
         except requests.RequestException as e:
             raise ServerError(status = "502 Error", url = url,
                               message = str(e.message))


@@ -19,9 +19,8 @@ except ImportError: # pragma: no cover
 # Valid subcommands.  Defined in separate files just to break
 # things up -- they're still called with Cmdline as self.
-subcommands = [ "help", "info", "create", "list", "metadata",
-                "insert", "extract", "remove", "destroy",
-                "intervals", "rename" ]
+subcommands = [ "help", "info", "create", "rename", "list", "intervals",
+                "metadata", "insert", "extract", "remove", "destroy" ]

 # Import the subcommand modules
 subcmd_mods = {}
@@ -29,6 +28,14 @@ for cmd in subcommands:
     subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ])

 class JimArgumentParser(argparse.ArgumentParser):
+    def parse_args(self, args=None, namespace=None):
+        # Look for --version anywhere and change it to just "nilmtool
+        # --version".  This makes "nilmtool cmd --version" work, which
+        # is needed by help2man.
+        if "--version" in (args or sys.argv[1:]):
+            args = [ "--version" ]
+        return argparse.ArgumentParser.parse_args(self, args, namespace)
+
     def error(self, message):
         self.print_usage(sys.stderr)
         self.exit(2, sprintf("error: %s\n", message))
@@ -114,7 +121,7 @@ class Cmdline(object):
         group = self.parser.add_argument_group("General options")
         group.add_argument("-h", "--help", action='help',
                            help='show this help message and exit')
-        group.add_argument("-V", "--version", action="version",
+        group.add_argument("-v", "--version", action="version",
                            version = nilmdb.__version__)

         group = self.parser.add_argument_group("Server")


@@ -1,5 +1,6 @@
 from nilmdb.utils.printf import *
 import nilmdb.utils.time
+from nilmdb.utils.interval import Interval

 import fnmatch
 import argparse
@@ -42,6 +43,8 @@ def setup(self, sub):
     group = cmd.add_argument_group("Misc options")
     group.add_argument("-T", "--timestamp-raw", action="store_true",
                        help="Show raw timestamps when printing times")
+    group.add_argument("-o", "--optimize", action="store_true",
+                       help="Optimize (merge adjacent) intervals")
     return cmd

@@ -58,9 +61,16 @@ def cmd_intervals(self):
         time_string = nilmdb.utils.time.timestamp_to_human

     try:
-        for (start, end) in self.client.stream_intervals(
-                self.args.path, self.args.start, self.args.end, self.args.diff):
-            printf("[ %s -> %s ]\n", time_string(start), time_string(end))
+        intervals = ( Interval(start, end) for (start, end) in
+                      self.client.stream_intervals(self.args.path,
+                                                   self.args.start,
+                                                   self.args.end,
+                                                   self.args.diff) )
+        if self.args.optimize:
+            intervals = nilmdb.utils.interval.optimize(intervals)
+        for i in intervals:
+            printf("[ %s -> %s ]\n", time_string(i.start), time_string(i.end))
     except nilmdb.client.ClientError as e:
         self.die("error listing intervals: %s", str(e))


@@ -45,6 +45,8 @@ def setup(self, sub):
help="Show raw timestamps when printing times") help="Show raw timestamps when printing times")
group.add_argument("-l", "--layout", action="store_true", group.add_argument("-l", "--layout", action="store_true",
help="Show layout type next to path name") help="Show layout type next to path name")
group.add_argument("-n", "--no-decim", action="store_true",
help="Skip paths containing \"~decim-\"")
return cmd return cmd
@@ -71,6 +73,8 @@ def cmd_list(self):
(path, layout, int_min, int_max, rows, time) = stream[:6] (path, layout, int_min, int_max, rows, time) = stream[:6]
if not fnmatch.fnmatch(path, argpath): if not fnmatch.fnmatch(path, argpath):
continue continue
if self.args.no_decim and "~decim-" in path:
continue
if self.args.layout: if self.args.layout:
printf("%s %s\n", path, layout) printf("%s %s\n", path, layout)

nilmdb/fsck/__init__.py Normal file

@@ -0,0 +1,5 @@
"""nilmdb.fsck"""
from __future__ import absolute_import
from nilmdb.fsck.fsck import Fsck

nilmdb/fsck/fsck.py Normal file

@@ -0,0 +1,464 @@
# -*- coding: utf-8 -*-

"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""

import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string

from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import time
import shutil
import cPickle as pickle
import numpy

class FsckError(Exception):
    def __init__(self, msg = "", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)

class FixableFsckError(FsckError):
    def __init__(self, msg = "", *args):
        if args:
            msg = sprintf(msg, *args)
        FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)

class RetryFsck(FsckError):
    pass

def log(format, *args):
    printf(format, *args)

def err(format, *args):
    fprintf(sys.stderr, format, *args)

# Decorator that retries a function if it raises a specific exception
def retry_if_raised(exc, message = None, max_retries = 100):
    def f1(func):
        def f2(*args, **kwargs):
            for n in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc as e:
                    if message:
                        log("%s\n\n", message)
            raise Exception("Max number of retries (%d) exceeded; giving up"
                            % max_retries)
        return f2
    return f1

class Progress(object):
    def __init__(self, maxval):
        if maxval == 0:
            maxval = 1
        self.bar = progressbar.ProgressBar(
            maxval = maxval,
            widgets = [ progressbar.Percentage(), ' ',
                        progressbar.Bar(), ' ',
                        progressbar.ETA() ])
        if self.bar.term_width == 0:
            self.bar.term_width = 75
    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")
    def update(self, val):
        self.bar.update(val)

class Fsck(object):
    def __init__(self, path, fix = False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix

    ### Main checks

    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self, skip_data = False):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            if skip_data:
                log("skipped data check\n")
            else:
                self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:
                self.sql.commit()
                self.sql.close()
        log("ok\n")

    ### Check basic path structure

    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('Database already locked by another process\n'
                                'Make sure all other processes that might be '
                                'using the database are stopped.\n'
                                'Restarting apache will cause it to unlock '
                                'the db until a request is received.')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    ### Check SQL database health

    def check_sql(self):
        log("checking sqlite database\n")
        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
            self.stream_path = {}
            self.stream_layout = {}
            log("  loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]

            log("  loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges "
                                 "ORDER BY start_time")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", r[0])
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))

            log("  loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", r[0])
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError("duplicate metadata key '%s' for stream %d",
                                    r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]

    ### Check streams and basic interval overlap

    def check_streams(self):
        ids = self.stream_path.keys()
        log("checking %s streams\n", "{:,d}".format(len(ids)))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
                if self.stream_path.values().count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap.  Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))

                # check bulkdata
                self.check_bulkdata(sid, path, bulk)

                # Check that we can open bulkdata
                try:
                    tab = None
                    try:
                        tab = nilmdb.server.bulkdata.Table(bulk)
                    except Exception as e:
                        raise FsckError("%s: can't open bulkdata: %s",
                                        path, str(e))
                finally:
                    if tab:
                        tab.close()

    ### Check that bulkdata is good enough to be opened

    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        with open(os.path.join(bulk, "_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        row_per_file = int(fmt["rows_per_file"])
        files_per_dir = int(fmt["files_per_dir"])
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])

        # Every file should have a size that's the multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()

        # Find all directories
        regex = re.compile("^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key = lambda x: int(x, 16), reverse = True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = filter(regex.search, os.listdir(subpath))
            if not files:
                self.fix_empty_subdir(subpath)
                raise RetryFsck

            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file %s and try again",
                                os.path.join(subpath, fn))
        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck

    ### Check interval endpoints

    def check_intervals(self):
        total_ints = sum(len(x) for x in self.stream_interval.values())
        log("checking %s intervals\n", "{:,d}".format(total_ints))
        done = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)
                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_intervals(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_intervals(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            update(i)
            (stime, etime, spos, epos) = intv
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]
                erow = tab[epos-1]
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
                raise RetryFsck
        return len(ints)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)

        (stime, etime, spos, epos) = intv
        # If it's just that the end pos is more than the number of rows
        # in the table, lower end pos and truncate interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past endrows, but it can be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too")

        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk.  To fix that, copy all\n")
        err("data from this stream to a new stream, then remove all data\n")
        err("from and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:
                raise FsckError("failed to remove interval")
        raise RetryFsck

    ### Check data in each interval

    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in self.stream_interval.values())
        log("checking %s rows of data\n", "{:,d}".format(total_rows))
        done = 0
        with Progress(total_rows) as pbar:
            for sid in self.stream_interval:
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)
                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_data(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_data(self, sid, ints, tab, update):
        # Pull out all of the interval's data and verify that it's
        # monotonic.
        maxrows = 100000
        path = self.stream_path[sid]
        layout = self.stream_layout[sid]
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        tab.file_open.cache_remove_all()
        done = 0
        for intv in ints:
            last_ts = None
            (stime, etime, spos, epos) = intv

            # Break interval into maxrows-sized chunks
            next_start = spos
            while next_start < epos:
                start = next_start
                stop = min(start + maxrows, epos)
                count = stop - start
                next_start = stop

                # Get raw data, convert to NumPy array
                try:
                    raw = tab.get_data(start, stop, binary = True)
                    data = numpy.fromstring(raw, dtype)
                except Exception as e:
                    raise FsckError("%s: failed to grab rows %d through %d: %s",
                                    path, start, stop, repr(e))

                # Verify that timestamps are monotonic
                if (numpy.diff(data['timestamp']) <= 0).any():
                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
                                    "%d through %d", path, start, stop)
                first_ts = data['timestamp'][0]
                if last_ts is not None and first_ts <= last_ts:
                    raise FsckError("%s: first interval timestamp %d is not "
                                    "greater than the previous last interval "
                                    "timestamp %d, at row %d",
                                    path, first_ts, last_ts, start)
                last_ts = data['timestamp'][-1]

                # These are probably fixable, by removing the offending
                # intervals.  But I'm not going to bother implementing
                # that yet.

                # Done
                done += count
                update(done)
        return done

nilmdb/scripts/nilmdb_fsck.py Executable file

@@ -0,0 +1,26 @@
#!/usr/bin/python

import nilmdb.fsck
import argparse
import os
import sys

def main():
    """Main entry point for the 'nilmdb-fsck' command line script"""
    parser = argparse.ArgumentParser(
        description = 'Check database consistency',
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        version = nilmdb.__version__)
    parser.add_argument("-f", "--fix", action="store_true",
                        default=False, help = 'Fix errors when possible '
                        '(which may involve removing data)')
    parser.add_argument("-n", "--no-data", action="store_true",
                        default=False, help = 'Skip the slow full-data check')
    parser.add_argument('database', help = 'Database directory')
    args = parser.parse_args()

    nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data)

if __name__ == "__main__":
    main()


@@ -10,10 +10,8 @@ def main():
     parser = argparse.ArgumentParser(
         description = 'Run the NilmDB server',
-        formatter_class = argparse.ArgumentDefaultsHelpFormatter)
-
-    parser.add_argument("-V", "--version", action="version",
-                        version = nilmdb.__version__)
+        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
+        version = nilmdb.__version__)

     group = parser.add_argument_group("Standard options")
     group.add_argument('-a', '--address',


@@ -43,6 +43,12 @@ class BulkData(object):
         # 32768 files per dir should work even on FAT32
         self.files_per_dir = 32768

+        if "initial_nrows" in kwargs:
+            self.initial_nrows = kwargs["initial_nrows"]
+        else:
+            # First row is 0
+            self.initial_nrows = 0
+
         # Make root path
         if not os.path.isdir(self.root):
             os.mkdir(self.root)
@@ -194,6 +200,9 @@ class BulkData(object):
         if oldospath == newospath:
             raise ValueError("old and new paths are the same")

+        # Remove Table object at old path from cache
+        self.getnode.cache_remove(self, oldunicodepath)
+
         # Move the table to a temporary location
         tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
         tmppath = os.path.join(tmpdir, "table")
@@ -251,7 +260,7 @@ class BulkData(object):
         path = self._encode_filename(unicodepath)
         elements = path.lstrip('/').split('/')
         ospath = os.path.join(self.root, *elements)
-        return Table(ospath)
+        return Table(ospath, self.initial_nrows)

 @nilmdb.utils.must_close(wrap_verify = False)
 class Table(object):
@@ -288,9 +297,10 @@ class Table(object):
             pickle.dump(fmt, f, 2)

     # Normal methods
-    def __init__(self, root):
+    def __init__(self, root, initial_nrows):
         """'root' is the full OS path to the directory of this table"""
         self.root = root
+        self.initial_nrows = initial_nrows

         # Load the format
         with open(os.path.join(self.root, "_format"), "rb") as f:
@@ -330,7 +340,8 @@ class Table(object):
         # Find the last directory.  We sort and loop through all of them,
         # starting with the numerically greatest, because the dirs could be
-        # empty if something was deleted.
+        # empty if something was deleted but the directory was unexpectedly
+        # not deleted.
         subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                          key = lambda x: int(x, 16), reverse = True)
@@ -349,8 +360,14 @@ class Table(object):
                 # Convert to row number
                 return self._row_from_offset(subdir, filename, offset)

-        # No files, so no data
-        return 0
+        # No files, so no data.  We typically start at row 0 in this
+        # case, although initial_nrows is specified during some tests
+        # to exercise other parts of the code better.  Since we have
+        # no files yet, round initial_nrows up so it points to a row
+        # that would begin a new file.
+        nrows = ((self.initial_nrows + (self.rows_per_file - 1)) //
+                 self.rows_per_file) * self.rows_per_file
+        return nrows

     def _offset_from_row(self, row):
         """Return a (subdir, filename, offset, count) tuple:


@@ -23,7 +23,6 @@ from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
 import sqlite3
 import os
 import errno
-import bisect

 # Note about performance and transactions:
 #
@@ -83,8 +82,11 @@ _sql_schema_updates = {
 class NilmDB(object):
     verbose = 0

-    def __init__(self, basepath, max_results=None,
-                 max_removals=None, bulkdata_args=None):
+    def __init__(self, basepath,
+                 max_results=None,
+                 max_removals=None,
+                 max_int_removals=None,
+                 bulkdata_args=None):
         """Initialize NilmDB at the given basepath.
         Other arguments are for debugging / testing:
@@ -92,7 +94,10 @@ class NilmDB(object):
            stream_intervals or stream_extract response.

            'max_removals' is the max rows to delete at once
-           in stream_move.
+           in stream_remove.
+
+           'max_int_removals' is the max intervals to delete
+           at once in stream_remove.

            'bulkdata_args' is kwargs for the bulkdata module.
         """
@@ -134,6 +139,9 @@ class NilmDB(object):
         # Remove up to this many rows per call to stream_remove.
         self.max_removals = max_removals or 1048576

+        # Remove up to this many intervals per call to stream_remove.
+        self.max_int_removals = max_int_removals or 4096
+
     def get_basepath(self):
         return self.basepath

@@ -507,6 +515,17 @@ class NilmDB(object):
         # And that's all
         return

+    def _bisect_left(self, a, x, lo, hi):
+        # Like bisect.bisect_left, but doesn't choke on large indices on
+        # 32-bit systems, like bisect's fast C implementation does.
+        while lo < hi:
+            mid = (lo + hi) / 2
+            if a[mid] < x:
+                lo = mid + 1
+            else:
+                hi = mid
+        return lo
+
     def _find_start(self, table, dbinterval):
         """
         Given a DBInterval, find the row in the database that
@@ -517,10 +536,10 @@ class NilmDB(object):
         # Optimization for the common case where an interval wasn't truncated
         if dbinterval.start == dbinterval.db_start:
             return dbinterval.db_startpos
-        return bisect.bisect_left(table,
-                                  dbinterval.start,
-                                  dbinterval.db_startpos,
-                                  dbinterval.db_endpos)
+        return self._bisect_left(table,
+                                 dbinterval.start,
+                                 dbinterval.db_startpos,
+                                 dbinterval.db_endpos)

     def _find_end(self, table, dbinterval):
         """
@@ -536,10 +555,10 @@ class NilmDB(object):
         # want to include the given timestamp in the results.  This is
         # so queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
         # non-overlapping data.
-        return bisect.bisect_left(table,
-                                  dbinterval.end,
-                                  dbinterval.db_startpos,
-                                  dbinterval.db_endpos)
+        return self._bisect_left(table,
+                                 dbinterval.end,
+                                 dbinterval.db_startpos,
+                                 dbinterval.db_endpos)

     def stream_extract(self, path, start = None, end = None,
                        count = False, markup = False, binary = False):
@@ -643,13 +662,22 @@ class NilmDB(object):
         to_remove = Interval(start, end)
         removed = 0
         remaining = self.max_removals
+        int_remaining = self.max_int_removals
         restart = None

         # Can't remove intervals from within the iterator, so we need to
         # remember what's currently in the intersection now.
         all_candidates = list(intervals.intersection(to_remove, orig = True))

+        remove_start = None
+        remove_end = None
+
         for (dbint, orig) in all_candidates:
+            # Stop if we've hit the max number of interval removals
+            if int_remaining <= 0:
+                restart = dbint.start
+                break
+
             # Find row start and end
             row_start = self._find_start(table, dbint)
             row_end = self._find_end(table, dbint)
@@ -670,14 +698,29 @@ class NilmDB(object):
             # Remove interval from the database
             self._remove_interval(stream_id, orig, dbint)

-            # Remove data from the underlying table storage
-            table.remove(row_start, row_end)
+            # Remove data from the underlying table storage,
+            # coalescing adjacent removals to reduce the number of calls
+            # to table.remove.
+            if remove_end == row_start:
+                # Extend our coalesced region
+                remove_end = row_end
+            else:
+                # Perform previous removal, then save this one
+                if remove_end is not None:
+                    table.remove(remove_start, remove_end)
+                remove_start = row_start
+                remove_end = row_end

             # Count how many were removed
             removed += row_end - row_start
             remaining -= row_end - row_start
+            int_remaining -= 1

             if restart is not None:
                 break

+        # Perform any final coalesced removal
+        if remove_end is not None:
+            table.remove(remove_start, remove_end)
+
         return (removed, restart)


@@ -17,126 +17,26 @@ import decorator
 import psutil
 import traceback

+from nilmdb.server.serverutil import (
+    chunked_response,
+    response_type,
+    workaround_cp_bug_1200,
+    exception_to_httperror,
+    CORS_allow,
+    json_to_request_params,
+    json_error_page,
+    cherrypy_start,
+    cherrypy_stop,
+    bool_param,
+    )
+
+# Add CORS_allow tool
+cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
+
 class NilmApp(object):
     def __init__(self, db):
         self.db = db

-# Decorators
-def chunked_response(func):
-    """Decorator to enable chunked responses."""
-    # Set this to False to get better tracebacks from some requests
-    # (/stream/extract, /stream/intervals).
-    func._cp_config = { 'response.stream': True }
-    return func
-
-def response_type(content_type):
-    """Return a decorator-generating function that sets the
-    response type to the specified string."""
-    def wrapper(func, *args, **kwargs):
-        cherrypy.response.headers['Content-Type'] = content_type
-        return func(*args, **kwargs)
-    return decorator.decorator(wrapper)
-
-@decorator.decorator
-def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
-    """Decorator to work around CherryPy bug #1200 in a response
-    generator.
-
-    Even if chunked responses are disabled, LookupError or
-    UnicodeError exceptions may still be swallowed by CherryPy due to
-    bug #1200.  This throws them as generic Exceptions instead so that
-    they make it through.
-    """
-    exc_info = None
-    try:
-        for val in func(*args, **kwargs):
-            yield val
-    except (LookupError, UnicodeError):
-        # Re-raise it, but maintain the original traceback
-        exc_info = sys.exc_info()
-        new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
-        raise new_exc, None, exc_info[2]
-    finally:
-        del exc_info
-
-def exception_to_httperror(*expected):
-    """Return a decorator-generating function that catches expected
-    errors and throws a HTTPError describing it instead.
-
-        @exception_to_httperror(NilmDBError, ValueError)
-        def foo():
-            pass
-    """
-    def wrapper(func, *args, **kwargs):
-        exc_info = None
-        try:
-            return func(*args, **kwargs)
-        except expected:
-            # Re-raise it, but maintain the original traceback
-            exc_info = sys.exc_info()
-            new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
-            raise new_exc, None, exc_info[2]
-        finally:
-            del exc_info
-    # We need to preserve the function's argspecs for CherryPy to
-    # handle argument errors correctly.  Decorator.decorator takes
-    # care of that.
-    return decorator.decorator(wrapper)
-
-# Custom CherryPy tools
-def CORS_allow(methods):
-    """This does several things:
-
-    Handles CORS preflight requests.
-    Adds Allow: header to all requests.
-    Raise 405 if request.method not in method.
-
-    It is similar to cherrypy.tools.allow, with the CORS stuff added.
-    """
-    request = cherrypy.request.headers
-    response = cherrypy.response.headers
-
-    if not isinstance(methods, (tuple, list)): # pragma: no cover
-        methods = [ methods ]
-    methods = [ m.upper() for m in methods if m ]
-    if not methods: # pragma: no cover
-        methods = [ 'GET', 'HEAD' ]
-    elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
-        methods.append('HEAD')
-    response['Allow'] = ', '.join(methods)
-
-    # Allow all origins
-    if 'Origin' in request:
-        response['Access-Control-Allow-Origin'] = request['Origin']
-
-    # If it's a CORS request, send response.
-    request_method = request.get("Access-Control-Request-Method", None)
-    request_headers = request.get("Access-Control-Request-Headers", None)
-    if (cherrypy.request.method == "OPTIONS" and
-        request_method and request_headers):
-        response['Access-Control-Allow-Headers'] = request_headers
-        response['Access-Control-Allow-Methods'] = ', '.join(methods)
-        # Try to stop further processing and return a 200 OK
-        cherrypy.response.status = "200 OK"
-        cherrypy.response.body = ""
-        cherrypy.request.handler = lambda: ""
-        return
-
-    # Reject methods that were not explicitly allowed
-    if cherrypy.request.method not in methods:
-        raise cherrypy.HTTPError(405)
-
-cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
-
-# Helper for json_in tool to process JSON data into normal request
-# parameters.
-def json_to_request_params(body):
-    cherrypy.lib.jsontools.json_processor(body)
-    if not isinstance(cherrypy.request.json, dict):
-        raise cherrypy.HTTPError(415)
-    cherrypy.request.params.update(cherrypy.request.json)
-
 # CherryPy apps
 class Root(NilmApp):
     """Root application for NILM database"""
@@ -147,7 +47,10 @@ class Root(NilmApp):
     # /
     @cherrypy.expose
     def index(self):
-        raise cherrypy.NotFound()
+        cherrypy.response.headers['Content-Type'] = 'text/plain'
+        msg = sprintf("This is NilmDB version %s, running on host %s.\n",
+                      nilmdb.__version__, socket.getfqdn())
+        return msg

     # /favicon.ico
     @cherrypy.expose
@@ -171,8 +74,8 @@ class Root(NilmApp):
         dbsize = nilmdb.utils.du(path)
         return { "path": path,
                  "size": dbsize,
-                 "other": usage.used - dbsize,
-                 "reserved": usage.total - usage.used - usage.free,
+                 "other": max(usage.used - dbsize, 0),
+                 "reserved": max(usage.total - usage.used - usage.free, 0),
                  "free": usage.free }

 class Stream(NilmApp):
@@ -181,10 +84,18 @@ class Stream(NilmApp):
     # Helpers
     def _get_times(self, start_param, end_param):
         (start, end) = (None, None)
-        if start_param is not None:
-            start = string_to_timestamp(start_param)
-        if end_param is not None:
-            end = string_to_timestamp(end_param)
+        try:
+            if start_param is not None:
+                start = string_to_timestamp(start_param)
+        except Exception:
+            raise cherrypy.HTTPError("400 Bad Request", sprintf(
+                "invalid start (%s): must be a numeric timestamp", start_param))
+        try:
+            if end_param is not None:
+                end = string_to_timestamp(end_param)
+        except Exception:
+            raise cherrypy.HTTPError("400 Bad Request", sprintf(
+                "invalid end (%s): must be a numeric timestamp", end_param))
         if start is not None and end is not None:
             if start >= end:
                 raise cherrypy.HTTPError(
@@ -203,10 +114,10 @@ class Stream(NilmApp):
         layout parameter, just list streams that match the given path
         or layout.

-        If extent is not given, returns a list of lists containing
-        the path and layout: [ path, layout ]
+        If extended is missing or zero, returns a list of lists
+        containing the path and layout: [ path, layout ]

-        If extended is provided, returns a list of lists containing
+        If extended is true, returns a list of lists containing
         extended info: [ path, layout, extent_min, extent_max,
         total_rows, total_seconds ].  More data may be added.
         """
@@ -319,6 +230,8 @@ class Stream(NilmApp):
         little-endian and matches the database types (including an
         int64 timestamp).
         """
+        binary = bool_param(binary)
+
         # Important that we always read the input before throwing any
         # errors, to keep lengths happy for persistent connections.
         # Note that CherryPy 3.2.2 has a bug where this fails for GET
@@ -443,6 +356,10 @@ class Stream(NilmApp):
         little-endian and matches the database types (including an
         int64 timestamp).
         """
+        binary = bool_param(binary)
+        markup = bool_param(markup)
+        count = bool_param(count)
+
         (start, end) = self._get_times(start, end)

         # Check path and get layout
@@ -512,7 +429,7 @@ class Server(object):
         cherrypy.config.update({
             'server.socket_host': host,
             'server.socket_port': port,
-            'engine.autoreload_on': False,
+            'engine.autoreload.on': False,
             'server.max_request_body_size': 8*1024*1024,
         })
         if self.embedded:
@@ -570,70 +487,14 @@ class Server(object):
     def json_error_page(self, status, message, traceback, version):
         """Return a custom error page in JSON so the client can parse it"""
-        errordata = { "status" : status,
-                      "message" : message,
-                      "traceback" : traceback }
-        # Don't send a traceback if the error was 400-499 (client's fault)
-        try:
-            code = int(status.split()[0])
-            if not self.force_traceback:
-                if code >= 400 and code <= 499:
-                    errordata["traceback"] = ""
-        except Exception: # pragma: no cover
-            pass
-        # Override the response type, which was previously set to text/html
-        cherrypy.serving.response.headers['Content-Type'] = (
-            "application/json;charset=utf-8" )
-        # Undo the HTML escaping that cherrypy's get_error_page function applies
-        # (cherrypy issue 1135)
-        for k, v in errordata.iteritems():
-            v = v.replace("&lt;","<")
-            v = v.replace("&gt;",">")
-            v = v.replace("&amp;","&")
-            errordata[k] = v
-        return json.dumps(errordata, separators=(',',':'))
+        return json_error_page(status, message, traceback, version,
+                               self.force_traceback)

     def start(self, blocking = False, event = None):
-        if not self.embedded: # pragma: no cover
-            # Handle signals nicely
-            if hasattr(cherrypy.engine, "signal_handler"):
-                cherrypy.engine.signal_handler.subscribe()
-            if hasattr(cherrypy.engine, "console_control_handler"):
-                cherrypy.engine.console_control_handler.subscribe()
-
-            # Cherrypy stupidly calls os._exit(70) when it can't bind the
-            # port.  At least try to print a reasonable error and continue
-            # in this case, rather than just dying silently (as we would
-            # otherwise do in embedded mode)
-            real_exit = os._exit
-            def fake_exit(code): # pragma: no cover
-                if code == os.EX_SOFTWARE:
-                    fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
-                else:
-                    real_exit(code)
-            os._exit = fake_exit
-            cherrypy.engine.start()
-            os._exit = real_exit
-
-        # Signal that the engine has started successfully
-        if event is not None:
-            event.set()
-
-        if blocking:
-            try:
-                cherrypy.engine.wait(cherrypy.engine.states.EXITING,
-                                     interval = 0.1, channel = 'main')
-            except (KeyboardInterrupt, IOError): # pragma: no cover
-                cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
-                cherrypy.engine.exit()
-            except SystemExit: # pragma: no cover
-                cherrypy.engine.log('SystemExit raised: shutting down bus')
-                cherrypy.engine.exit()
-                raise
+        cherrypy_start(blocking, event, self.embedded)

     def stop(self):
-        cherrypy.engine.exit()
+        cherrypy_stop()

 # Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
 # instance since the database can only be opened once.  For this to

nilmdb/server/serverutil.py Normal file

@@ -0,0 +1,214 @@
"""Miscellaneous decorators and other helpers for running a CherryPy
server"""
import cherrypy
import sys
import os
import decorator
import simplejson as json
# Helper to parse parameters into booleans
def bool_param(s):
"""Return a bool indicating whether parameter 's' was True or False,
supporting a few different types for 's'."""
try:
ss = s.lower()
if ss in [ "0", "false", "f", "no", "n" ]:
return False
if ss in [ "1", "true", "t", "yes", "y" ]:
return True
except Exception:
return bool(s)
raise cherrypy.HTTPError("400 Bad Request",
"can't parse parameter: " + ss)
# Decorators
def chunked_response(func):
"""Decorator to enable chunked responses."""
# Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': True }
return func
def response_type(content_type):
"""Return a decorator-generating function that sets the
response type to the specified string."""
def wrapper(func, *args, **kwargs):
cherrypy.response.headers['Content-Type'] = content_type
return func(*args, **kwargs)
return decorator.decorator(wrapper)
@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
"""Decorator to work around CherryPy bug #1200 in a response
generator.
Even if chunked responses are disabled, LookupError or
UnicodeError exceptions may still be swallowed by CherryPy due to
bug #1200. This throws them as generic Exceptions instead so that
they make it through.
"""
exc_info = None
try:
for val in func(*args, **kwargs):
yield val
except (LookupError, UnicodeError):
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
def exception_to_httperror(*expected):
"""Return a decorator-generating function that catches expected
errors and throws a HTTPError describing it instead.
@exception_to_httperror(NilmDBError, ValueError)
def foo():
pass
"""
def wrapper(func, *args, **kwargs):
exc_info = None
try:
return func(*args, **kwargs)
except expected:
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
# We need to preserve the function's argspecs for CherryPy to
# handle argument errors correctly. Decorator.decorator takes
# care of that.
return decorator.decorator(wrapper)

# Custom CherryPy tools

def CORS_allow(methods):
    """This does several things:

    Handles CORS preflight requests.
    Adds an Allow: header to all responses.
    Raises 405 if the request method is not in 'methods'.

    It is similar to cherrypy.tools.allow, with the CORS stuff added.

    Add this to CherryPy with:
    cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
    """
    request = cherrypy.request.headers
    response = cherrypy.response.headers

    if not isinstance(methods, (tuple, list)): # pragma: no cover
        methods = [ methods ]
    methods = [ m.upper() for m in methods if m ]
    if not methods: # pragma: no cover
        methods = [ 'GET', 'HEAD' ]
    elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
        methods.append('HEAD')
    response['Allow'] = ', '.join(methods)

    # Allow all origins
    if 'Origin' in request:
        response['Access-Control-Allow-Origin'] = request['Origin']

    # If it's a CORS request, send response.
    request_method = request.get("Access-Control-Request-Method", None)
    request_headers = request.get("Access-Control-Request-Headers", None)
    if (cherrypy.request.method == "OPTIONS" and
        request_method and request_headers):
        response['Access-Control-Allow-Headers'] = request_headers
        response['Access-Control-Allow-Methods'] = ', '.join(methods)
        # Try to stop further processing and return a 200 OK
        cherrypy.response.status = "200 OK"
        cherrypy.response.body = ""
        cherrypy.request.handler = lambda: ""
        return

    # Reject methods that were not explicitly allowed
    if cherrypy.request.method not in methods:
        raise cherrypy.HTTPError(405)
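
# As an illustration (a hypothetical exchange, not taken from the code or
# tests in this comparison), a preflight handled by this tool for a GET
# handler would look roughly like:
#
#   OPTIONS /stream/intervals HTTP/1.1
#   Origin: http://example.com
#   Access-Control-Request-Method: GET
#   Access-Control-Request-Headers: X-Requested-With
#
#   HTTP/1.1 200 OK
#   Allow: GET, HEAD
#   Access-Control-Allow-Origin: http://example.com
#   Access-Control-Allow-Headers: X-Requested-With
#   Access-Control-Allow-Methods: GET, HEAD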

# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
    cherrypy.lib.jsontools.json_processor(body)
    if not isinstance(cherrypy.request.json, dict):
        raise cherrypy.HTTPError(415)
    cherrypy.request.params.update(cherrypy.request.json)

# Used as an "error_page.default" handler
def json_error_page(status, message, traceback, version,
                    force_traceback = False):
    """Return a custom error page in JSON so the client can parse it"""
    errordata = { "status" : status,
                  "message" : message,
                  "traceback" : traceback }
    # Don't send a traceback if the error was 400-499 (client's fault)
    try:
        code = int(status.split()[0])
        if not force_traceback:
            if code >= 400 and code <= 499:
                errordata["traceback"] = ""
    except Exception: # pragma: no cover
        pass
    # Override the response type, which was previously set to text/html
    cherrypy.serving.response.headers['Content-Type'] = (
        "application/json;charset=utf-8" )
    # Undo the HTML escaping that cherrypy's get_error_page function applies
    # (cherrypy issue 1135)
    for k, v in errordata.iteritems():
        v = v.replace("&lt;","<")
        v = v.replace("&gt;",">")
        v = v.replace("&amp;","&")
        errordata[k] = v
    return json.dumps(errordata, separators=(',',':'))
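
# For example (hypothetical values), a rejected parameter would come back
# to the client as a compact JSON body with the traceback suppressed:
#
#   {"status":"400 Bad Request","message":"can't parse parameter: x","traceback":""}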

# Start/stop CherryPy standalone server

def cherrypy_start(blocking = False, event = None, embedded = False):
    """Start the CherryPy server, handling errors and signals
    somewhat gracefully."""

    if not embedded: # pragma: no cover
        # Handle signals nicely
        if hasattr(cherrypy.engine, "signal_handler"):
            cherrypy.engine.signal_handler.subscribe()
        if hasattr(cherrypy.engine, "console_control_handler"):
            cherrypy.engine.console_control_handler.subscribe()

    # Cherrypy stupidly calls os._exit(70) when it can't bind the
    # port.  At least try to print a reasonable error and continue
    # in this case, rather than just dying silently (as we would
    # otherwise do in embedded mode)
    real_exit = os._exit
    def fake_exit(code): # pragma: no cover
        if code == os.EX_SOFTWARE:
            fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
        else:
            real_exit(code)
    os._exit = fake_exit
    cherrypy.engine.start()
    os._exit = real_exit

    # Signal that the engine has started successfully
    if event is not None:
        event.set()

    if blocking:
        try:
            cherrypy.engine.wait(cherrypy.engine.states.EXITING,
                                 interval = 0.1, channel = 'main')
        except (KeyboardInterrupt, IOError): # pragma: no cover
            cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
            cherrypy.engine.exit()
        except SystemExit: # pragma: no cover
            cherrypy.engine.log('SystemExit raised: shutting down bus')
            cherrypy.engine.exit()
            raise

# Stop CherryPy server
def cherrypy_stop():
    cherrypy.engine.exit()

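A minimal sketch of driving these start/stop helpers from another thread
(the threading usage here is illustrative, not part of the module above):

    import threading

    event = threading.Event()
    thread = threading.Thread(target = cherrypy_start,
                              kwargs = { "blocking" : True,
                                         "event" : event,
                                         "embedded" : True })
    thread.start()
    event.wait()        # returns once cherrypy_start() has set the event
    # ... serve requests; later:
    cherrypy_stop()     # engine.exit() unblocks the wait in cherrypy_start
    thread.join()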

@@ -21,7 +21,8 @@ def du(path):
     errors that might occur if we encounter broken symlinks or
     files in the process of being removed."""
     try:
-        size = os.path.getsize(path)
+        st = os.stat(path)
+        size = st.st_blocks * 512
         if os.path.isdir(path):
             for thisfile in os.listdir(path):
                 filepath = os.path.join(path, thisfile)

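The change above matters because os.path.getsize() reports a file's
apparent length, while st_blocks reflects what it actually occupies on
disk.  A standalone sketch of the same approach (assuming POSIX
semantics, where st_blocks counts 512-byte units):

    import os

    def du(path):
        """Return the on-disk size of 'path' in bytes, recursively."""
        try:
            size = os.stat(path).st_blocks * 512
            if os.path.isdir(path):
                for name in os.listdir(path):
                    size += du(os.path.join(path, name))
            return size
        except OSError:
            # Broken symlinks, files removed mid-walk, etc.
            return 0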

@@ -1,5 +1,6 @@
 """Interval.  Like nilmdb.server.interval, but re-implemented here
-in plain Python so clients have easier access to it.
+in plain Python so clients have easier access to it, and with a few
+helper functions.
 
 Intervals are half-open, ie. they include data points with timestamps
 [start, end)
@@ -34,6 +35,10 @@ class Interval:
         return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
                 " -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
 
+    def human_string(self):
+        return ("[ " + nilmdb.utils.time.timestamp_to_human(self.start) +
+                " -> " + nilmdb.utils.time.timestamp_to_human(self.end) + " ]")
+
     def __cmp__(self, other):
         """Compare two intervals.  If non-equal, order by start then end"""
         return cmp(self.start, other.start) or cmp(self.end, other.end)
@@ -53,18 +58,11 @@
         raise IntervalError("not a subset")
     return Interval(start, end)
 
-def set_difference(a, b):
-    """
-    Compute the difference (a \\ b) between the intervals in 'a' and
-    the intervals in 'b'; i.e., the ranges that are present in 'self'
-    but not 'other'.
-
-    'a' and 'b' must both be iterables.
-
-    Returns a generator that yields each interval in turn.
-    Output intervals are built as subsets of the intervals in the
-    first argument (a).
-    """
+def _interval_math_helper(a, b, op, subset = True):
+    """Helper for set_difference, intersection functions,
+    to compute interval subsets based on a math operator on ranges
+    present in A and B.  Subsets are computed from A, or new intervals
+    are generated if subset = False."""
     # Iterate through all starts and ends in sorted order.  Add a
     # tag to the iterator so that we can figure out which one they
     # were, after sorting.
@@ -79,28 +77,71 @@ def set_difference(a, b):
     # At each point, evaluate which type of end it is, to determine
     # how to build up the output intervals.
     a_interval = None
-    b_interval = None
+    in_a = False
+    in_b = False
     out_start = None
     for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
         if k == 0:
-            # start a interval
             a_interval = i
-            if b_interval is None:
-                out_start = ts
+            in_a = True
         elif k == 1:
-            # start b interval
-            b_interval = i
-            if out_start is not None and out_start != ts:
-                yield a_interval.subset(out_start, ts)
-            out_start = None
+            in_b = True
        elif k == 2:
-            # end a interval
-            if out_start is not None and out_start != ts:
-                yield a_interval.subset(out_start, ts)
-            out_start = None
-            a_interval = None
+            in_a = False
         elif k == 3:
-            # end b interval
-            b_interval = None
-            if a_interval:
-                out_start = ts
+            in_b = False
+        include = op(in_a, in_b)
+        if include and out_start is None:
+            out_start = ts
+        elif not include:
+            if out_start is not None and out_start != ts:
+                if subset:
+                    yield a_interval.subset(out_start, ts)
+                else:
+                    yield Interval(out_start, ts)
+            out_start = None
+
+def set_difference(a, b):
+    """
+    Compute the difference (a \\ b) between the intervals in 'a' and
+    the intervals in 'b'; i.e., the ranges that are present in 'self'
+    but not 'other'.
+
+    'a' and 'b' must both be iterables.
+
+    Returns a generator that yields each interval in turn.
+    Output intervals are built as subsets of the intervals in the
+    first argument (a).
+    """
+    return _interval_math_helper(a, b, (lambda a, b: a and not b))
+
+def intersection(a, b):
+    """
+    Compute the intersection between the intervals in 'a' and the
+    intervals in 'b'; i.e., the ranges that are present in both 'a'
+    and 'b'.
+
+    'a' and 'b' must both be iterables.
+
+    Returns a generator that yields each interval in turn.
+    Output intervals are built as subsets of the intervals in the
+    first argument (a).
+    """
+    return _interval_math_helper(a, b, (lambda a, b: a and b))
+
+def optimize(it):
+    """
+    Given an iterable 'it' with intervals, optimize them by joining
+    together intervals that are adjacent in time, and return a generator
+    that yields the new intervals.
+    """
+    saved_int = None
+    for interval in it:
+        if saved_int is not None:
+            if saved_int.end == interval.start:
+                interval.start = saved_int.start
+            else:
+                yield saved_int
+        saved_int = interval
+    if saved_int is not None:
+        yield saved_int

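The helper reduces both set operations to a boolean over "currently
inside A" and "currently inside B", evaluated at every interval
boundary.  A usage sketch on half-open intervals (values illustrative;
the optimize() result matches the test added later in this comparison):

    a = [ Interval(0, 10) ]
    b = [ Interval(3, 5) ]
    list(set_difference(a, b))   # [ Interval(0, 3), Interval(5, 10) ]
    list(intersection(a, b))     # [ Interval(3, 5) ]
    list(optimize([ Interval(1, 2), Interval(2, 3), Interval(4, 5) ]))
                                 # [ Interval(1, 3), Interval(4, 5) ]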

@@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
         @wrap_class_method
         def __del__(orig, self, *args, **kwargs):
-            if "_must_close" in self.__dict__:
-                fprintf(errorfile, "error: %s.close() wasn't called!\n",
-                        self.__class__.__name__)
-            return orig(self, *args, **kwargs)
+            try:
+                if "_must_close" in self.__dict__:
+                    fprintf(errorfile, "error: %s.close() wasn't called!\n",
+                            self.__class__.__name__)
+                return orig(self, *args, **kwargs)
+            except: # pragma: no cover
+                pass
 
         @wrap_class_method
         def close(orig, self, *args, **kwargs):

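The new try/except exists because __del__ may run during interpreter
shutdown, when the globals it touches (fprintf, errorfile) can already
be torn down; raising there only produces noise.  The same defensive
pattern, standalone:

    import sys

    class Guarded(object):
        def __del__(self):
            try:
                # cleanup that may touch already-dismantled globals
                sys.stderr.write("close() was never called\n")
            except Exception:
                pass   # never raise from __del__ at shutdown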

@@ -91,6 +91,20 @@ def serializer_proxy(obj_or_type):
             r = SerializerCallProxy(self.__call_queue, attr, self)
             return r
 
+        # For an interable object, on __iter__(), save the object's
+        # iterator and return this proxy.  On next(), call the object's
+        # iterator through this proxy.
+        def __iter__(self):
+            attr = getattr(self.__object, "__iter__")
+            self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
+            return self
+        def next(self):
+            return SerializerCallProxy(self.__call_queue,
+                                       self.__iter.next, self)()
+
+        def __getitem__(self, key):
+            return self.__getattr__("__getitem__")(key)
+
         def __call__(self, *args, **kwargs):
             """Call this to instantiate the type, if a type was passed
             to serializer_proxy.  Otherwise, pass the call through."""
@@ -103,7 +117,10 @@ def serializer_proxy(obj_or_type):
             return ret
 
         def __del__(self):
-            self.__call_queue.put((None, None, None, None))
-            self.__thread.join()
+            try:
+                self.__call_queue.put((None, None, None, None))
+                self.__thread.join()
+            except: # pragma: no cover
+                pass
 
     return SerializerObjectProxy(obj_or_type)

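With __iter__(), next(), and __getitem__() forwarded, a proxied object
can be consumed like the original while every call still funnels through
the serializing thread.  A hypothetical usage (ListLike stands in for
any iterable class; one appears in the test changes near the end of this
comparison):

    import nilmdb.utils

    proxy = nilmdb.utils.serializer_proxy(ListLike)()
    for item in proxy:       # __iter__()/next() run on the proxy thread
        print item
    print proxy[3]           # __getitem__ is forwarded the same way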

@@ -60,7 +60,7 @@ def rate_to_period(hz, cycles = 1):
 def parse_time(toparse):
     """
     Parse a free-form time string and return a nilmdb timestamp
-    (integer seconds since epoch).  If the string doesn't contain a
+    (integer microseconds since epoch).  If the string doesn't contain a
     timestamp, the current local timezone is assumed (e.g. from the TZ
     env var).
     """
@@ -87,7 +87,7 @@ def parse_time(toparse):
     try:
         return unix_to_timestamp(datetime_tz.datetime_tz.
                                  smartparse(toparse).totimestamp())
-    except (ValueError, OverflowError):
+    except (ValueError, OverflowError, TypeError):
         pass
 
     # If it's parseable as a float, treat it as a Unix or NILM

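The docstring fix is worth noting: nilmdb timestamps are integer
microseconds since the epoch, not seconds.  A sketch of the conversion
(mirroring what unix_to_timestamp presumably does; the helper name here
is made up):

    def unix_to_us(unix_seconds):
        # float Unix seconds -> integer microsecond timestamp
        return int(round(unix_seconds * 1e6))

    unix_to_us(1332561600.0)   # 1332561600000000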

@@ -1,7 +1,14 @@
+import sys
+
+if sys.version_info[0] >= 3: # pragma: no cover (future Python3 compat)
+    text_type = str
+else:
+    text_type = unicode
+
 def encode(u):
     """Try to encode something from Unicode to a string using the
     default encoding.  If it fails, try encoding as UTF-8."""
-    if not isinstance(u, unicode):
+    if not isinstance(u, text_type):
         return u
     try:
         return u.encode()
@@ -11,7 +18,7 @@ def encode(u):
 def decode(s):
     """Try to decode someting from string to Unicode using the
     default encoding.  If it fails, try decoding as UTF-8."""
-    if isinstance(s, unicode):
+    if isinstance(s, text_type):
         return s
     try:
         return s.decode()


@@ -6,15 +6,6 @@
 # Then just package it up:
 #   python setup.py sdist
 
-# This is supposed to be using Distribute:
-#
-#   distutils provides a "setup" method.
-#   setuptools is a set of monkeypatches on top of that.
-#   distribute is a particular version/implementation of setuptools.
-#
-# So we don't really know if this is using the old setuptools or the
-# Distribute-provided version of setuptools.
-
 import traceback
 import sys
 import os
@@ -109,7 +100,7 @@ setup(name='nilmdb',
                       'coverage',
                       'numpy',
                   ],
-      setup_requires = [ 'distribute',
+      setup_requires = [ 'setuptools',
                        ],
      install_requires = [ 'decorator',
                           'cherrypy >= 3.2',
@@ -117,7 +108,8 @@ setup(name='nilmdb',
                           'python-dateutil',
                           'pytz',
                           'psutil >= 0.3.0',
-                          'requests >= 1.1.0, < 2.0.0',
+                          'requests >= 1.1.0',
+                          'progressbar >= 2.2',
                        ],
      packages = [ 'nilmdb',
                   'nilmdb.utils',
@@ -126,11 +118,13 @@ setup(name='nilmdb',
                   'nilmdb.client',
                   'nilmdb.cmdline',
                   'nilmdb.scripts',
+                  'nilmdb.fsck',
                 ],
      entry_points = {
          'console_scripts': [
              'nilmtool = nilmdb.scripts.nilmtool:main',
              'nilmdb-server = nilmdb.scripts.nilmdb_server:main',
+             'nilmdb-fsck = nilmdb.scripts.nilmdb_fsck:main',
          ],
      },
      ext_modules = ext_modules,

tests/data/timestamped (new file, 8 lines)

@@ -0,0 +1,8 @@
+-10000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+-100000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+-100000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+-1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+1 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
+1000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03


@@ -242,6 +242,19 @@ class TestClient(object):
         in_("400 Bad Request", str(e.exception))
         in_("start must precede end", str(e.exception))
 
+        # Invalid times in HTTP request
+        with assert_raises(ClientError) as e:
+            client.http.put("stream/insert", "", { "path": "/newton/prep",
+                                                   "start": "asdf", "end": 0 })
+        in_("400 Bad Request", str(e.exception))
+        in_("invalid start", str(e.exception))
+
+        with assert_raises(ClientError) as e:
+            client.http.put("stream/insert", "", { "path": "/newton/prep",
+                                                   "start": 0, "end": "asdf" })
+        in_("400 Bad Request", str(e.exception))
+        in_("invalid end", str(e.exception))
+
         # Good content type
         with assert_raises(ClientError) as e:
             client.http.put("stream/insert", "",
@@ -354,10 +367,6 @@
         with assert_raises(ServerError) as e:
             client.http.get_gen("http://nosuchurl.example.com./").next()
 
-        # Trigger a curl error in generator
-        with assert_raises(ServerError) as e:
-            client.http.get_gen("http://nosuchurl.example.com./").next()
-
         # Check 404 for missing streams
         for function in [ client.stream_intervals, client.stream_extract ]:
             with assert_raises(ClientError) as e:
@@ -396,27 +405,38 @@
                                  headers())
 
         # Extract
-        x = http.get("stream/extract",
-                     { "path": "/newton/prep",
-                       "start": "123",
-                       "end": "124" })
+        x = http.get("stream/extract", { "path": "/newton/prep",
+                                         "start": "123", "end": "124" })
         if "transfer-encoding: chunked" not in headers():
             warnings.warn("Non-chunked HTTP response for /stream/extract")
         if "content-type: text/plain;charset=utf-8" not in headers():
             raise AssertionError("/stream/extract is not text/plain:\n" +
                                  headers())
 
-        x = http.get("stream/extract",
-                     { "path": "/newton/prep",
-                       "start": "123",
-                       "end": "124",
-                       "binary": "1" })
+        x = http.get("stream/extract", { "path": "/newton/prep",
+                                         "start": "123", "end": "124",
+                                         "binary": "1" })
         if "transfer-encoding: chunked" not in headers():
             warnings.warn("Non-chunked HTTP response for /stream/extract")
         if "content-type: application/octet-stream" not in headers():
             raise AssertionError("/stream/extract is not binary:\n" +
                                  headers())
 
+        # Make sure a binary of "0" is really off
+        x = http.get("stream/extract", { "path": "/newton/prep",
+                                         "start": "123", "end": "124",
+                                         "binary": "0" })
+        if "content-type: application/octet-stream" in headers():
+            raise AssertionError("/stream/extract is not text:\n" +
+                                 headers())
+
+        # Invalid parameters
+        with assert_raises(ClientError) as e:
+            x = http.get("stream/extract", { "path": "/newton/prep",
+                                             "start": "123", "end": "124",
+                                             "binary": "asdfasfd" })
+        in_("can't parse parameter", str(e.exception))
+
         client.close()
 
     def test_client_08_unicode(self):
@@ -670,40 +690,15 @@
         client.close()
 
     def test_client_12_persistent(self):
-        # Check that connections are persistent when they should be.
-        # This is pretty hard to test; we have to poke deep into
-        # the Requests library.
+        # Check that connections are NOT persistent.  Rather than trying
+        # to verify this at the TCP level, just make sure that the response
+        # contained a "Connection: close" header.
         with nilmdb.client.Client(url = testurl) as c:
-            def connections():
-                try:
-                    poolmanager = c.http._last_response.connection.poolmanager
-                    pool = poolmanager.pools[('http','localhost',32180)]
-                    return (pool.num_connections, pool.num_requests)
-                except Exception:
-                    raise SkipTest("can't get connection info")
-
-            # First request makes a connection
             c.stream_create("/persist/test", "uint16_1")
-            eq_(connections(), (1, 1))
-
-            # Non-generator
-            c.stream_list("/persist/test")
-            eq_(connections(), (1, 2))
-            c.stream_list("/persist/test")
-            eq_(connections(), (1, 3))
-
-            # Generators
-            for x in c.stream_intervals("/persist/test"):
-                pass
-            eq_(connections(), (1, 4))
-            for x in c.stream_intervals("/persist/test"):
-                pass
-            eq_(connections(), (1, 5))
-
-            # Clean up
-            c.stream_remove("/persist/test")
+            eq_(c.http._last_response.headers["Connection"], "close")
             c.stream_destroy("/persist/test")
-            eq_(connections(), (1, 7))
+            eq_(c.http._last_response.headers["Connection"], "close")
 
     def test_client_13_timestamp_rounding(self):
         # Test potentially bad timestamps (due to floating point

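The rewritten test checks only the response header, since that is the
observable effect of avoiding keep-alive.  A sketch of the general
technique with the requests library (not the client's exact code):

    import requests

    def get_once(url):
        """One request per connection: fresh Session, explicit close."""
        session = requests.Session()
        try:
            response = session.get(url, headers = { "Connection": "close" })
            return response.text
        finally:
            session.close()   # drop the pooled connection immediately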

@@ -21,13 +21,17 @@ from testutil.helpers import *
 testdb = "tests/cmdline-testdb"
 
-def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
+def server_start(max_results = None,
+                 max_removals = None,
+                 max_int_removals = None,
+                 bulkdata_args = {}):
     global test_server, test_db
     # Start web app on a custom port
     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
         testdb,
         max_results = max_results,
         max_removals = max_removals,
+        max_int_removals = max_int_removals,
         bulkdata_args = bulkdata_args)
     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                        port = 32180, stoppable = False,
@@ -59,8 +63,7 @@ class TestCmdline(object):
     def run(self, arg_string, infile=None, outfile=None):
         """Run a cmdline client with the specified argument string,
-        passing the given input.  Returns a tuple with the output and
-        exit code"""
+        passing the given input.  Save the output and exit code."""
         # printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
         os.environ['NILMDB_URL'] = "http://localhost:32180/"
         class stdio_wrapper:
@@ -160,6 +163,12 @@
         self.ok("--help")
         self.contain("usage:")
 
+        # help
+        self.ok("--version")
+        ver = self.captured
+        self.ok("list --version")
+        eq_(self.captured, ver)
+
         # fail for no args
         self.fail("")
@@ -285,6 +294,7 @@
         self.ok("create /newton/zzz/rawnotch uint16_9")
         self.ok("create /newton/prep float32_8")
         self.ok("create /newton/raw uint16_6")
+        self.ok("create /newton/raw~decim-1234 uint16_6")
 
         # Create a stream that already exists
         self.fail("create /newton/raw uint16_6")
@@ -300,13 +310,23 @@
         self.fail("create /newton/zzz float32_8")
         self.contain("subdirs of this path already exist")
 
-        # Verify we got those 3 streams and they're returned in
+        # Verify we got those 4 streams and they're returned in
         # alphabetical order.
         self.ok("list -l")
         self.match("/newton/prep float32_8\n"
                    "/newton/raw uint16_6\n"
+                   "/newton/raw~decim-1234 uint16_6\n"
                    "/newton/zzz/rawnotch uint16_9\n")
 
+        # No decimated streams if -n specified
+        self.ok("list -n -l")
+        self.match("/newton/prep float32_8\n"
+                   "/newton/raw uint16_6\n"
+                   "/newton/zzz/rawnotch uint16_9\n")
+
+        # Delete that decimated stream
+        self.ok("destroy /newton/raw~decim-1234")
+
         # Match just one type or one path.  Also check
         # that --path is optional
         self.ok("list --layout /newton/raw")
@@ -475,6 +495,13 @@
         # bad start time
         self.fail("insert -t -r 120 --start 'whatever' /newton/prep /dev/null")
 
+        # Test negative times
+        self.ok("insert --start @-10000000000 --end @1000000001 /newton/prep"
+                " tests/data/timestamped")
+        self.ok("extract -c /newton/prep --start min --end @1000000001")
+        self.match("8\n")
+        self.ok("remove /newton/prep --start min --end @1000000001")
+
     def test_07_detail_extended(self):
         # Just count the number of lines, it's probably fine
         self.ok("list --detail")
@@ -807,9 +834,12 @@
     def test_13_files(self):
         # Test BulkData's ability to split into multiple files,
         # by forcing the file size to be really small.
+        # Also increase the initial nrows, so that start/end positions
+        # in the database are very large (> 32 bit)
         server_stop()
         server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
-                                       "files_per_dir" : 3 })
+                                       "files_per_dir" : 3,
+                                       "initial_nrows" : 2**40 })
 
         # Fill data
         self.ok("create /newton/prep float32_8")
@@ -857,14 +887,28 @@
         self.ok("destroy -R /newton/prep") # destroy again
 
     def test_14_remove_files(self):
-        # Test BulkData's ability to remove when data is split into
-        # multiple files.  Should be a fairly comprehensive test of
-        # remove functionality.
-
-        # Also limit max_removals, to cover more functionality.
+        # Limit max_removals, to cover more functionality.
         server_stop()
         server_start(max_removals = 4321,
                      bulkdata_args = { "file_size" : 920, # 23 rows per file
-                                       "files_per_dir" : 3 })
+                                       "files_per_dir" : 3,
+                                       "initial_nrows" : 2**40 })
+        self.do_remove_files()
+        self.ok("destroy -R /newton/prep") # destroy again
+
+    def test_14b_remove_files_maxint(self):
+        # Limit max_int_removals, to cover more functionality.
+        server_stop()
+        server_start(max_int_removals = 1,
+                     bulkdata_args = { "file_size" : 920, # 23 rows per file
+                                       "files_per_dir" : 3,
+                                       "initial_nrows" : 2**40 })
+        self.do_remove_files()
+
+    def do_remove_files(self):
+        # Test BulkData's ability to remove when data is split into
+        # multiple files.  Should be a fairly comprehensive test of
+        # remove functionality.
 
         # Insert data.  Just for fun, insert out of order
         self.ok("create /newton/prep float32_8")
@@ -1004,6 +1048,18 @@
         self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
                    "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
 
+        # optimize
+        self.ok("insert -s 01-01-2002 -e 01-01-2004 /diff/1 /dev/null")
+        self.ok("intervals /diff/1")
+        self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
+                   "> Thu, 01 Jan 2004 00:00:00.000000 +0000 ]\n"
+                   "[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
+                   "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
+        self.ok("intervals /diff/1 --optimize")
+        self.ok("intervals /diff/1 -o")
+        self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
+                   "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
+
         self.ok("destroy -R /diff/1")
         self.ok("destroy -R /diff/2")

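For scale: initial_nrows = 2**40 pushes the database start_pos/end_pos
values past anything a 32-bit long can hold, which is exactly the
failure mode these tests target.  A quick check (the 40-byte row size
for float32_8 is an assumption, not stated in the diff):

    assert 2**40 > 2**32               # row indices overflow 32 bits
    assert 2**40 * 40 > 2**31 - 1      # byte offsets overflow a signed long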

@@ -59,6 +59,14 @@ class TestInterval:
             self.test_interval_intersect()
         Interval = NilmdbInterval
 
+        # Other helpers in nilmdb.utils.interval
+        i = [ UtilsInterval(1,2), UtilsInterval(2,3), UtilsInterval(4,5) ]
+        eq_(list(nilmdb.utils.interval.optimize(i)),
+            [ UtilsInterval(1,3), UtilsInterval(4,5) ])
+        eq_(UtilsInterval(1234567890123456, 1234567890654321).human_string(),
+            "[ Fri, 13 Feb 2009 18:31:30.123456 -0500 -> " +
+            "Fri, 13 Feb 2009 18:31:30.654321 -0500 ]")
+
     def test_interval(self):
         # Test Interval class
         os.environ['TZ'] = "America/New_York"
@@ -226,13 +234,16 @@
             x = makeset("[--)") & 1234
 
         def do_test(a, b, c, d):
-            # a & b == c
+            # a & b == c  (using nilmdb.server.interval)
             ab = IntervalSet()
             for x in b:
                 for i in (a & x):
                     ab += i
             eq_(ab,c)
 
+            # a & b == c  (using nilmdb.utils.interval)
+            eq_(IntervalSet(nilmdb.utils.interval.intersection(a,b)), c)
+
             # a \ b == d
             eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)
@@ -302,6 +313,17 @@
         eq_(nilmdb.utils.interval.set_difference(
             a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)
 
+        # Fill out test coverage for non-subsets
+        def diff2(a,b, subset):
+            return nilmdb.utils.interval._interval_math_helper(
+                a, b, (lambda a, b: b and not a), subset=subset)
+        with assert_raises(nilmdb.utils.interval.IntervalError):
+            list(diff2(a,b,True))
+        list(diff2(a,b,False))
+
+        # Empty second set
+        eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
+
         # Empty second set
         eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)


@@ -157,11 +157,14 @@ class TestServer(object):
     def test_server(self):
         # Make sure we can't force an exit, and test other 404 errors
-        for url in [ "/exit", "/", "/favicon.ico" ]:
+        for url in [ "/exit", "/favicon.ico" ]:
             with assert_raises(HTTPError) as e:
                 geturl(url)
             eq_(e.exception.code, 404)
 
+        # Root page
+        in_("This is NilmDB", geturl("/"))
+
         # Check version
         eq_(distutils.version.LooseVersion(getjson("/version")),
             distutils.version.LooseVersion(nilmdb.__version__))


@@ -62,6 +62,28 @@ class Base(object):
         eq_(self.foo.val, 20)
         eq_(self.foo.init_thread, self.foo.test_thread)
 
+class ListLike(object):
+    def __init__(self):
+        self.thread = threading.current_thread().name
+        self.foo = 0
+
+    def __iter__(self):
+        eq_(threading.current_thread().name, self.thread)
+        self.foo = 0
+        return self
+
+    def __getitem__(self, key):
+        eq_(threading.current_thread().name, self.thread)
+        return key
+
+    def next(self):
+        eq_(threading.current_thread().name, self.thread)
+        if self.foo < 5:
+            self.foo += 1
+            return self.foo
+        else:
+            raise StopIteration
+
 class TestUnserialized(Base):
     def setUp(self):
         self.foo = Foo()
@@ -84,3 +106,9 @@
         sp(sp(Foo("x"))).t()
         sp(sp(Foo)("x")).t()
         sp(sp(Foo))("x").t()
+
+    def test_iter(self):
+        sp = nilmdb.utils.serializer_proxy
+        i = sp(ListLike)()
+        eq_(list(i), [1,2,3,4,5])
+        eq_(i[3], 3)


@@ -1,5 +1,6 @@
 import nilmdb
 from nilmdb.utils.printf import *
+from nilmdb.utils import datetime_tz
 
 from nose.tools import *
 from nose.tools import assert_raises
@@ -19,6 +20,8 @@ class TestTimestamper(object):
         def join(list):
             return "\n".join(list) + "\n"
 
+        datetime_tz.localtz_set("America/New_York")
+
         start = nilmdb.utils.time.parse_time("03/24/2012")
         lines_in = [ "hello", "world", "hello world", "# commented out" ]
         lines_out = [ "1332561600000000 hello",