Compare commits

...

51 Commits

Author SHA1 Message Date
bbd59c8b50 Add nilmdb.utils.interval.intersection by generalizing set_difference 2013-07-30 14:48:19 -04:00
405c110fd7 Doc updates 2013-07-29 15:36:43 -04:00
274adcd856 Documentation updates 2013-07-27 19:51:09 -04:00
a1850c9c2c Misc documentation 2013-07-25 16:08:35 -04:00
6cd28b67b1 Support iterator protocol in Serializer 2013-07-24 14:52:26 -04:00
d6d215d53d Improve boolean HTTP parameter handling 2013-07-15 14:38:28 -04:00
e02143ddb2 Remove duplicated test 2013-07-14 15:30:53 -04:00
e275384d03 Fix WSGI docs again 2013-07-11 16:36:32 -04:00
a6a67ec15c Update WSGI docs 2013-07-10 14:16:25 -04:00
fc43107307 Fill out test coverage 2013-07-09 19:06:26 -04:00
90633413bb Add nilmdb.utils.interval.human_string function 2013-07-09 19:01:53 -04:00
c7c3aff0fb Add nilmdb.utils.interval.optimize function 2013-07-09 17:50:21 -04:00
e2347c954e Split more CherrpyPy stuff into serverutil 2013-07-02 11:44:08 -04:00
222a5c6c53 Move server decorators and other utilities to a separate file
This will help with implementing nilmrun.
2013-07-02 11:32:19 -04:00
1ca2c143e5 Fix typo 2013-06-29 12:39:00 -04:00
b5df575c79 Fix tests 2013-05-09 22:27:10 -04:00
2768a5ad15 Show FQDN rather than hostname. 2013-05-09 13:33:05 -04:00
a105543c38 Show a more helpful message at the root nilmdb path 2013-05-09 13:30:10 -04:00
309f38d0ed Merge branch '32bit' 2013-05-08 17:20:31 -04:00
9a27b6ef6a Make rocket code suitable for 32-bit architectures 2013-05-08 16:35:32 -04:00
99532cf9e0 Fix coverage 2013-05-07 23:00:44 -04:00
dfdd0e5c74 Fix line parsing in http client 2013-05-07 22:56:00 -04:00
9a2699adfc Attempt at fixing up more Unicode issues with metadata. 2013-05-07 13:44:03 -04:00
9bbb95b18b Add unicode decode/encode helpers 2013-05-07 12:56:59 -04:00
6bbed322c5 Fix unicode in completion 2013-05-07 12:49:12 -04:00
2317894355 Tweak cache sizes to account for large numbers of decimated tables 2013-05-06 11:54:57 -04:00
539c92226c Add more disk space info 2013-05-06 11:36:28 -04:00
77c766d85d Bump MAX_LAYOUT_COUNT to 1024 2013-05-02 15:27:31 -04:00
49d04db1d6 Allow start==end in stream_insert_context, if no data was provided. 2013-04-11 13:25:37 -04:00
ea838d05ae Warn against reused context managers, and fix broken tests 2013-04-11 13:25:00 -04:00
f2a48bdb2a Test binary extract; fix bugs 2013-04-11 13:24:11 -04:00
6d14e0b8aa Allow binary extract 2013-04-11 11:30:41 -04:00
b31b9327b9 Add tool to fix oversize files (the bug fixed by b98ff13) 2013-04-11 11:02:53 -04:00
b98ff1331a Fix bug where too much data was getting written to each file.
We were still calculating the maximum number of rows correctly,
so the extra data was really extra and would get re-written to the
beginning of the subsequent file.

The only case in which this would lead to database issues is if the
very last file was lengthened incorrectly, and the "nrows" calculation
would therefore be wrong when the database was reopened.  Still, even
in that case, it should just leave a small gap in the data, not cause
any errors.
2013-04-10 23:22:03 -04:00
00e6ba1124 Avoid ENOENT in nilmdb.utils.diskusage.du
ENOENT might show up if we're actively deleting files in the nilmdb
thread while trying to read available space from e.g. the server
thread.
2013-04-10 22:25:22 -04:00
01029230c9 Tweaks to sorting 2013-04-10 19:59:38 -04:00
ecc4e5ef9d Improve test coverage 2013-04-10 19:08:05 -04:00
23f31c472b Split sort_streams_nicely into separate file 2013-04-10 19:07:58 -04:00
a1e2746360 Fix bug in nilmdb.stream_remove with max_removals 2013-04-10 18:37:21 -04:00
1c40d59a52 server: use a generator in /stream/remove
Instead of returning a single number at the end of N nilmdb calls, we
now use a generator that returns one line of text every time there's a
new count of rows removed.  This ensures that the connection will stay
alive for very long removals.
2013-04-10 18:11:58 -04:00
bfb09a189f Fix coverage 2013-04-10 16:33:08 -04:00
416a499866 Support wildcards for destroy 2013-04-10 16:23:07 -04:00
637d193807 Fix unicode processing of command line arguments 2013-04-10 16:22:51 -04:00
b7fa5745ce nilmtool list: allow multiple paths to be supplied 2013-04-10 15:34:33 -04:00
0104c8edd9 nilmtool remove: allow wildcards and multiple paths 2013-04-10 15:27:46 -04:00
cf3b8e787d Add test for wrong number of fields in numpy insert 2013-04-10 15:06:50 -04:00
83d022016c nilmtool list: add new --layout option to show layouts 2013-04-10 14:58:44 -04:00
43b740ecaa nilmtool list: remove old -p parameter 2013-04-10 14:48:23 -04:00
4ce059b920 Give a slightly more clear error on bad array sizes 2013-04-09 19:56:58 -04:00
99a4228285 Set up default SIGPIPE handler
This lets you do something like "nilmtool extract | head" without
triggering backtraces.
2013-04-09 18:25:09 -04:00
230ec72609 Fix timestamp display issues with --annotate 2013-04-09 18:19:32 -04:00
32 changed files with 957 additions and 457 deletions

View File

@@ -421,3 +421,20 @@ and has all of the same functions. It adds three new functions:
It is significantly faster! It is about 20 times faster to decimate a
stream with `nilm-decimate` when the filter code is using the new
binary/numpy interface.
WSGI interface & chunked requests
---------------------------------
mod_wsgi requires "WSGIChunkedRequest On" to handle
"Transfer-encoding: Chunked" requests. However, `/stream/insert`
doesn't handle this correctly right now, because:
- The `cherrpy.request.body.read()` call needs to be fixed for chunked requests
- We don't want to just buffer endlessly in the server, and it will
require some thought on how to handle data in chunks (what to do about
interval endpoints).
It is probably better to just keep the endpoint management on the client
side, so leave "WSGIChunkedRequest off" for now.

View File

@@ -19,12 +19,12 @@ Then, set up Apache with a configuration like:
<VirtualHost>
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
WSGIApplicationGroup nilmdb-appgroup
WSGIProcessGroup nilmdb-procgroup
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmdb>
WSGIProcessGroup nilmdb-procgroup
WSGIApplicationGroup nilmdb-appgroup
# Access control example:
Order deny,allow
Deny from all
Allow from 1.2.3.4

View File

@@ -0,0 +1,50 @@
#!/usr/bin/python
import os
import sys
import cPickle as pickle
import argparse
import fcntl
import re
from nilmdb.client.numpyclient import layout_to_dtype
parser = argparse.ArgumentParser(
description = """
Fix database corruption where binary writes caused too much data to be
written to the file. Truncates files to the correct length. This was
fixed by b98ff1331a515ad47fd3203615e835b529b039f9.
""")
parser.add_argument("path", action="store", help='Database root path')
parser.add_argument("-y", "--yes", action="store_true", help='Fix them')
args = parser.parse_args()
lock = os.path.join(args.path, "data.lock")
with open(lock, "w") as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
fix = {}
for (path, dirs, files) in os.walk(args.path):
if "_format" in files:
with open(os.path.join(path, "_format")) as format:
fmt = pickle.load(format)
rowsize = layout_to_dtype(fmt["layout"]).itemsize
maxsize = rowsize * fmt["rows_per_file"]
fix[path] = maxsize
if maxsize < 128000000: # sanity check
raise Exception("bad maxsize " + str(maxsize))
for fixpath in fix:
for (path, dirs, files) in os.walk(fixpath):
for fn in files:
if not re.match("^[0-9a-f]{4,}$", fn):
continue
fn = os.path.join(path, fn)
size = os.path.getsize(fn)
maxsize = fix[fixpath]
if size > maxsize:
diff = size - maxsize
print diff, "too big:", fn
if args.yes:
with open(fn, "a+") as dbfile:
dbfile.truncate(maxsize)

View File

@@ -6,7 +6,6 @@ import nilmdb.utils
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError
import re
import time
import simplejson as json
import contextlib
@@ -59,6 +58,11 @@ class Client(object):
return self.http.get("dbinfo")
def stream_list(self, path = None, layout = None, extended = False):
"""Return a sorted list of [path, layout] lists. If 'path' or
'layout' are specified, only return streams that match those
exact values. If 'extended' is True, the returned lists have
extended info, e.g.: [path, layout, extent_min, extent_max,
total_rows, total_seconds."""
params = {}
if path is not None:
params["path"] = path
@@ -66,14 +70,11 @@ class Client(object):
params["layout"] = layout
if extended:
params["extended"] = 1
def sort_streams_nicely(x):
"""Human-friendly sort (/stream/2 before /stream/10)"""
num = lambda t: int(t) if t.isdigit() else t
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
return sorted(x, key = key)
return sort_streams_nicely(self.http.get("stream/list", params))
streams = self.http.get("stream/list", params)
return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
def stream_get_metadata(self, path, keys = None):
"""Get stream metadata"""
params = { "path": path }
if keys is not None:
params["key"] = keys
@@ -122,7 +123,10 @@ class Client(object):
params["start"] = timestamp_to_string(start)
if end is not None:
params["end"] = timestamp_to_string(end)
return self.http.post("stream/remove", params)
total = 0
for count in self.http.post_gen("stream/remove", params):
total += int(count)
return total
@contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None):
@@ -146,6 +150,7 @@ class Client(object):
ctx = StreamInserter(self, path, start, end)
yield ctx
ctx.finalize()
ctx.destroy()
def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be a string
@@ -295,6 +300,15 @@ class StreamInserter(object):
self._block_data = []
self._block_len = 0
self.destroyed = False
def destroy(self):
"""Ensure this object can't be used again without raising
an error"""
def error(*args, **kwargs):
raise Exception("don't reuse this context object")
self._send_block = self.insert = self.finalize = self.send = error
def insert(self, data):
"""Insert a chunk of ASCII formatted data in string form. The
overall data must consist of lines terminated by '\\n'."""
@@ -441,7 +455,7 @@ class StreamInserter(object):
self._interval_start = end_ts
# Double check endpoints
if start_ts is None or end_ts is None:
if (start_ts is None or end_ts is None) or (start_ts == end_ts):
# If the block has no non-comment lines, it's OK
try:
self._get_first_noncomment(block)

View File

@@ -123,19 +123,50 @@ class HTTPClient(object):
"""
(response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers)
# Like the iter_lines function in Requests, but only splits on
# the specified line ending.
def lines(source, ending):
pending = None
for chunk in source:
if pending is not None:
chunk = pending + chunk
tmp = chunk.split(ending)
lines = tmp[:-1]
if chunk.endswith(ending):
pending = None
else:
pending = tmp[-1]
for line in lines:
yield line
if pending is not None: # pragma: no cover (missing newline)
yield pending
# Yield the chunks or lines as requested
if binary:
for chunk in response.iter_content(chunk_size = 65536):
yield chunk
elif isjson:
for line in response.iter_lines():
for line in lines(response.iter_content(chunk_size = 1),
ending = '\r\n'):
yield json.loads(line)
else:
for line in response.iter_lines():
for line in lines(response.iter_content(chunk_size = 65536),
ending = '\n'):
yield line
def get_gen(self, url, params = None, binary = False):
"""Simple GET (parameters in URL) returning a generator"""
return self._req_gen("GET", url, params, binary = binary)
def post_gen(self, url, params = None):
"""Simple POST (parameters in body) returning a generator"""
if self.post_json:
return self._req_gen("POST", url, None,
json.dumps(params),
{ 'Content-type': 'application/json' })
else:
return self._req_gen("POST", url, None, params)
# Not much use for a POST or PUT generator, since they don't
# return much data.

View File

@@ -98,6 +98,7 @@ class NumpyClient(nilmdb.client.client.Client):
ctx = StreamInserterNumpy(self, path, start, end, dtype)
yield ctx
ctx.finalize()
ctx.destroy()
def stream_insert_numpy(self, path, data, start = None, end = None,
layout = None):
@@ -133,16 +134,8 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
contiguous interval and may be None. 'dtype' is the Numpy
dtype for this stream.
"""
self.last_response = None
super(StreamInserterNumpy, self).__init__(client, path, start, end)
self._dtype = dtype
self._client = client
self._path = path
# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end
# Max rows to send at once
self._max_rows = self._max_data // self._dtype.itemsize
@@ -162,9 +155,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
elif array.ndim == 2:
# Convert to structured array
sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
sarray['timestamp'] = array[:,0]
# Need the squeeze in case sarray['data'] is 1 dimensional
sarray['data'] = numpy.squeeze(array[:,1:])
try:
sarray['timestamp'] = array[:,0]
# Need the squeeze in case sarray['data'] is 1 dimensional
sarray['data'] = numpy.squeeze(array[:,1:])
except (IndexError, ValueError):
raise ValueError("wrong number of fields for this data type")
array = sarray
else:
raise ValueError("wrong number of dimensions in array")
@@ -247,9 +243,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
# Next block continues where this one ended
self._interval_start = end_ts
# If we have no endpoints, it's because we had no data to send.
if start_ts is None or end_ts is None:
return
# If we have no endpoints, or equal endpoints, it's OK as long
# as there's no data to send
if (start_ts is None or end_ts is None) or (start_ts == end_ts):
if len(array) == 0:
return
raise ClientError("have data to send, but invalid start/end times")
# Send it
data = array.tostring()

View File

@@ -10,6 +10,7 @@ import sys
import os
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form
import signal
try: # pragma: no cover
import argcomplete
@@ -71,15 +72,27 @@ class Complete(object): # pragma: no cover
path = parsed_args.path
if not path:
return []
return ( self.escape(k + '=' + v)
for (k,v) in client.stream_get_metadata(path).iteritems()
if k.startswith(prefix) )
results = []
# prefix comes in as UTF-8, but results need to be Unicode,
# weird. Still doesn't work in all cases, but that's bugs in
# argcomplete.
prefix = nilmdb.utils.unicode.decode(prefix)
for (k,v) in client.stream_get_metadata(path).iteritems():
kv = self.escape(k + '=' + v)
if kv.startswith(prefix):
results.append(kv)
return results
class Cmdline(object):
def __init__(self, argv = None):
self.argv = argv or sys.argv[1:]
try:
# Assume command line arguments are encoded with stdin's encoding,
# and reverse it. Won't be needed in Python 3, but for now..
self.argv = [ x.decode(sys.stdin.encoding) for x in self.argv ]
except Exception: # pragma: no cover
pass
self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
self.subcmd = {}
@@ -126,6 +139,13 @@ class Cmdline(object):
sys.exit(-1)
def run(self):
# Set SIGPIPE to its default handler -- we don't need Python
# to catch it for us.
try:
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except ValueError: # pragma: no cover
pass
# Clear cached timezone, so that we can pick up timezone changes
# while running this from the test suite.
datetime_tz._localtz = None

View File

@@ -1,5 +1,6 @@
from nilmdb.utils.printf import *
import nilmdb.client
import fnmatch
from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -10,25 +11,39 @@ def setup(self, sub):
Destroy the stream at the specified path.
The stream must be empty. All metadata
related to the stream is permanently deleted.
Wildcards and multiple paths are supported.
""")
cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream")
group.add_argument("-q", "--quiet", action="store_true",
help="Don't display names when destroying "
"multiple paths")
group = cmd.add_argument_group("Required arguments")
group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar",
group.add_argument("path", nargs='+',
help="Path of the stream to delete, e.g. /foo/bar/*",
).completer = self.complete.path
return cmd
def cmd_destroy(self):
"""Destroy stream"""
if self.args.remove:
streams = [ s[0] for s in self.client.stream_list() ]
paths = []
for path in self.args.path:
new = fnmatch.filter(streams, path)
if not new:
self.die("error: no stream matched path: %s", path)
paths.extend(new)
for path in paths:
if not self.args.quiet and len(paths) > 1:
printf("Destroying %s\n", path)
try:
count = self.client.stream_remove(self.args.path)
if self.args.remove:
count = self.client.stream_remove(path)
self.client.stream_destroy(path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try:
self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e:
self.die("error destroying stream: %s", str(e))
self.die("error destroying stream: %s", str(e))

View File

@@ -1,6 +1,7 @@
from __future__ import print_function
from nilmdb.utils.printf import *
import nilmdb.client
import sys
def setup(self, sub):
cmd = sub.add_parser("extract", help="Extract data",
@@ -24,6 +25,8 @@ def setup(self, sub):
).completer = self.complete.time
group = cmd.add_argument_group("Output format")
group.add_argument("-B", "--binary", action="store_true",
help="Raw binary output")
group.add_argument("-b", "--bare", action="store_true",
help="Exclude timestamps from output lines")
group.add_argument("-a", "--annotate", action="store_true",
@@ -42,6 +45,11 @@ def cmd_extract_verify(self):
if self.args.start > self.args.end:
self.parser.error("start is after end")
if self.args.binary:
if (self.args.bare or self.args.annotate or self.args.markup or
self.args.timestamp_raw or self.args.count):
self.parser.error("--binary cannot be combined with other options")
def cmd_extract(self):
streams = self.client.stream_list(self.args.path)
if len(streams) != 1:
@@ -60,16 +68,23 @@ def cmd_extract(self):
printf("# end: %s\n", time_string(self.args.end))
printed = False
if self.args.binary:
printer = sys.stdout.write
else:
printer = print
bare = self.args.bare
count = self.args.count
for dataline in self.client.stream_extract(self.args.path,
self.args.start,
self.args.end,
self.args.count,
self.args.markup):
if self.args.bare and not self.args.count:
self.args.markup,
self.args.binary):
if bare and not count:
# Strip timestamp (first element). Doesn't make sense
# if we are only returning a count.
dataline = ' '.join(dataline.split(' ')[1:])
print(dataline)
printer(dataline)
printed = True
if not printed:
if self.args.annotate:

View File

@@ -21,5 +21,8 @@ def cmd_info(self):
printf("Server URL: %s\n", self.client.geturl())
dbinfo = self.client.dbinfo()
printf("Server database path: %s\n", dbinfo["path"])
printf("Server database size: %s\n", human_size(dbinfo["size"]))
printf("Server database free space: %s\n", human_size(dbinfo["free"]))
for (desc, field) in [("used by NilmDB", "size"),
("used by other", "other"),
("reserved", "reserved"),
("free", "free")]:
printf("Server disk space %s: %s\n", desc, human_size(dbinfo[field]))

View File

@@ -10,22 +10,16 @@ def setup(self, sub):
formatter_class = def_form,
description="""
List streams available in the database,
optionally filtering by layout or path. Wildcards
are accepted.
optionally filtering by path. Wildcards
are accepted; non-matching paths or wildcards
are ignored.
""")
cmd.set_defaults(verify = cmd_list_verify,
handler = cmd_list)
group = cmd.add_argument_group("Stream filtering")
group.add_argument("-p", "--path", metavar="PATH", default="*",
help="Match only this path (-p can be omitted)",
group.add_argument("path", metavar="PATH", default=["*"], nargs='*',
).completer = self.complete.path
group.add_argument("path_positional", default="*",
nargs="?", help=argparse.SUPPRESS,
).completer = self.complete.path
group.add_argument("-l", "--layout", default="*",
help="Match only this stream layout",
).completer = self.complete.layout
group = cmd.add_argument_group("Interval info")
group.add_argument("-E", "--ext", action="store_true",
@@ -49,20 +43,12 @@ def setup(self, sub):
group = cmd.add_argument_group("Misc options")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps when printing times")
group.add_argument("-l", "--layout", action="store_true",
help="Show layout type next to path name")
return cmd
def cmd_list_verify(self):
# A hidden "path_positional" argument lets the user leave off the
# "-p" when specifying the path. Handle it here.
got_opt = self.args.path != "*"
got_pos = self.args.path_positional != "*"
if got_pos:
if got_opt:
self.parser.error("too many paths specified")
else:
self.args.path = self.args.path_positional
if self.args.start is not None and self.args.end is not None:
if self.args.start >= self.args.end:
self.parser.error("start must precede end")
@@ -80,29 +66,33 @@ def cmd_list(self):
else:
time_string = nilmdb.utils.time.timestamp_to_human
for stream in streams:
(path, layout, int_min, int_max, rows, time) = stream[:6]
if not (fnmatch.fnmatch(path, self.args.path) and
fnmatch.fnmatch(layout, self.args.layout)):
continue
for argpath in self.args.path:
for stream in streams:
(path, layout, int_min, int_max, rows, time) = stream[:6]
if not fnmatch.fnmatch(path, argpath):
continue
printf("%s %s\n", path, layout)
if self.args.ext:
if int_min is None or int_max is None:
printf(" interval extents: (no data)\n")
if self.args.layout:
printf("%s %s\n", path, layout)
else:
printf(" interval extents: %s -> %s\n",
time_string(int_min), time_string(int_max))
printf(" total data: %d rows, %.6f seconds\n",
rows or 0,
nilmdb.utils.time.timestamp_to_seconds(time or 0))
printf("%s\n", path)
if self.args.detail:
printed = False
for (start, end) in self.client.stream_intervals(
path, self.args.start, self.args.end):
printf(" [ %s -> %s ]\n", time_string(start), time_string(end))
printed = True
if not printed:
printf(" (no intervals)\n")
if self.args.ext:
if int_min is None or int_max is None:
printf(" interval extents: (no data)\n")
else:
printf(" interval extents: %s -> %s\n",
time_string(int_min), time_string(int_max))
printf(" total data: %d rows, %.6f seconds\n",
rows or 0,
nilmdb.utils.time.timestamp_to_seconds(time or 0))
if self.args.detail:
printed = False
for (start, end) in self.client.stream_intervals(
path, self.args.start, self.args.end):
printf(" [ %s -> %s ]\n",
time_string(start), time_string(end))
printed = True
if not printed:
printf(" (no intervals)\n")

View File

@@ -41,10 +41,10 @@ def cmd_metadata(self):
if self.args.set is not None or self.args.update is not None:
# Either set, or update
if self.args.set is not None:
keyvals = self.args.set
keyvals = map(nilmdb.utils.unicode.decode, self.args.set)
handler = self.client.stream_set_metadata
else:
keyvals = self.args.update
keyvals = map(nilmdb.utils.unicode.decode, self.args.update)
handler = self.client.stream_update_metadata
# Extract key=value pairs
@@ -62,7 +62,9 @@ def cmd_metadata(self):
self.die("error setting/updating metadata: %s", str(e))
elif self.args.delete is not None:
# Delete (by setting values to empty strings)
keys = self.args.delete or None
keys = None
if self.args.delete:
keys = map(nilmdb.utils.unicode.decode, self.args.delete)
try:
data = self.client.stream_get_metadata(self.args.path, keys)
for key in data:
@@ -72,7 +74,9 @@ def cmd_metadata(self):
self.die("error deleting metadata: %s", str(e))
else:
# Get (or unspecified)
keys = self.args.get or None
keys = None
if self.args.get:
keys = map(nilmdb.utils.unicode.decode, self.args.get)
try:
data = self.client.stream_get_metadata(self.args.path, keys)
except nilmdb.client.ClientError as e:
@@ -81,4 +85,6 @@ def cmd_metadata(self):
# Print nonexistant keys as having empty value
if value is None:
value = ""
printf("%s=%s\n", key, value)
printf("%s=%s\n",
nilmdb.utils.unicode.encode(key),
nilmdb.utils.unicode.encode(value))

View File

@@ -1,17 +1,19 @@
from nilmdb.utils.printf import *
import nilmdb.client
import fnmatch
def setup(self, sub):
cmd = sub.add_parser("remove", help="Remove data",
description="""
Remove all data from a specified time range within a
stream.
stream. If multiple streams or wildcards are provided,
the same time range is removed from all streams.
""")
cmd.set_defaults(handler = cmd_remove)
group = cmd.add_argument_group("Data selection")
group.add_argument("path",
help="Path of stream, e.g. /foo/bar",
group.add_argument("path", nargs='+',
help="Path of stream, e.g. /foo/bar/*",
).completer = self.complete.path
group.add_argument("-s", "--start", required=True,
metavar="TIME", type=self.arg_time,
@@ -23,18 +25,31 @@ def setup(self, sub):
).completer = self.complete.time
group = cmd.add_argument_group("Output format")
group.add_argument("-q", "--quiet", action="store_true",
help="Don't display names when removing "
"from multiple paths")
group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed")
return cmd
def cmd_remove(self):
streams = [ s[0] for s in self.client.stream_list() ]
paths = []
for path in self.args.path:
new = fnmatch.filter(streams, path)
if not new:
self.die("error: no stream matched path: %s", path)
paths.extend(new)
try:
count = self.client.stream_remove(self.args.path,
self.args.start, self.args.end)
for path in paths:
if not self.args.quiet and len(paths) > 1:
printf("Removing from %s\n", path)
count = self.client.stream_remove(path,
self.args.start, self.args.end)
if self.args.count:
printf("%d\n", count);
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
if self.args.count:
printf("%d\n", count)
return 0

View File

@@ -19,8 +19,8 @@ from . import rocket
# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
fd_cache_size = 16
table_cache_size = 32
fd_cache_size = 8
@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):

View File

@@ -176,7 +176,7 @@ class NilmDB(object):
raise NilmDBError("start must precede end")
return (start, end)
@nilmdb.utils.lru_cache(size = 16)
@nilmdb.utils.lru_cache(size = 64)
def _get_intervals(self, stream_id):
"""
Return a mutable IntervalSet corresponding to the given stream ID.
@@ -675,6 +675,7 @@ class NilmDB(object):
# Count how many were removed
removed += row_end - row_start
remaining -= row_end - row_start
if restart is not None:
break

View File

@@ -5,6 +5,9 @@
#include <ctype.h>
#include <stdint.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
/* Values missing from stdint.h */
#define UINT8_MIN 0
#define UINT16_MIN 0
@@ -19,16 +22,9 @@
typedef int64_t timestamp_t;
/* This code probably needs to be double-checked for the case where
sizeof(long) != 8, so enforce that here with something that will
fail at build time. We assume that the python integer type can
hold an int64_t. */
const static char __long_ok[1 - 2*!(sizeof(int64_t) ==
sizeof(long int))] = { 0 };
/* Somewhat arbitrary, just so we can use fixed sizes for strings
etc. */
static const int MAX_LAYOUT_COUNT = 128;
static const int MAX_LAYOUT_COUNT = 1024;
/* Error object and constants */
static PyObject *ParseError;
@@ -58,7 +54,7 @@ static PyObject *raise_str(int line, int col, int code, const char *string)
static PyObject *raise_int(int line, int col, int code, int64_t num)
{
PyObject *o;
o = Py_BuildValue("(iiil)", line, col, code, num);
o = Py_BuildValue("(iiiL)", line, col, code, (long long)num);
if (o != NULL) {
PyErr_SetObject(ParseError, o);
Py_DECREF(o);
@@ -249,11 +245,11 @@ static PyObject *Rocket_get_file_size(Rocket *self)
/****
* Append from string
*/
static inline long int strtol10(const char *nptr, char **endptr) {
return strtol(nptr, endptr, 10);
static inline long int strtoll10(const char *nptr, char **endptr) {
return strtoll(nptr, endptr, 10);
}
static inline long int strtoul10(const char *nptr, char **endptr) {
return strtoul(nptr, endptr, 10);
static inline long int strtoull10(const char *nptr, char **endptr) {
return strtoull(nptr, endptr, 10);
}
/* .append_string(count, data, offset, linenum, start, end, last_timestamp) */
@@ -264,6 +260,7 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
int offset;
const char *linestart;
int linenum;
long long ll1, ll2, ll3;
timestamp_t start;
timestamp_t end;
timestamp_t last_timestamp;
@@ -280,10 +277,13 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
but we need the null termination for strto*. If we had
strnto* that took a length, we could use t# and not require
a copy. */
if (!PyArg_ParseTuple(args, "isiilll:append_string", &count,
if (!PyArg_ParseTuple(args, "isiiLLL:append_string", &count,
&data, &offset, &linenum,
&start, &end, &last_timestamp))
&ll1, &ll2, &ll3))
return NULL;
start = ll1;
end = ll2;
last_timestamp = ll3;
/* Skip spaces, but don't skip over a newline. */
#define SKIP_BLANK(buf) do { \
@@ -372,14 +372,14 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
goto extra_data_on_line; \
break
CS(INT8, strtol10, t64.i, t8.i, t8.u, , 1);
CS(UINT8, strtoul10, t64.u, t8.u, t8.u, , 1);
CS(INT16, strtol10, t64.i, t16.i, t16.u, le16toh, 2);
CS(UINT16, strtoul10, t64.u, t16.u, t16.u, le16toh, 2);
CS(INT32, strtol10, t64.i, t32.i, t32.u, le32toh, 4);
CS(UINT32, strtoul10, t64.u, t32.u, t32.u, le32toh, 4);
CS(INT64, strtol10, t64.i, t64.i, t64.u, le64toh, 8);
CS(UINT64, strtoul10, t64.u, t64.u, t64.u, le64toh, 8);
CS(INT8, strtoll10, t64.i, t8.i, t8.u, , 1);
CS(UINT8, strtoull10, t64.u, t8.u, t8.u, , 1);
CS(INT16, strtoll10, t64.i, t16.i, t16.u, le16toh, 2);
CS(UINT16, strtoull10, t64.u, t16.u, t16.u, le16toh, 2);
CS(INT32, strtoll10, t64.i, t32.i, t32.u, le32toh, 4);
CS(UINT32, strtoull10, t64.u, t32.u, t32.u, le32toh, 4);
CS(INT64, strtoll10, t64.i, t64.i, t64.u, le64toh, 8);
CS(UINT64, strtoull10, t64.u, t64.u, t64.u, le64toh, 8);
CS(FLOAT32, strtod, t64.d, t32.f, t32.u, le32toh, 4);
CS(FLOAT64, strtod, t64.d, t64.d, t64.u, le64toh, 8);
#undef CS
@@ -397,7 +397,8 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
/* Build return value and return */
offset = buf - data;
PyObject *o;
o = Py_BuildValue("(iili)", written, offset, last_timestamp, linenum);
o = Py_BuildValue("(iiLi)", written, offset,
(long long)last_timestamp, linenum);
return o;
err:
PyErr_SetFromErrno(PyExc_OSError);
@@ -431,14 +432,18 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
int data_len;
int linenum;
int offset;
long long ll1, ll2, ll3;
timestamp_t start;
timestamp_t end;
timestamp_t last_timestamp;
if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
if (!PyArg_ParseTuple(args, "it#iiLLL:append_binary",
&count, &data, &data_len, &offset,
&linenum, &start, &end, &last_timestamp))
&linenum, &ll1, &ll2, &ll3))
return NULL;
start = ll1;
end = ll2;
last_timestamp = ll3;
/* Advance to offset */
if (offset > data_len)
@@ -468,7 +473,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
}
/* Write binary data */
if (fwrite(data, data_len, 1, self->file) != 1) {
if (fwrite(data, self->binary_size, rows, self->file) != rows) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
@@ -476,8 +481,8 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
/* Build return value and return */
PyObject *o;
o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
last_timestamp, linenum);
o = Py_BuildValue("(iiLi)", rows, offset + rows * self->binary_size,
(long long)last_timestamp, linenum);
return o;
}
@@ -534,7 +539,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
if (fread(&t64.u, 8, 1, self->file) != 1)
goto err;
t64.u = le64toh(t64.u);
ret = sprintf(&str[len], "%ld", t64.i);
ret = sprintf(&str[len], "%" PRId64, t64.i);
if (ret <= 0)
goto err;
len += ret;
@@ -556,14 +561,14 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
len += ret; \
} \
break
CASE(INT8, "%hhd", t8.i, t8.u, , 1);
CASE(UINT8, "%hhu", t8.u, t8.u, , 1);
CASE(INT16, "%hd", t16.i, t16.u, le16toh, 2);
CASE(UINT16, "%hu", t16.u, t16.u, le16toh, 2);
CASE(INT32, "%d", t32.i, t32.u, le32toh, 4);
CASE(UINT32, "%u", t32.u, t32.u, le32toh, 4);
CASE(INT64, "%ld", t64.i, t64.u, le64toh, 8);
CASE(UINT64, "%lu", t64.u, t64.u, le64toh, 8);
CASE(INT8, "%" PRId8, t8.i, t8.u, , 1);
CASE(UINT8, "%" PRIu8, t8.u, t8.u, , 1);
CASE(INT16, "%" PRId16, t16.i, t16.u, le16toh, 2);
CASE(UINT16, "%" PRIu16, t16.u, t16.u, le16toh, 2);
CASE(INT32, "%" PRId32, t32.i, t32.u, le32toh, 4);
CASE(UINT32, "%" PRIu32, t32.u, t32.u, le32toh, 4);
CASE(INT64, "%" PRId64, t64.i, t64.u, le64toh, 8);
CASE(UINT64, "%" PRIu64, t64.u, t64.u, le64toh, 8);
/* These next two are a bit debatable. floats
are 6-9 significant figures, so we print 7.
Doubles are 15-19, so we print 17. This is
@@ -653,7 +658,7 @@ static PyObject *Rocket_extract_timestamp(Rocket *self, PyObject *args)
/* Convert and return */
t64.u = le64toh(t64.u);
return Py_BuildValue("l", t64.i);
return Py_BuildValue("L", (long long)t64.i);
}
/****

View File

@@ -17,126 +17,26 @@ import decorator
import psutil
import traceback
from nilmdb.server.serverutil import (
chunked_response,
response_type,
workaround_cp_bug_1200,
exception_to_httperror,
CORS_allow,
json_to_request_params,
json_error_page,
cherrypy_start,
cherrypy_stop,
bool_param,
)
# Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
class NilmApp(object):
def __init__(self, db):
self.db = db
# Decorators
def chunked_response(func):
"""Decorator to enable chunked responses."""
# Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': True }
return func
def response_type(content_type):
"""Return a decorator-generating function that sets the
response type to the specified string."""
def wrapper(func, *args, **kwargs):
cherrypy.response.headers['Content-Type'] = content_type
return func(*args, **kwargs)
return decorator.decorator(wrapper)
@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
"""Decorator to work around CherryPy bug #1200 in a response
generator.
Even if chunked responses are disabled, LookupError or
UnicodeError exceptions may still be swallowed by CherryPy due to
bug #1200. This throws them as generic Exceptions instead so that
they make it through.
"""
exc_info = None
try:
for val in func(*args, **kwargs):
yield val
except (LookupError, UnicodeError):
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
def exception_to_httperror(*expected):
"""Return a decorator-generating function that catches expected
errors and throws a HTTPError describing it instead.
@exception_to_httperror(NilmDBError, ValueError)
def foo():
pass
"""
def wrapper(func, *args, **kwargs):
exc_info = None
try:
return func(*args, **kwargs)
except expected:
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
# We need to preserve the function's argspecs for CherryPy to
# handle argument errors correctly. Decorator.decorator takes
# care of that.
return decorator.decorator(wrapper)
# Custom CherryPy tools
def CORS_allow(methods):
"""This does several things:
Handles CORS preflight requests.
Adds Allow: header to all requests.
Raise 405 if request.method not in method.
It is similar to cherrypy.tools.allow, with the CORS stuff added.
"""
request = cherrypy.request.headers
response = cherrypy.response.headers
if not isinstance(methods, (tuple, list)): # pragma: no cover
methods = [ methods ]
methods = [ m.upper() for m in methods if m ]
if not methods: # pragma: no cover
methods = [ 'GET', 'HEAD' ]
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
methods.append('HEAD')
response['Allow'] = ', '.join(methods)
# Allow all origins
if 'Origin' in request:
response['Access-Control-Allow-Origin'] = request['Origin']
# If it's a CORS request, send response.
request_method = request.get("Access-Control-Request-Method", None)
request_headers = request.get("Access-Control-Request-Headers", None)
if (cherrypy.request.method == "OPTIONS" and
request_method and request_headers):
response['Access-Control-Allow-Headers'] = request_headers
response['Access-Control-Allow-Methods'] = ', '.join(methods)
# Try to stop further processing and return a 200 OK
cherrypy.response.status = "200 OK"
cherrypy.response.body = ""
cherrypy.request.handler = lambda: ""
return
# Reject methods that were not explicitly allowed
if cherrypy.request.method not in methods:
raise cherrypy.HTTPError(405)
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
cherrypy.lib.jsontools.json_processor(body)
if not isinstance(cherrypy.request.json, dict):
raise cherrypy.HTTPError(415)
cherrypy.request.params.update(cherrypy.request.json)
# CherryPy apps
class Root(NilmApp):
"""Root application for NILM database"""
@@ -147,7 +47,10 @@ class Root(NilmApp):
# /
@cherrypy.expose
def index(self):
raise cherrypy.NotFound()
cherrypy.response.headers['Content-Type'] = 'text/plain'
msg = sprintf("This is NilmDB version %s, running on host %s.\n",
nilmdb.__version__, socket.getfqdn())
return msg
# /favicon.ico
@cherrypy.expose
@@ -167,9 +70,13 @@ class Root(NilmApp):
"""Return a dictionary with the database path,
size of the database in bytes, and free disk space in bytes"""
path = self.db.get_basepath()
usage = psutil.disk_usage(path)
dbsize = nilmdb.utils.du(path)
return { "path": path,
"size": nilmdb.utils.du(path),
"free": psutil.disk_usage(path).free }
"size": dbsize,
"other": usage.used - dbsize,
"reserved": usage.total - usage.used - usage.free,
"free": usage.free }
class Stream(NilmApp):
"""Stream-specific operations"""
@@ -199,10 +106,10 @@ class Stream(NilmApp):
layout parameter, just list streams that match the given path
or layout.
If extent is not given, returns a list of lists containing
the path and layout: [ path, layout ]
If extended is missing or zero, returns a list of lists
containing the path and layout: [ path, layout ]
If extended is provided, returns a list of lists containing
If extended is true, returns a list of lists containing
extended info: [ path, layout, extent_min, extent_max,
total_rows, total_seconds ]. More data may be added.
"""
@@ -315,6 +222,8 @@ class Stream(NilmApp):
little-endian and matches the database types (including an
int64 timestamp).
"""
binary = bool_param(binary)
# Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections.
# Note that CherryPy 3.2.2 has a bug where this fails for GET
@@ -347,24 +256,34 @@ class Stream(NilmApp):
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
@chunked_response
@response_type("application/x-json-stream")
def remove(self, path, start = None, end = None):
"""
Remove data from the backend database. Removes all data in
the interval [start, end). Returns the number of data points
removed.
the interval [start, end).
Returns the number of data points removed. Since this is a potentially
long-running operation, multiple numbers may be returned as the
data gets removed from the backend database. The total number of
points removed is the sum of all of these numbers.
"""
(start, end) = self._get_times(start, end)
total_removed = 0
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
total_removed += removed
if restart is None:
break
start = restart
return total_removed
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
@workaround_cp_bug_1200
def content(start, end):
# Note: disable chunked responses to see tracebacks from here.
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
yield json.dumps(removed) + "\r\n"
if restart is None:
break
start = restart
return content(start, end)
# /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -429,6 +348,10 @@ class Stream(NilmApp):
little-endian and matches the database types (including an
int64 timestamp).
"""
binary = bool_param(binary)
markup = bool_param(markup)
count = bool_param(count)
(start, end) = self._get_times(start, end)
# Check path and get layout
@@ -556,70 +479,14 @@ class Server(object):
def json_error_page(self, status, message, traceback, version):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
"message" : message,
"traceback" : traceback }
# Don't send a traceback if the error was 400-499 (client's fault)
try:
code = int(status.split()[0])
if not self.force_traceback:
if code >= 400 and code <= 499:
errordata["traceback"] = ""
except Exception: # pragma: no cover
pass
# Override the response type, which was previously set to text/html
cherrypy.serving.response.headers['Content-Type'] = (
"application/json;charset=utf-8" )
# Undo the HTML escaping that cherrypy's get_error_page function applies
# (cherrypy issue 1135)
for k, v in errordata.iteritems():
v = v.replace("&lt;","<")
v = v.replace("&gt;",">")
v = v.replace("&amp;","&")
errordata[k] = v
return json.dumps(errordata, separators=(',',':'))
return json_error_page(status, message, traceback, version,
self.force_traceback)
def start(self, blocking = False, event = None):
if not self.embedded: # pragma: no cover
# Handle signals nicely
if hasattr(cherrypy.engine, "signal_handler"):
cherrypy.engine.signal_handler.subscribe()
if hasattr(cherrypy.engine, "console_control_handler"):
cherrypy.engine.console_control_handler.subscribe()
# Cherrypy stupidly calls os._exit(70) when it can't bind the
# port. At least try to print a reasonable error and continue
# in this case, rather than just dying silently (as we would
# otherwise do in embedded mode)
real_exit = os._exit
def fake_exit(code): # pragma: no cover
if code == os.EX_SOFTWARE:
fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
else:
real_exit(code)
os._exit = fake_exit
cherrypy.engine.start()
os._exit = real_exit
# Signal that the engine has started successfully
if event is not None:
event.set()
if blocking:
try:
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
interval = 0.1, channel = 'main')
except (KeyboardInterrupt, IOError): # pragma: no cover
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
cherrypy.engine.exit()
except SystemExit: # pragma: no cover
cherrypy.engine.log('SystemExit raised: shutting down bus')
cherrypy.engine.exit()
raise
cherrypy_start(blocking, event, self.embedded)
def stop(self):
cherrypy.engine.exit()
cherrypy_stop()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to

214
nilmdb/server/serverutil.py Normal file
View File

@@ -0,0 +1,214 @@
"""Miscellaneous decorators and other helpers for running a CherryPy
server"""
import cherrypy
import sys
import os
import decorator
import simplejson as json
# Helper to parse parameters into booleans
def bool_param(s):
"""Return a bool indicating whether parameter 's' was True or False,
supporting a few different types for 's'."""
try:
ss = s.lower()
if ss in [ "0", "false", "f", "no", "n" ]:
return False
if ss in [ "1", "true", "t", "yes", "y" ]:
return True
except Exception:
return bool(s)
raise cherrypy.HTTPError("400 Bad Request",
"can't parse parameter: " + ss)
# Decorators
def chunked_response(func):
"""Decorator to enable chunked responses."""
# Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': True }
return func
def response_type(content_type):
"""Return a decorator-generating function that sets the
response type to the specified string."""
def wrapper(func, *args, **kwargs):
cherrypy.response.headers['Content-Type'] = content_type
return func(*args, **kwargs)
return decorator.decorator(wrapper)
@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
"""Decorator to work around CherryPy bug #1200 in a response
generator.
Even if chunked responses are disabled, LookupError or
UnicodeError exceptions may still be swallowed by CherryPy due to
bug #1200. This throws them as generic Exceptions instead so that
they make it through.
"""
exc_info = None
try:
for val in func(*args, **kwargs):
yield val
except (LookupError, UnicodeError):
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
def exception_to_httperror(*expected):
"""Return a decorator-generating function that catches expected
errors and throws a HTTPError describing it instead.
@exception_to_httperror(NilmDBError, ValueError)
def foo():
pass
"""
def wrapper(func, *args, **kwargs):
exc_info = None
try:
return func(*args, **kwargs)
except expected:
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
# We need to preserve the function's argspecs for CherryPy to
# handle argument errors correctly. Decorator.decorator takes
# care of that.
return decorator.decorator(wrapper)
# Custom CherryPy tools
def CORS_allow(methods):
"""This does several things:
Handles CORS preflight requests.
Adds Allow: header to all requests.
Raise 405 if request.method not in method.
It is similar to cherrypy.tools.allow, with the CORS stuff added.
Add this to CherryPy with:
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
"""
request = cherrypy.request.headers
response = cherrypy.response.headers
if not isinstance(methods, (tuple, list)): # pragma: no cover
methods = [ methods ]
methods = [ m.upper() for m in methods if m ]
if not methods: # pragma: no cover
methods = [ 'GET', 'HEAD' ]
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
methods.append('HEAD')
response['Allow'] = ', '.join(methods)
# Allow all origins
if 'Origin' in request:
response['Access-Control-Allow-Origin'] = request['Origin']
# If it's a CORS request, send response.
request_method = request.get("Access-Control-Request-Method", None)
request_headers = request.get("Access-Control-Request-Headers", None)
if (cherrypy.request.method == "OPTIONS" and
request_method and request_headers):
response['Access-Control-Allow-Headers'] = request_headers
response['Access-Control-Allow-Methods'] = ', '.join(methods)
# Try to stop further processing and return a 200 OK
cherrypy.response.status = "200 OK"
cherrypy.response.body = ""
cherrypy.request.handler = lambda: ""
return
# Reject methods that were not explicitly allowed
if cherrypy.request.method not in methods:
raise cherrypy.HTTPError(405)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
cherrypy.lib.jsontools.json_processor(body)
if not isinstance(cherrypy.request.json, dict):
raise cherrypy.HTTPError(415)
cherrypy.request.params.update(cherrypy.request.json)
# Used as an "error_page.default" handler
def json_error_page(status, message, traceback, version,
force_traceback = False):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
"message" : message,
"traceback" : traceback }
# Don't send a traceback if the error was 400-499 (client's fault)
try:
code = int(status.split()[0])
if not force_traceback:
if code >= 400 and code <= 499:
errordata["traceback"] = ""
except Exception: # pragma: no cover
pass
# Override the response type, which was previously set to text/html
cherrypy.serving.response.headers['Content-Type'] = (
"application/json;charset=utf-8" )
# Undo the HTML escaping that cherrypy's get_error_page function applies
# (cherrypy issue 1135)
for k, v in errordata.iteritems():
v = v.replace("&lt;","<")
v = v.replace("&gt;",">")
v = v.replace("&amp;","&")
errordata[k] = v
return json.dumps(errordata, separators=(',',':'))
# Start/stop CherryPy standalone server
def cherrypy_start(blocking = False, event = False, embedded = False):
"""Start the CherryPy server, handling errors and signals
somewhat gracefully."""
if not embedded: # pragma: no cover
# Handle signals nicely
if hasattr(cherrypy.engine, "signal_handler"):
cherrypy.engine.signal_handler.subscribe()
if hasattr(cherrypy.engine, "console_control_handler"):
cherrypy.engine.console_control_handler.subscribe()
# Cherrypy stupidly calls os._exit(70) when it can't bind the
# port. At least try to print a reasonable error and continue
# in this case, rather than just dying silently (as we would
# otherwise do in embedded mode)
real_exit = os._exit
def fake_exit(code): # pragma: no cover
if code == os.EX_SOFTWARE:
fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
else:
real_exit(code)
os._exit = fake_exit
cherrypy.engine.start()
os._exit = real_exit
# Signal that the engine has started successfully
if event is not None:
event.set()
if blocking:
try:
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
interval = 0.1, channel = 'main')
except (KeyboardInterrupt, IOError): # pragma: no cover
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
cherrypy.engine.exit()
except SystemExit: # pragma: no cover
cherrypy.engine.log('SystemExit raised: shutting down bus')
cherrypy.engine.exit()
raise
# Stop CherryPy server
def cherrypy_stop():
cherrypy.engine.exit()

View File

@@ -13,3 +13,5 @@ import nilmdb.utils.time
import nilmdb.utils.iterator
import nilmdb.utils.interval
import nilmdb.utils.lock
import nilmdb.utils.sort
import nilmdb.utils.unicode

View File

@@ -1,4 +1,5 @@
import os
import errno
from math import log
def human_size(num):
@@ -16,10 +17,17 @@ def human_size(num):
return '1 byte'
def du(path):
"""Like du -sb, returns total size of path in bytes."""
size = os.path.getsize(path)
if os.path.isdir(path):
for thisfile in os.listdir(path):
filepath = os.path.join(path, thisfile)
size += du(filepath)
return size
"""Like du -sb, returns total size of path in bytes. Ignore
errors that might occur if we encounter broken symlinks or
files in the process of being removed."""
try:
size = os.path.getsize(path)
if os.path.isdir(path):
for thisfile in os.listdir(path):
filepath = os.path.join(path, thisfile)
size += du(filepath)
return size
except OSError as e: # pragma: no cover
if e.errno != errno.ENOENT:
raise
return 0

View File

@@ -1,5 +1,6 @@
"""Interval. Like nilmdb.server.interval, but re-implemented here
in plain Python so clients have easier access to it.
in plain Python so clients have easier access to it, and with a few
helper functions.
Intervals are half-open, ie. they include data points with timestamps
[start, end)
@@ -34,6 +35,10 @@ class Interval:
return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
" -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
def human_string(self):
return ("[ " + nilmdb.utils.time.timestamp_to_human(self.start) +
" -> " + nilmdb.utils.time.timestamp_to_human(self.end) + " ]")
def __cmp__(self, other):
"""Compare two intervals. If non-equal, order by start then end"""
return cmp(self.start, other.start) or cmp(self.end, other.end)
@@ -53,18 +58,11 @@ class Interval:
raise IntervalError("not a subset")
return Interval(start, end)
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
def _interval_math_helper(a, b, op, subset = True):
"""Helper for set_difference, intersection functions,
to compute interval subsets based on a math operator on ranges
present in A and B. Subsets are computed from A, or new intervals
are generated if subset = False."""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
@@ -79,28 +77,71 @@ def set_difference(a, b):
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
a_interval = None
b_interval = None
in_a = False
in_b = False
out_start = None
for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
if k == 0:
# start a interval
a_interval = i
if b_interval is None:
out_start = ts
in_a = True
elif k == 1:
# start b interval
b_interval = i
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
in_b = True
elif k == 2:
# end a interval
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
a_interval = None
in_a = False
elif k == 3:
# end b interval
b_interval = None
if a_interval:
out_start = ts
in_b = False
include = op(in_a, in_b)
if include and out_start is None:
out_start = ts
elif not include:
if out_start is not None and out_start != ts:
if subset:
yield a_interval.subset(out_start, ts)
else:
yield Interval(out_start, ts)
out_start = None
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
return _interval_math_helper(a, b, (lambda a, b: a and not b))
def intersection(a, b):
"""
Compute the intersection between the intervals in 'a' and the
intervals in 'b'; i.e., the ranges that are present in both 'a'
and 'b'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
return _interval_math_helper(a, b, (lambda a, b: a and b))
def optimize(it):
"""
Given an iterable 'it' with intervals, optimize them by joining
together intervals that are adjacent in time, and return a generator
that yields the new intervals.
"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int

View File

@@ -91,6 +91,20 @@ def serializer_proxy(obj_or_type):
r = SerializerCallProxy(self.__call_queue, attr, self)
return r
# For an interable object, on __iter__(), save the object's
# iterator and return this proxy. On next(), call the object's
# iterator through this proxy.
def __iter__(self):
attr = getattr(self.__object, "__iter__")
self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
return self
def next(self):
return SerializerCallProxy(self.__call_queue,
self.__iter.next, self)()
def __getitem__(self, key):
return self.__getattr__("__getitem__")(key)
def __call__(self, *args, **kwargs):
"""Call this to instantiate the type, if a type was passed
to serializer_proxy. Otherwise, pass the call through."""

18
nilmdb/utils/sort.py Normal file
View File

@@ -0,0 +1,18 @@
import re
def sort_human(items, key = None):
"""Human-friendly sort (/stream/2 before /stream/10)"""
def to_num(val):
try:
return int(val)
except Exception:
return val
def human_key(text):
if key:
text = key(text)
# Break into character and numeric chunks.
chunks = re.split(r'([0-9]+)', text)
return [ to_num(c) for c in chunks ]
return sorted(items, key = human_key)

View File

@@ -6,7 +6,7 @@ import time
# Range
min_timestamp = (-2**63)
max_timestamp = (2**62 - 1)
max_timestamp = (2**63 - 1)
# Smallest representable step
epsilon = 1
@@ -32,6 +32,10 @@ def timestamp_to_human(timestamp):
"""Convert a timestamp (integer microseconds since epoch) to a
human-readable string, using the local timezone for display
(e.g. from the TZ env var)."""
if timestamp == min_timestamp:
return "(minimum)"
if timestamp == max_timestamp:
return "(maximum)"
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp))
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
@@ -56,7 +60,7 @@ def rate_to_period(hz, cycles = 1):
def parse_time(toparse):
"""
Parse a free-form time string and return a nilmdb timestamp
(integer seconds since epoch). If the string doesn't contain a
(integer microseconds since epoch). If the string doesn't contain a
timestamp, the current local timezone is assumed (e.g. from the TZ
env var).
"""

29
nilmdb/utils/unicode.py Normal file
View File

@@ -0,0 +1,29 @@
import sys
if sys.version_info[0] >= 3: # pragma: no cover (future Python3 compat)
text_type = str
else:
text_type = unicode
def encode(u):
"""Try to encode something from Unicode to a string using the
default encoding. If it fails, try encoding as UTF-8."""
if not isinstance(u, text_type):
return u
try:
return u.encode()
except UnicodeEncodeError:
return u.encode("utf-8")
def decode(s):
"""Try to decode someting from string to Unicode using the
default encoding. If it fails, try decoding as UTF-8."""
if isinstance(s, text_type):
return s
try:
return s.decode()
except UnicodeDecodeError:
try:
return s.decode("utf-8")
except UnicodeDecodeError:
return s # best we can do

8
tests/data/timestamped Normal file
View File

@@ -0,0 +1,8 @@
-10000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-100000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-100000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03

View File

@@ -105,16 +105,19 @@ class TestClient(object):
client.http.post("/stream/list")
client = nilmdb.client.Client(url = testurl)
# Create three streams
# Create four streams
client.stream_create("/newton/prep", "float32_8")
client.stream_create("/newton/raw", "uint16_6")
client.stream_create("/newton/zzz/rawnotch", "uint16_9")
client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
client.stream_create("/newton/zzz/rawnotch11", "uint16_9")
# Verify we got 3 streams
# Verify we got 4 streams in the right order
eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
["/newton/raw", "uint16_6"],
["/newton/zzz/rawnotch", "uint16_9"]
["/newton/zzz/rawnotch2", "uint16_9"],
["/newton/zzz/rawnotch11", "uint16_9"]
])
# Match just one type or one path
eq_(client.stream_list(layout="uint16_6"),
[ ["/newton/raw", "uint16_6"] ])
@@ -327,6 +330,10 @@ class TestClient(object):
2525.169921875, 8350.83984375, 3724.699951171875,
1355.3399658203125, 2039.0))
# Just get some coverage
with assert_raises(ClientError) as e:
client.http.post("/stream/remove", { "path": "/none" })
client.close()
def test_client_06_generators(self):
@@ -347,10 +354,6 @@ class TestClient(object):
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
with assert_raises(ClientError) as e:
@@ -389,27 +392,38 @@ class TestClient(object):
headers())
# Extract
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124" })
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: text/plain;charset=utf-8" not in headers():
raise AssertionError("/stream/extract is not text/plain:\n" +
headers())
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124",
"binary": "1" })
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "1" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: application/octet-stream" not in headers():
raise AssertionError("/stream/extract is not binary:\n" +
headers())
# Make sure a binary of "0" is really off
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "0" })
if "content-type: application/octet-stream" in headers():
raise AssertionError("/stream/extract is not text:\n" +
headers())
# Invalid parameters
with assert_raises(ClientError) as e:
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "asdfasfd" })
in_("can't parse parameter", str(e.exception))
client.close()
def test_client_08_unicode(self):
@@ -613,8 +627,12 @@ class TestClient(object):
with client.stream_insert_context("/empty/test", end = 950):
pass
# Equal start and end is OK as long as there's no data
with client.stream_insert_context("/empty/test", start=9, end=9):
pass
# Try various things that might cause problems
with client.stream_insert_context("/empty/test", 1000, 1050):
with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing

View File

@@ -88,7 +88,7 @@ class TestCmdline(object):
sys.exit(0)
except SystemExit as e:
exitcode = e.code
captured = outfile.getvalue()
captured = nilmdb.utils.unicode.decode(outfile.getvalue())
self.captured = captured
self.exitcode = exitcode
@@ -245,8 +245,10 @@ class TestCmdline(object):
self.contain("Client version: " + nilmdb.__version__)
self.contain("Server version: " + test_server.version)
self.contain("Server database path")
self.contain("Server database size")
self.contain("Server database free space")
self.contain("Server disk space used by NilmDB")
self.contain("Server disk space used by other")
self.contain("Server disk space reserved")
self.contain("Server disk space free")
def test_04_createlist(self):
# Basic stream tests, like those in test_client.
@@ -300,38 +302,19 @@ class TestCmdline(object):
# Verify we got those 3 streams and they're returned in
# alphabetical order.
self.ok("list")
self.ok("list -l")
self.match("/newton/prep float32_8\n"
"/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
# Match just one type or one path. Also check
# that --path is optional
self.ok("list --path /newton/raw")
self.match("/newton/raw uint16_6\n")
self.ok("list /newton/raw")
self.match("/newton/raw uint16_6\n")
self.fail("list -p /newton/raw /newton/raw")
self.contain("too many paths")
self.ok("list --layout uint16_6")
self.ok("list --layout /newton/raw")
self.match("/newton/raw uint16_6\n")
# Wildcard matches
self.ok("list --layout uint16*")
self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
self.ok("list --path *zzz* --layout uint16*")
self.match("/newton/zzz/rawnotch uint16_9\n")
self.ok("list *zzz* --layout uint16*")
self.match("/newton/zzz/rawnotch uint16_9\n")
self.ok("list --path *zzz* --layout float32*")
self.match("")
self.ok("list *zzz*")
self.match("/newton/zzz/rawnotch\n")
# reversed range
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
@@ -492,33 +475,40 @@ class TestCmdline(object):
# bad start time
self.fail("insert -t -r 120 --start 'whatever' /newton/prep /dev/null")
# Test negative times
self.ok("insert --start @-10000000000 --end @1000000001 /newton/prep"
" tests/data/timestamped")
self.ok("extract -c /newton/prep --start min --end @1000000001")
self.match("8\n")
self.ok("remove /newton/prep --start min --end @1000000001")
def test_07_detail_extended(self):
# Just count the number of lines, it's probably fine
self.ok("list --detail")
lines_(self.captured, 8)
self.ok("list --detail --path *prep")
self.ok("list --detail *prep")
lines_(self.captured, 4)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'")
self.ok("list --detail *prep --start='23 Mar 2012 10:02'")
lines_(self.captured, 3)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'")
self.ok("list --detail *prep --start='23 Mar 2012 10:05'")
lines_(self.captured, 2)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'")
self.ok("list --detail *prep --start='23 Mar 2012 10:05:15'")
lines_(self.captured, 2)
self.contain("10:05:15.000")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'")
self.ok("list --detail *prep --start='23 Mar 2012 10:05:15.50'")
lines_(self.captured, 2)
self.contain("10:05:15.500")
self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'")
self.ok("list --detail *prep --start='23 Mar 2012 19:05:15.50'")
lines_(self.captured, 2)
self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
self.ok("list --detail *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.51'")
lines_(self.captured, 2)
self.contain("10:05:15.500")
@@ -527,15 +517,15 @@ class TestCmdline(object):
lines_(self.captured, 8)
# Verify the "raw timestamp" output
self.ok("list --detail --path *prep --timestamp-raw "
self.ok("list --detail *prep --timestamp-raw "
"--start='23 Mar 2012 10:05:15.50'")
lines_(self.captured, 2)
self.contain("[ 1332497115500000 -> 1332497160000000 ]")
# bad time
self.fail("list --detail --path *prep -T --start='9332497115.612'")
self.fail("list --detail *prep -T --start='9332497115.612'")
# good time
self.ok("list --detail --path *prep -T --start='1332497115.612'")
self.ok("list --detail *prep -T --start='1332497115.612'")
lines_(self.captured, 2)
self.contain("[ 1332497115612000 -> 1332497160000000 ]")
@@ -615,11 +605,19 @@ class TestCmdline(object):
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
self.ok("extract -a /newton/prep --start min --end max")
lines_(self.captured, 43204)
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n")
# test binary mode
self.fail("extract -c -B /newton/prep -s min -e max")
self.contain("binary cannot be combined")
self.fail("extract -m -B /newton/prep -s min -e max")
self.contain("binary cannot be combined")
self.ok("extract -B /newton/prep -s min -e max")
eq_(len(self.captured), 43200 * (8 + 8*4))
# markup for 3 intervals, plus extra markup lines whenever we had
# a "restart" from the nilmdb.stream_extract function
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -639,7 +637,7 @@ class TestCmdline(object):
# Try nonexistent stream
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("No stream at path")
self.contain("no stream matched path")
# empty or backward ranges return errors
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
@@ -667,9 +665,14 @@ class TestCmdline(object):
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("0\n")
self.ok("remove -c /newton/prep /newton/pre* " +
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("Removing from /newton/prep\n0\n" +
"Removing from /newton/prep\n0\n")
# Make sure we have the data we expect
self.ok("list --detail /newton/prep")
self.ok("list -l --detail /newton/prep")
self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
@@ -704,7 +707,7 @@ class TestCmdline(object):
self.match("24000\n")
# See the missing chunks in list output
self.ok("list --detail /newton/prep")
self.ok("list --layout --detail /newton/prep")
self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
@@ -718,7 +721,7 @@ class TestCmdline(object):
# Remove all data, verify it's missing
self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("") # no count requested this time
self.ok("list --detail /newton/prep")
self.ok("list -l --detail /newton/prep")
self.match("/newton/prep float32_8\n" +
" (no intervals)\n")
@@ -736,16 +739,16 @@ class TestCmdline(object):
self.contain("too few arguments")
self.fail("destroy /no/such/stream")
self.contain("No stream at path")
self.contain("no stream matched path")
self.fail("destroy -R /no/such/stream")
self.contain("No stream at path")
self.contain("no stream matched path")
self.fail("destroy asdfasdf")
self.contain("No stream at path")
self.contain("no stream matched path")
# From previous tests, we have:
self.ok("list")
self.ok("list -l")
self.match("/newton/prep float32_8\n"
"/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
@@ -761,13 +764,13 @@ class TestCmdline(object):
lines_(self.captured, 7)
# Destroy for real
self.ok("destroy -R /newton/prep")
self.ok("list")
self.ok("destroy -R /n*/prep")
self.ok("list -l")
self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
self.ok("destroy /newton/zzz/rawnotch")
self.ok("list")
self.ok("list -l")
self.match("/newton/raw uint16_6\n")
self.ok("destroy /newton/raw")
@@ -786,18 +789,17 @@ class TestCmdline(object):
self.ok("list")
self.contain(path)
# Make sure it was created empty
self.ok("list --detail --path " + path)
self.ok("list --detail " + path)
self.contain("(no intervals)")
def test_12_unicode(self):
# Unicode paths.
self.ok("destroy /newton/asdf/qwer")
self.ok("destroy /newton/prep")
self.ok("destroy /newton/raw")
self.ok("destroy /newton/prep /newton/raw")
self.ok("destroy /newton/zzz")
self.ok(u"create /düsseldorf/raw uint16_6")
self.ok("list --detail")
self.ok("list -l --detail")
self.contain(u"/düsseldorf/raw uint16_6")
self.contain("(no intervals)")
@@ -883,7 +885,7 @@ class TestCmdline(object):
du_before = nilmdb.utils.diskusage.du(testdb)
# Make sure we have the data we expect
self.ok("list --detail")
self.ok("list -l --detail")
self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
@@ -919,7 +921,7 @@ class TestCmdline(object):
self.match("3600\n")
# See the missing chunks in list output
self.ok("list --detail")
self.ok("list -l --detail")
self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
@@ -1043,7 +1045,7 @@ class TestCmdline(object):
else:
raise AssertionError("data not found at " + seek)
# Verify "list" output
self.ok("list")
self.ok("list -l")
self.match("/" + "/".join(components) + " float32_8\n")
# Lots of renames

View File

@@ -59,6 +59,14 @@ class TestInterval:
self.test_interval_intersect()
Interval = NilmdbInterval
# Other helpers in nilmdb.utils.interval
i = [ UtilsInterval(1,2), UtilsInterval(2,3), UtilsInterval(4,5) ]
eq_(list(nilmdb.utils.interval.optimize(i)),
[ UtilsInterval(1,3), UtilsInterval(4,5) ])
eq_(UtilsInterval(1234567890123456, 1234567890654321).human_string(),
"[ Fri, 13 Feb 2009 18:31:30.123456 -0500 -> " +
"Fri, 13 Feb 2009 18:31:30.654321 -0500 ]")
def test_interval(self):
# Test Interval class
os.environ['TZ'] = "America/New_York"
@@ -226,13 +234,16 @@ class TestInterval:
x = makeset("[--)") & 1234
def do_test(a, b, c, d):
# a & b == c
# a & b == c (using nilmdb.server.interval)
ab = IntervalSet()
for x in b:
for i in (a & x):
ab += i
eq_(ab,c)
# a & b == c (using nilmdb.utils.interval)
eq_(IntervalSet(nilmdb.utils.interval.intersection(a,b)), c)
# a \ b == d
eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)
@@ -302,6 +313,17 @@ class TestInterval:
eq_(nilmdb.utils.interval.set_difference(
a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)
# Fill out test coverage for non-subsets
def diff2(a,b, subset):
return nilmdb.utils.interval._interval_math_helper(
a, b, (lambda a, b: b and not a), subset=subset)
with assert_raises(nilmdb.utils.interval.IntervalError):
list(diff2(a,b,True))
list(diff2(a,b,False))
# Empty second set
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
# Empty second set
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)

View File

@@ -157,11 +157,14 @@ class TestServer(object):
def test_server(self):
# Make sure we can't force an exit, and test other 404 errors
for url in [ "/exit", "/", "/favicon.ico" ]:
for url in [ "/exit", "/favicon.ico" ]:
with assert_raises(HTTPError) as e:
geturl(url)
eq_(e.exception.code, 404)
# Root page
in_("This is NilmDB", geturl("/"))
# Check version
eq_(distutils.version.LooseVersion(getjson("/version")),
distutils.version.LooseVersion(nilmdb.__version__))

View File

@@ -28,7 +28,10 @@ def setup_module():
recursive_unlink(testdb)
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb, bulkdata_args = { "file_size" : 16384,
"files_per_dir" : 3 } )
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
fast_shutdown = True,
@@ -130,6 +133,15 @@ class TestNumpyClient(object):
[4, 5]]]))
in_("wrong number of dimensions", str(e.exception))
# Wrong number of fields
with assert_raises(ValueError) as e:
client.stream_insert_numpy("/test/1",
np.array([[0, 1, 2],
[1, 2, 3],
[3, 4, 5],
[4, 5, 6]]))
in_("wrong number of fields", str(e.exception))
# Unstructured
client.stream_create("/test/2", "float32_8")
client.stream_insert_numpy(
@@ -170,6 +182,17 @@ class TestNumpyClient(object):
assert(np.array_equal(a,b))
assert(np.array_equal(a,c))
# Make sure none of the files are greater than 16384 bytes as
# we configured with the bulkdata_args above.
datapath = os.path.join(testdb, "data")
for (dirpath, dirnames, filenames) in os.walk(datapath):
for f in filenames:
fn = os.path.join(dirpath, f)
size = os.path.getsize(fn)
if size > 16384:
raise AssertionError(sprintf("%s is too big: %d > %d\n",
fn, size, 16384))
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
client.close()
@@ -286,8 +309,25 @@ class TestNumpyClient(object):
with client.stream_insert_numpy_context("/empty/test", end = 950):
pass
# Equal start and end is OK as long as there's no data
with assert_raises(ClientError) as e:
with client.stream_insert_numpy_context("/empty/test",
start=9, end=9) as ctx:
ctx.insert([[9, 9]])
ctx.finalize()
in_("have data to send, but invalid start/end times", str(e.exception))
with client.stream_insert_numpy_context("/empty/test",
start=9, end=9) as ctx:
pass
# reusing a context object is bad
with assert_raises(Exception) as e:
ctx.insert([[9, 9]])
# Try various things that might cause problems
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
with client.stream_insert_numpy_context("/empty/test",
1000, 1050) as ctx:
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing

View File

@@ -62,6 +62,28 @@ class Base(object):
eq_(self.foo.val, 20)
eq_(self.foo.init_thread, self.foo.test_thread)
class ListLike(object):
def __init__(self):
self.thread = threading.current_thread().name
self.foo = 0
def __iter__(self):
eq_(threading.current_thread().name, self.thread)
self.foo = 0
return self
def __getitem__(self, key):
eq_(threading.current_thread().name, self.thread)
return key
def next(self):
eq_(threading.current_thread().name, self.thread)
if self.foo < 5:
self.foo += 1
return self.foo
else:
raise StopIteration
class TestUnserialized(Base):
def setUp(self):
self.foo = Foo()
@@ -84,3 +106,10 @@ class TestSerializer(Base):
sp(sp(Foo("x"))).t()
sp(sp(Foo)("x")).t()
sp(sp(Foo))("x").t()
def test_iter(self):
sp = nilmdb.utils.serializer_proxy
i = sp(ListLike)()
print iter(i)
eq_(list(i), [1,2,3,4,5])
eq_(i[3], 3)