Compare commits: nilmdb-1.5 ... python2 (76 commits)

Commits (SHA1 only; the author and date columns were empty in this view):

8125d9c840
ba55ad82f0
45c81d2019
78cfda32e3
3658d3876b
022b50950f
e5efbadc8e
74f633c9da
ab9a327130
da72fc9777
a01cb4132d
7c3da2fe44
f0e06dc436
ddc0eb4264
0a22db3965
8bb8f068de
416902097d
f5276e9fc8
c47f28f93a
63b5f99b90
7d7b89b52f
8d249273c6
abe431c663
ccf1f695af
06f7390c9e
6de77a08f1
8db9771c20
04f815a24b
6868f5f126
ca0943ec19
68addb4e4a
68c33b1f14
8dd8741100
8e6341ae5d
422b1e2df2
0f745b3047
71cd7ed9b7
a79d6104d5
8e8ec59e30
b89b945a0f
bd7bdb2eb8
840cd2fd13
bbd59c8b50
405c110fd7
274adcd856
a1850c9c2c
6cd28b67b1
d6d215d53d
e02143ddb2
e275384d03
a6a67ec15c
fc43107307
90633413bb
c7c3aff0fb
e2347c954e
222a5c6c53
1ca2c143e5
b5df575c79
2768a5ad15
a105543c38
309f38d0ed
9a27b6ef6a
99532cf9e0
dfdd0e5c74
9a2699adfc
9bbb95b18b
6bbed322c5
2317894355
539c92226c
77c766d85d
49d04db1d6
ea838d05ae
f2a48bdb2a
6d14e0b8aa
b31b9327b9
b98ff1331a
@@ -7,4 +7,4 @@
 exclude_lines =
     pragma: no cover
     if 0:
-omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py
+omit = nilmdb/utils/datetime_tz*,nilmdb/scripts,nilmdb/_version.py,nilmdb/fsck
@@ -8,7 +8,8 @@ Prerequisites:

 # Base NilmDB dependencies
 sudo apt-get install python-cherrypy3 python-decorator python-simplejson
-sudo apt-get install python-requests python-dateutil python-tz python-psutil
+sudo apt-get install python-requests python-dateutil python-tz
+sudo apt-get install python-progressbar python-psutil

 # Other dependencies (required by some modules)
 sudo apt-get install python-numpy
@@ -26,6 +27,7 @@ Install:
 Usage:

     nilmdb-server --help
+    nilmdb-fsck --help
     nilmtool --help

 See docs/wsgi.md for info on setting up a WSGI application in Apache.
@@ -421,3 +421,20 @@ and has all of the same functions.  It adds three new functions:
 It is significantly faster!  It is about 20 times faster to decimate a
 stream with `nilm-decimate` when the filter code is using the new
 binary/numpy interface.
+
+
+WSGI interface & chunked requests
+---------------------------------
+
+mod_wsgi requires "WSGIChunkedRequest On" to handle
+"Transfer-encoding: Chunked" requests.  However, `/stream/insert`
+doesn't handle this correctly right now, because:
+
+- The `cherrpy.request.body.read()` call needs to be fixed for chunked requests
+
+- We don't want to just buffer endlessly in the server, and it will
+  require some thought on how to handle data in chunks (what to do about
+  interval endpoints).
+
+It is probably better to just keep the endpoint management on the client
+side, so leave "WSGIChunkedRequest off" for now.
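Client-side endpoint management in practice, as a minimal sketch (assumes the client's stream_insert_context API; the URL and stream path are placeholders):

    import nilmdb.client

    # The client picks the interval endpoints and sends bounded
    # blocks, so the server never buffers an unbounded chunked upload.
    client = nilmdb.client.Client("http://localhost/nilmdb/")
    with client.stream_insert_context("/test/example") as ctx:
        for ts in range(100):
            ctx.insert("%d 1.0\n" % ts)   # "<timestamp> <data...>" lines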
@@ -19,12 +19,12 @@ Then, set up Apache with a configuration like:

     <VirtualHost>
         WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
-        WSGIApplicationGroup nilmdb-appgroup
-        WSGIProcessGroup nilmdb-procgroup
         WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
+        <Location /nilmdb>
+            WSGIProcessGroup nilmdb-procgroup
+            WSGIApplicationGroup nilmdb-appgroup

             # Access control example:
-            <Location /nilmdb>
             Order deny,allow
             Deny from all
             Allow from 1.2.3.4
extras/fix-oversize-files.py (new file, 50 lines)
@@ -0,0 +1,50 @@
#!/usr/bin/python

import os
import sys
import cPickle as pickle
import argparse
import fcntl
import re
from nilmdb.client.numpyclient import layout_to_dtype

parser = argparse.ArgumentParser(
    description = """
    Fix database corruption where binary writes caused too much data to be
    written to the file.  Truncates files to the correct length.  This was
    fixed by b98ff1331a515ad47fd3203615e835b529b039f9.
    """)
parser.add_argument("path", action="store", help='Database root path')
parser.add_argument("-y", "--yes", action="store_true", help='Fix them')
args = parser.parse_args()

lock = os.path.join(args.path, "data.lock")
with open(lock, "w") as f:
    fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)

    fix = {}

    for (path, dirs, files) in os.walk(args.path):
        if "_format" in files:
            with open(os.path.join(path, "_format")) as format:
                fmt = pickle.load(format)
            rowsize = layout_to_dtype(fmt["layout"]).itemsize
            maxsize = rowsize * fmt["rows_per_file"]
            fix[path] = maxsize
            if maxsize < 128000000: # sanity check
                raise Exception("bad maxsize " + str(maxsize))

    for fixpath in fix:
        for (path, dirs, files) in os.walk(fixpath):
            for fn in files:
                if not re.match("^[0-9a-f]{4,}$", fn):
                    continue
                fn = os.path.join(path, fn)
                size = os.path.getsize(fn)
                maxsize = fix[fixpath]
                if size > maxsize:
                    diff = size - maxsize
                    print diff, "too big:", fn
                    if args.yes:
                        with open(fn, "a+") as dbfile:
                            dbfile.truncate(maxsize)
nilmdb/client/client.py
@@ -58,6 +58,11 @@ class Client(object):
         return self.http.get("dbinfo")

     def stream_list(self, path = None, layout = None, extended = False):
+        """Return a sorted list of [path, layout] lists.  If 'path' or
+        'layout' are specified, only return streams that match those
+        exact values.  If 'extended' is True, the returned lists have
+        extended info, e.g.: [path, layout, extent_min, extent_max,
+        total_rows, total_seconds."""
         params = {}
         if path is not None:
             params["path"] = path
@@ -69,6 +74,7 @@ class Client(object):
         return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])

     def stream_get_metadata(self, path, keys = None):
+        """Get stream metadata"""
         params = { "path": path }
         if keys is not None:
             params["key"] = keys
@@ -144,6 +150,7 @@ class Client(object):
         ctx = StreamInserter(self, path, start, end)
         yield ctx
         ctx.finalize()
+        ctx.destroy()

     def stream_insert(self, path, data, start = None, end = None):
         """Insert rows of data into a stream.  data should be a string
@@ -293,6 +300,15 @@ class StreamInserter(object):
         self._block_data = []
         self._block_len = 0

+        self.destroyed = False
+
+    def destroy(self):
+        """Ensure this object can't be used again without raising
+        an error"""
+        def error(*args, **kwargs):
+            raise Exception("don't reuse this context object")
+        self._send_block = self.insert = self.finalize = self.send = error
+
     def insert(self, data):
         """Insert a chunk of ASCII formatted data in string form.  The
         overall data must consist of lines terminated by '\\n'."""
@@ -439,7 +455,7 @@ class StreamInserter(object):
             self._interval_start = end_ts

         # Double check endpoints
-        if start_ts is None or end_ts is None:
+        if (start_ts is None or end_ts is None) or (start_ts == end_ts):
             # If the block has no non-comment lines, it's OK
             try:
                 self._get_first_noncomment(block)
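Usage of the newly documented extended listing, as a sketch (the server URL is a placeholder; the fields follow the new docstring):

    import nilmdb.client

    client = nilmdb.client.Client("http://localhost/nilmdb/")
    for stream in client.stream_list(extended = True):
        (path, layout, extent_min, extent_max,
         total_rows, total_seconds) = stream[:6]
        print path, layout, total_rows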
nilmdb/client/httpclient.py
@@ -9,7 +9,7 @@ import requests

 class HTTPClient(object):
     """Class to manage and perform HTTP requests from the client"""
-    def __init__(self, baseurl = "", post_json = False):
+    def __init__(self, baseurl = "", post_json = False, verify_ssl = True):
         """If baseurl is supplied, all other functions that take
         a URL can be given a relative URL instead."""
         # Verify / clean up URL
@@ -18,9 +18,8 @@ class HTTPClient(object):
         reparsed = urlparse.urlparse("http://" + baseurl).geturl()
         self.baseurl = reparsed.rstrip('/') + '/'

-        # Build Requests session object, enable SSL verification
-        self.session = requests.Session()
-        self.session.verify = True
+        # Note whether we want SSL verification
+        self.verify_ssl = verify_ssl

         # Saved response, so that tests can verify a few things.
         self._last_response = {}
@@ -58,16 +57,34 @@ class HTTPClient(object):
             raise Error(**args)

     def close(self):
-        self.session.close()
+        pass

     def _do_req(self, method, url, query_data, body_data, stream, headers):
         url = urlparse.urljoin(self.baseurl, url)
         try:
-            response = self.session.request(method, url,
+            # Create a new session, ensure we send "Connection: close",
+            # and explicitly close connection after the transfer.
+            # This is to avoid HTTP/1.1 persistent connections
+            # (keepalive), because they have fundamental race
+            # conditions when there are delays between requests:
+            # a new request may be sent at the same instant that the
+            # server decides to timeout the connection.
+            session = requests.Session()
+            if headers is None:
+                headers = {}
+            headers["Connection"] = "close"
+            response = session.request(method, url,
                                        params = query_data,
                                        data = body_data,
                                        stream = stream,
-                                       headers = headers)
+                                       headers = headers,
+                                       verify = self.verify_ssl)
+
+            # Close the connection.  If it's a generator (stream =
+            # True), the requests library shouldn't actually close the
+            # HTTP connection until all data has been read from the
+            # response.
+            session.close()
         except requests.RequestException as e:
             raise ServerError(status = "502 Error", url = url,
                               message = str(e.message))
@@ -123,14 +140,36 @@ class HTTPClient(object):
         """
         (response, isjson) = self._do_req(method, url, query, body,
                                           stream = True, headers = headers)

+        # Like the iter_lines function in Requests, but only splits on
+        # the specified line ending.
+        def lines(source, ending):
+            pending = None
+            for chunk in source:
+                if pending is not None:
+                    chunk = pending + chunk
+                tmp = chunk.split(ending)
+                lines = tmp[:-1]
+                if chunk.endswith(ending):
+                    pending = None
+                else:
+                    pending = tmp[-1]
+                for line in lines:
+                    yield line
+            if pending is not None: # pragma: no cover (missing newline)
+                yield pending
+
+        # Yield the chunks or lines as requested
+        if binary:
+            for chunk in response.iter_content(chunk_size = 65536):
+                yield chunk
+        elif isjson:
-            for line in response.iter_lines():
+            for line in lines(response.iter_content(chunk_size = 1),
+                              ending = '\r\n'):
                 yield json.loads(line)
         else:
-            for line in response.iter_lines():
+            for line in lines(response.iter_content(chunk_size = 65536),
+                              ending = '\n'):
                 yield line

     def get_gen(self, url, params = None, binary = False):
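The lines() helper above splits only on the exact ending given, buffering partial lines across chunk boundaries; a standalone copy shows the behavior:

    def lines(source, ending):
        pending = None
        for chunk in source:
            if pending is not None:
                chunk = pending + chunk
            tmp = chunk.split(ending)
            ls = tmp[:-1]
            if chunk.endswith(ending):
                pending = None
            else:
                pending = tmp[-1]
            for line in ls:
                yield line
        if pending is not None:
            yield pending

    print list(lines(["a\r\nb", "\r\nc\nd\r\n"], "\r\n"))
    # ['a', 'b', 'c\nd']: split lines reassemble, and a bare '\n'
    # inside a line does not split it when the ending is '\r\n'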
nilmdb/client/numpyclient.py
@@ -98,6 +98,7 @@ class NumpyClient(nilmdb.client.client.Client):
         ctx = StreamInserterNumpy(self, path, start, end, dtype)
         yield ctx
         ctx.finalize()
+        ctx.destroy()

     def stream_insert_numpy(self, path, data, start = None, end = None,
                             layout = None):
@@ -133,16 +134,8 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
         contiguous interval and may be None.  'dtype' is the Numpy
         dtype for this stream.
         """
-        self.last_response = None
-
+        super(StreamInserterNumpy, self).__init__(client, path, start, end)
         self._dtype = dtype
-        self._client = client
-        self._path = path
-
-        # Start and end for the overall contiguous interval we're
-        # filling
-        self._interval_start = start
-        self._interval_end = end

         # Max rows to send at once
         self._max_rows = self._max_data // self._dtype.itemsize
@@ -250,9 +243,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
             # Next block continues where this one ended
             self._interval_start = end_ts

-        # If we have no endpoints, it's because we had no data to send.
-        if start_ts is None or end_ts is None:
+        # If we have no endpoints, or equal endpoints, it's OK as long
+        # as there's no data to send
+        if (start_ts is None or end_ts is None) or (start_ts == end_ts):
             if len(array) == 0:
                 return
+            raise ClientError("have data to send, but invalid start/end times")

         # Send it
         data = array.tostring()
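For comparison, the numpy insert path this context manager backs, as a sketch (the server URL, stream path, and layout are placeholders):

    import numpy
    from nilmdb.client.numpyclient import NumpyClient, layout_to_dtype

    client = NumpyClient("http://localhost/nilmdb/")
    data = numpy.zeros(100, dtype = layout_to_dtype("float32_8"))
    data['timestamp'] = numpy.arange(100)    # must be monotonic
    client.stream_insert_numpy("/test/example", data, layout = "float32_8")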
nilmdb/cmdline/__init__.py
@@ -19,9 +19,8 @@ except ImportError: # pragma: no cover

 # Valid subcommands.  Defined in separate files just to break
 # things up -- they're still called with Cmdline as self.
-subcommands = [ "help", "info", "create", "list", "metadata",
-                "insert", "extract", "remove", "destroy",
-                "intervals", "rename" ]
+subcommands = [ "help", "info", "create", "rename", "list", "intervals",
+                "metadata", "insert", "extract", "remove", "destroy" ]

 # Import the subcommand modules
 subcmd_mods = {}
@@ -29,6 +28,14 @@ for cmd in subcommands:
     subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ])

 class JimArgumentParser(argparse.ArgumentParser):
+    def parse_args(self, args=None, namespace=None):
+        # Look for --version anywhere and change it to just "nilmtool
+        # --version".  This makes "nilmtool cmd --version" work, which
+        # is needed by help2man.
+        if "--version" in (args or sys.argv[1:]):
+            args = [ "--version" ]
+        return argparse.ArgumentParser.parse_args(self, args, namespace)
+
     def error(self, message):
         self.print_usage(sys.stderr)
         self.exit(2, sprintf("error: %s\n", message))
@@ -72,10 +79,16 @@ class Complete(object): # pragma: no cover
         path = parsed_args.path
         if not path:
             return []
-        return ( self.escape(k + '=' + v)
-                 for (k,v) in client.stream_get_metadata(path).iteritems()
-                 if k.startswith(prefix) )
+        results = []
+        # prefix comes in as UTF-8, but results need to be Unicode,
+        # weird.  Still doesn't work in all cases, but that's bugs in
+        # argcomplete.
+        prefix = nilmdb.utils.unicode.decode(prefix)
+        for (k,v) in client.stream_get_metadata(path).iteritems():
+            kv = self.escape(k + '=' + v)
+            if kv.startswith(prefix):
+                results.append(kv)
+        return results

 class Cmdline(object):

@@ -108,7 +121,7 @@ class Cmdline(object):
         group = self.parser.add_argument_group("General options")
         group.add_argument("-h", "--help", action='help',
                            help='show this help message and exit')
-        group.add_argument("-V", "--version", action="version",
+        group.add_argument("-v", "--version", action="version",
                            version = nilmdb.__version__)

         group = self.parser.add_argument_group("Server")
nilmdb/cmdline/extract.py
@@ -1,6 +1,7 @@
 from __future__ import print_function
 from nilmdb.utils.printf import *
 import nilmdb.client
+import sys

 def setup(self, sub):
     cmd = sub.add_parser("extract", help="Extract data",
@@ -24,6 +25,8 @@ def setup(self, sub):
                    ).completer = self.complete.time

     group = cmd.add_argument_group("Output format")
+    group.add_argument("-B", "--binary", action="store_true",
+                       help="Raw binary output")
     group.add_argument("-b", "--bare", action="store_true",
                        help="Exclude timestamps from output lines")
     group.add_argument("-a", "--annotate", action="store_true",
@@ -42,6 +45,11 @@ def cmd_extract_verify(self):
     if self.args.start > self.args.end:
         self.parser.error("start is after end")

+    if self.args.binary:
+        if (self.args.bare or self.args.annotate or self.args.markup or
+            self.args.timestamp_raw or self.args.count):
+            self.parser.error("--binary cannot be combined with other options")
+
 def cmd_extract(self):
     streams = self.client.stream_list(self.args.path)
     if len(streams) != 1:
@@ -60,16 +68,23 @@ def cmd_extract(self):
         printf("# end: %s\n", time_string(self.args.end))

     printed = False
+    if self.args.binary:
+        printer = sys.stdout.write
+    else:
+        printer = print
+    bare = self.args.bare
+    count = self.args.count
     for dataline in self.client.stream_extract(self.args.path,
                                                self.args.start,
                                                self.args.end,
                                                self.args.count,
-                                               self.args.markup):
-        if self.args.bare and not self.args.count:
+                                               self.args.markup,
+                                               self.args.binary):
+        if bare and not count:
             # Strip timestamp (first element).  Doesn't make sense
             # if we are only returning a count.
             dataline = ' '.join(dataline.split(' ')[1:])
-        print(dataline)
+        printer(dataline)
         printed = True
     if not printed:
         if self.args.annotate:
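Raw binary output pairs with the stream's dtype on the read side; a sketch of decoding it (stream path, layout, and filename are placeholders):

    import numpy
    from nilmdb.client.numpyclient import layout_to_dtype

    # e.g. after:
    #   nilmtool extract -B /test/example -s '2013-01-01' \
    #       -e '2013-01-02' > dump.bin
    dtype = layout_to_dtype("float32_8")
    data = numpy.fromstring(open("dump.bin", "rb").read(), dtype)
    print data['timestamp'][:10]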
nilmdb/cmdline/info.py
@@ -21,5 +21,8 @@ def cmd_info(self):
     printf("Server URL: %s\n", self.client.geturl())
     dbinfo = self.client.dbinfo()
     printf("Server database path: %s\n", dbinfo["path"])
-    printf("Server database size: %s\n", human_size(dbinfo["size"]))
-    printf("Server database free space: %s\n", human_size(dbinfo["free"]))
+    for (desc, field) in [("used by NilmDB", "size"),
+                          ("used by other", "other"),
+                          ("reserved", "reserved"),
+                          ("free", "free")]:
+        printf("Server disk space %s: %s\n", desc, human_size(dbinfo[field]))
nilmdb/cmdline/intervals.py
@@ -1,5 +1,6 @@
 from nilmdb.utils.printf import *
 import nilmdb.utils.time
+from nilmdb.utils.interval import Interval

 import fnmatch
 import argparse
@@ -42,6 +43,8 @@ def setup(self, sub):
     group = cmd.add_argument_group("Misc options")
     group.add_argument("-T", "--timestamp-raw", action="store_true",
                        help="Show raw timestamps when printing times")
+    group.add_argument("-o", "--optimize", action="store_true",
+                       help="Optimize (merge adjacent) intervals")

     return cmd

@@ -58,9 +61,16 @@ def cmd_intervals(self):
     time_string = nilmdb.utils.time.timestamp_to_human

     try:
-        for (start, end) in self.client.stream_intervals(
-            self.args.path, self.args.start, self.args.end, self.args.diff):
-            printf("[ %s -> %s ]\n", time_string(start), time_string(end))
+        intervals = ( Interval(start, end) for (start, end) in
+                      self.client.stream_intervals(self.args.path,
+                                                   self.args.start,
+                                                   self.args.end,
+                                                   self.args.diff) )
+        if self.args.optimize:
+            intervals = nilmdb.utils.interval.optimize(intervals)
+        for i in intervals:
+            printf("[ %s -> %s ]\n", time_string(i.start), time_string(i.end))

     except nilmdb.client.ClientError as e:
         self.die("error listing intervals: %s", str(e))
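What the new --optimize flag does, in isolation: adjacent intervals merge into one. A sketch using the same helpers (timestamps are arbitrary):

    from nilmdb.utils.interval import Interval
    import nilmdb.utils.interval

    ints = [ Interval(0, 10), Interval(10, 20), Interval(30, 40) ]
    for i in nilmdb.utils.interval.optimize(ints):
        print i.start, i.end
    # 0 20  (adjacent pair merged)
    # 30 40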
nilmdb/cmdline/list.py
@@ -45,6 +45,8 @@ def setup(self, sub):
                        help="Show raw timestamps when printing times")
     group.add_argument("-l", "--layout", action="store_true",
                        help="Show layout type next to path name")
+    group.add_argument("-n", "--no-decim", action="store_true",
+                       help="Skip paths containing \"~decim-\"")

     return cmd

@@ -71,6 +73,8 @@ def cmd_list(self):
         (path, layout, int_min, int_max, rows, time) = stream[:6]
         if not fnmatch.fnmatch(path, argpath):
             continue
+        if self.args.no_decim and "~decim-" in path:
+            continue

         if self.args.layout:
             printf("%s %s\n", path, layout)
nilmdb/cmdline/metadata.py
@@ -41,10 +41,10 @@ def cmd_metadata(self):
     if self.args.set is not None or self.args.update is not None:
         # Either set, or update
         if self.args.set is not None:
-            keyvals = self.args.set
+            keyvals = map(nilmdb.utils.unicode.decode, self.args.set)
             handler = self.client.stream_set_metadata
         else:
-            keyvals = self.args.update
+            keyvals = map(nilmdb.utils.unicode.decode, self.args.update)
             handler = self.client.stream_update_metadata

         # Extract key=value pairs
@@ -62,7 +62,9 @@ def cmd_metadata(self):
             self.die("error setting/updating metadata: %s", str(e))
     elif self.args.delete is not None:
         # Delete (by setting values to empty strings)
-        keys = self.args.delete or None
+        keys = None
+        if self.args.delete:
+            keys = map(nilmdb.utils.unicode.decode, self.args.delete)
         try:
             data = self.client.stream_get_metadata(self.args.path, keys)
             for key in data:
@@ -72,7 +74,9 @@ def cmd_metadata(self):
             self.die("error deleting metadata: %s", str(e))
     else:
         # Get (or unspecified)
-        keys = self.args.get or None
+        keys = None
+        if self.args.get:
+            keys = map(nilmdb.utils.unicode.decode, self.args.get)
         try:
             data = self.client.stream_get_metadata(self.args.path, keys)
         except nilmdb.client.ClientError as e:
@@ -81,4 +85,6 @@ def cmd_metadata(self):
             # Print nonexistant keys as having empty value
             if value is None:
                 value = ""
-            printf("%s=%s\n", key, value)
+            printf("%s=%s\n",
+                   nilmdb.utils.unicode.encode(key),
+                   nilmdb.utils.unicode.encode(value))
nilmdb/fsck/__init__.py (new file, 5 lines)
@@ -0,0 +1,5 @@
"""nilmdb.fsck"""

from __future__ import absolute_import

from nilmdb.fsck.fsck import Fsck
nilmdb/fsck/fsck.py (new file, 464 lines)
@@ -0,0 +1,464 @@
# -*- coding: utf-8 -*-

"""Check database consistency, with some ability to fix problems.
This should be able to fix cases where a database gets corrupted due
to unexpected system shutdown, and detect other cases that may cause
NilmDB to return errors when trying to manipulate the database."""

import nilmdb.utils
import nilmdb.server
import nilmdb.client.numpyclient
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, IntervalSet
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string

from collections import defaultdict
import sqlite3
import os
import sys
import progressbar
import re
import time
import shutil
import cPickle as pickle
import numpy

class FsckError(Exception):
    def __init__(self, msg = "", *args):
        if args:
            msg = sprintf(msg, *args)
        Exception.__init__(self, msg)
class FixableFsckError(FsckError):
    def __init__(self, msg = "", *args):
        if args:
            msg = sprintf(msg, *args)
        FsckError.__init__(self, "%s\nThis may be fixable with \"--fix\".", msg)
class RetryFsck(FsckError):
    pass

def log(format, *args):
    printf(format, *args)

def err(format, *args):
    fprintf(sys.stderr, format, *args)

# Decorator that retries a function if it returns a specific value
def retry_if_raised(exc, message = None, max_retries = 100):
    def f1(func):
        def f2(*args, **kwargs):
            for n in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc as e:
                    if message:
                        log("%s\n\n", message)
            raise Exception("Max number of retries (%d) exceeded; giving up")
        return f2
    return f1

class Progress(object):
    def __init__(self, maxval):
        if maxval == 0:
            maxval = 1
        self.bar = progressbar.ProgressBar(
            maxval = maxval,
            widgets = [ progressbar.Percentage(), ' ',
                        progressbar.Bar(), ' ',
                        progressbar.ETA() ])
        if self.bar.term_width == 0:
            self.bar.term_width = 75
    def __enter__(self):
        self.bar.start()
        self.last_update = 0
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.bar.finish()
        else:
            printf("\n")
    def update(self, val):
        self.bar.update(val)

class Fsck(object):

    def __init__(self, path, fix = False):
        self.basepath = path
        self.sqlpath = os.path.join(path, "data.sql")
        self.bulkpath = os.path.join(path, "data")
        self.bulklock = os.path.join(path, "data.lock")
        self.fix = fix

    ### Main checks

    @retry_if_raised(RetryFsck, "Something was fixed: restarting fsck")
    def check(self, skip_data = False):
        self.bulk = None
        self.sql = None
        try:
            self.check_paths()
            self.check_sql()
            self.check_streams()
            self.check_intervals()
            if skip_data:
                log("skipped data check\n")
            else:
                self.check_data()
        finally:
            if self.bulk:
                self.bulk.close()
            if self.sql:
                self.sql.commit()
                self.sql.close()
        log("ok\n")

    ### Check basic path structure

    def check_paths(self):
        log("checking paths\n")
        if self.bulk:
            self.bulk.close()
        if not os.path.isfile(self.sqlpath):
            raise FsckError("SQL database missing (%s)", self.sqlpath)
        if not os.path.isdir(self.bulkpath):
            raise FsckError("Bulk data directory missing (%s)", self.bulkpath)
        with open(self.bulklock, "w") as lockfile:
            if not nilmdb.utils.lock.exclusive_lock(lockfile):
                raise FsckError('Database already locked by another process\n'
                                'Make sure all other processes that might be '
                                'using the database are stopped.\n'
                                'Restarting apache will cause it to unlock '
                                'the db until a request is received.')
            # unlocked immediately
        self.bulk = nilmdb.server.bulkdata.BulkData(self.basepath)

    ### Check SQL database health

    def check_sql(self):
        log("checking sqlite database\n")

        self.sql = sqlite3.connect(self.sqlpath)
        with self.sql:
            cur = self.sql.cursor()
            ver = cur.execute("PRAGMA user_version").fetchone()[0]
            good = max(nilmdb.server.nilmdb._sql_schema_updates.keys())
            if ver != good:
                raise FsckError("database version %d too old, should be %d",
                                ver, good)
            self.stream_path = {}
            self.stream_layout = {}
            log("  loading paths\n")
            result = cur.execute("SELECT id, path, layout FROM streams")
            for r in result:
                if r[0] in self.stream_path:
                    raise FsckError("duplicated ID %d in stream IDs", r[0])
                self.stream_path[r[0]] = r[1]
                self.stream_layout[r[0]] = r[2]

            log("  loading intervals\n")
            self.stream_interval = defaultdict(list)
            result = cur.execute("SELECT stream_id, start_time, end_time, "
                                 "start_pos, end_pos FROM ranges "
                                 "ORDER BY start_time")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("interval ID %d not in streams", k)
                self.stream_interval[r[0]].append((r[1], r[2], r[3], r[4]))

            log("  loading metadata\n")
            self.stream_meta = defaultdict(dict)
            result = cur.execute("SELECT stream_id, key, value FROM metadata")
            for r in result:
                if r[0] not in self.stream_path:
                    raise FsckError("metadata ID %d not in streams", k)
                if r[1] in self.stream_meta[r[0]]:
                    raise FsckError("duplicate metadata key '%s' for stream %d",
                                    r[1], r[0])
                self.stream_meta[r[0]][r[1]] = r[2]

    ### Check streams and basic interval overlap

    def check_streams(self):
        ids = self.stream_path.keys()
        log("checking %s streams\n", "{:,d}".format(len(ids)))
        with Progress(len(ids)) as pbar:
            for i, sid in enumerate(ids):
                pbar.update(i)
                path = self.stream_path[sid]

                # unique path, valid layout
                if self.stream_path.values().count(path) != 1:
                    raise FsckError("duplicated path %s", path)
                layout = self.stream_layout[sid].split('_')[0]
                if layout not in ('int8', 'int16', 'int32', 'int64',
                                  'uint8', 'uint16', 'uint32', 'uint64',
                                  'float32', 'float64'):
                    raise FsckError("bad layout %s for %s", layout, path)
                count = int(self.stream_layout[sid].split('_')[1])
                if count < 1 or count > 1024:
                    raise FsckError("bad count %d for %s", count, path)

                # must exist in bulkdata
                bulk = self.bulkpath + path
                if not os.path.isdir(bulk):
                    raise FsckError("%s: missing bulkdata dir", path)
                if not nilmdb.server.bulkdata.Table.exists(bulk):
                    raise FsckError("%s: bad bulkdata table", path)

                # intervals don't overlap.  Abuse IntervalSet to check
                # for intervals in file positions, too.
                timeiset = IntervalSet()
                posiset = IntervalSet()
                for (stime, etime, spos, epos) in self.stream_interval[sid]:
                    new = Interval(stime, etime)
                    try:
                        timeiset += new
                    except IntervalError:
                        raise FsckError("%s: overlap in intervals:\n"
                                        "set: %s\nnew: %s",
                                        path, str(timeiset), str(new))
                    if spos != epos:
                        new = Interval(spos, epos)
                        try:
                            posiset += new
                        except IntervalError:
                            raise FsckError("%s: overlap in file offsets:\n"
                                            "set: %s\nnew: %s",
                                            path, str(posiset), str(new))

                # check bulkdata
                self.check_bulkdata(sid, path, bulk)

                # Check that we can open bulkdata
                try:
                    tab = None
                    try:
                        tab = nilmdb.server.bulkdata.Table(bulk)
                    except Exception as e:
                        raise FsckError("%s: can't open bulkdata: %s",
                                        path, str(e))
                finally:
                    if tab:
                        tab.close()

    ### Check that bulkdata is good enough to be opened

    @retry_if_raised(RetryFsck)
    def check_bulkdata(self, sid, path, bulk):
        with open(os.path.join(bulk, "_format"), "rb") as f:
            fmt = pickle.load(f)
        if fmt["version"] != 3:
            raise FsckError("%s: bad or unsupported bulkdata version %d",
                            path, fmt["version"])
        row_per_file = int(fmt["rows_per_file"])
        files_per_dir = int(fmt["files_per_dir"])
        layout = fmt["layout"]
        if layout != self.stream_layout[sid]:
            raise FsckError("%s: layout mismatch %s != %s", path,
                            layout, self.stream_layout[sid])

        # Every file should have a size that's the multiple of the row size
        rkt = nilmdb.server.rocket.Rocket(layout, None)
        row_size = rkt.binary_size
        rkt.close()

        # Find all directories
        regex = re.compile("^[0-9a-f]{4,}$")
        subdirs = sorted(filter(regex.search, os.listdir(bulk)),
                         key = lambda x: int(x, 16), reverse = True)
        for subdir in subdirs:
            # Find all files in that dir
            subpath = os.path.join(bulk, subdir)
            files = filter(regex.search, os.listdir(subpath))
            if not files:
                self.fix_empty_subdir(subpath)
                raise RetryFsck
            # Verify that their size is a multiple of the row size
            for filename in files:
                filepath = os.path.join(subpath, filename)
                offset = os.path.getsize(filepath)
                if offset % row_size:
                    self.fix_bad_filesize(path, filepath, offset, row_size)

    def fix_empty_subdir(self, subpath):
        msg = sprintf("bulkdata path %s is missing data files", subpath)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just deleting whatever is present,
        # as long as it's only ".removed" files.
        err("\n%s\n", msg)
        for fn in os.listdir(subpath):
            if not fn.endswith(".removed"):
                raise FsckError("can't fix automatically: please manually "
                                "remove the file %s and try again",
                                os.path.join(subpath, fn))
        # Remove the whole thing
        err("Removing empty subpath\n")
        shutil.rmtree(subpath)
        raise RetryFsck

    def fix_bad_filesize(self, path, filepath, offset, row_size):
        extra = offset % row_size
        msg = sprintf("%s: size of file %s (%d) is not a multiple" +
                      " of row size (%d): %d extra bytes present",
                      path, filepath, offset, row_size, extra)
        if not self.fix:
            raise FixableFsckError(msg)
        # Try to fix it by just truncating the file
        err("\n%s\n", msg)
        newsize = offset - extra
        err("Truncating file to %d bytes and retrying\n", newsize)
        with open(filepath, "r+b") as f:
            f.truncate(newsize)
        raise RetryFsck

    ### Check interval endpoints

    def check_intervals(self):
        total_ints = sum(len(x) for x in self.stream_interval.values())
        log("checking %s intervals\n", "{:,d}".format(total_ints))
        done = 0
        with Progress(total_ints) as pbar:
            for sid in self.stream_interval:
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)
                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_intervals(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_intervals(self, sid, ints, tab, update):
        # look in the table to make sure we can pick out the interval's
        # endpoints
        path = self.stream_path[sid]
        tab.file_open.cache_remove_all()
        for (i, intv) in enumerate(ints):
            update(i)
            (stime, etime, spos, epos) = intv
            if spos == epos and spos >= 0 and spos <= tab.nrows:
                continue
            try:
                srow = tab[spos]
                erow = tab[epos-1]
            except Exception as e:
                self.fix_bad_interval(sid, intv, tab, str(e))
                raise RetryFsck
        return len(ints)

    def fix_bad_interval(self, sid, intv, tab, msg):
        path = self.stream_path[sid]
        msg = sprintf("%s: interval %s error accessing rows: %s",
                      path, str(intv), str(msg))
        if not self.fix:
            raise FixableFsckError(msg)
        err("\n%s\n", msg)

        (stime, etime, spos, epos) = intv
        # If it's just that the end pos is more than the number of rows
        # in the table, lower end pos and truncate interval time too.
        if spos < tab.nrows and epos >= tab.nrows:
            err("end position is past endrows, but it can be truncated\n")
            err("old end: time %d, pos %d\n", etime, epos)
            new_epos = tab.nrows
            new_etime = tab[new_epos-1] + 1
            err("new end: time %d, pos %d\n", new_etime, new_epos)
            if stime < new_etime:
                # Change it in SQL
                with self.sql:
                    cur = self.sql.cursor()
                    cur.execute("UPDATE ranges SET end_time=?, end_pos=? "
                                "WHERE stream_id=? AND start_time=? AND "
                                "end_time=? AND start_pos=? AND end_pos=?",
                                (new_etime, new_epos, sid, stime, etime,
                                 spos, epos))
                    if cur.rowcount != 1:
                        raise FsckError("failed to fix SQL database")
                raise RetryFsck
            err("actually it can't be truncated; times are bad too")

        # Otherwise, the only hope is to delete the interval entirely.
        err("*** Deleting the entire interval from SQL.\n")
        err("This may leave stale data on disk.  To fix that, copy all\n")
        err("data from this stream to a new stream, then remove all data\n")
        err("from and destroy %s.\n", path)
        with self.sql:
            cur = self.sql.cursor()
            cur.execute("DELETE FROM ranges WHERE "
                        "stream_id=? AND start_time=? AND "
                        "end_time=? AND start_pos=? AND end_pos=?",
                        (sid, stime, etime, spos, epos))
            if cur.rowcount != 1:
                raise FsckError("failed to remove interval")
            raise RetryFsck

    ### Check data in each interval

    def check_data(self):
        total_rows = sum(sum((y[3] - y[2]) for y in x)
                         for x in self.stream_interval.values())
        log("checking %s rows of data\n", "{:,d}".format(total_rows))
        done = 0
        with Progress(total_rows) as pbar:
            for sid in self.stream_interval:
                try:
                    bulk = self.bulkpath + self.stream_path[sid]
                    tab = nilmdb.server.bulkdata.Table(bulk)
                    def update(x):
                        pbar.update(done + x)
                    ints = self.stream_interval[sid]
                    done += self.check_table_data(sid, ints, tab, update)
                finally:
                    tab.close()

    def check_table_data(self, sid, ints, tab, update):
        # Pull out all of the interval's data and verify that it's
        # monotonic.
        maxrows = 100000
        path = self.stream_path[sid]
        layout = self.stream_layout[sid]
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        tab.file_open.cache_remove_all()
        done = 0
        for intv in ints:
            last_ts = None
            (stime, etime, spos, epos) = intv

            # Break interval into maxrows-sized chunks
            next_start = spos
            while next_start < epos:
                start = next_start
                stop = min(start + maxrows, epos)
                count = stop - start
                next_start = stop

                # Get raw data, convert to NumPy arary
                try:
                    raw = tab.get_data(start, stop, binary = True)
                    data = numpy.fromstring(raw, dtype)
                except Exception as e:
                    raise FsckError("%s: failed to grab rows %d through %d: %s",
                                    path, start, stop, repr(e))

                # Verify that timestamps are monotonic
                if (numpy.diff(data['timestamp']) <= 0).any():
                    raise FsckError("%s: non-monotonic timestamp(s) in rows "
                                    "%d through %d", path, start, stop)
                first_ts = data['timestamp'][0]
                if last_ts is not None and first_ts <= last_ts:
                    raise FsckError("%s: first interval timestamp %d is not "
                                    "greater than the previous last interval "
                                    "timestamp %d, at row %d",
                                    path, first_ts, last_ts, start)
                last_ts = data['timestamp'][-1]

                # These are probably fixable, by removing the offending
                # intervals.  But I'm not going to bother implementing
                # that yet.

                # Done
                done += count
                update(done)
        return done
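Two helpers above drive the whole check: retry_if_raised(RetryFsck) re-runs a check from scratch after each fix, and Progress wraps progressbar as a context manager. Minimal usage sketch of the latter (maxval made up):

    with Progress(1000) as pbar:
        for i in range(1000):
            pbar.update(i)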
nilmdb/scripts/nilmdb_fsck.py (new executable file, 26 lines)
@@ -0,0 +1,26 @@
#!/usr/bin/python

import nilmdb.fsck
import argparse
import os
import sys

def main():
    """Main entry point for the 'nilmdb-fsck' command line script"""

    parser = argparse.ArgumentParser(
        description = 'Check database consistency',
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        version = nilmdb.__version__)
    parser.add_argument("-f", "--fix", action="store_true",
                        default=False, help = 'Fix errors when possible '
                        '(which may involve removing data)')
    parser.add_argument("-n", "--no-data", action="store_true",
                        default=False, help = 'Skip the slow full-data check')
    parser.add_argument('database', help = 'Database directory')
    args = parser.parse_args()

    nilmdb.fsck.Fsck(args.database, args.fix).check(skip_data = args.no_data)

if __name__ == "__main__":
    main()
nilmdb/scripts/nilmdb_server.py
@@ -10,9 +10,7 @@ def main():

     parser = argparse.ArgumentParser(
         description = 'Run the NilmDB server',
-        formatter_class = argparse.ArgumentDefaultsHelpFormatter)
-
-    parser.add_argument("-V", "--version", action="version",
+        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
                         version = nilmdb.__version__)

     group = parser.add_argument_group("Standard options")
nilmdb/server/bulkdata.py
@@ -19,8 +19,8 @@ from . import rocket

 # Up to 256 open file descriptors at any given time.
 # These variables are global so they can be used in the decorator arguments.
-table_cache_size = 16
-fd_cache_size = 16
+table_cache_size = 32
+fd_cache_size = 8

 @nilmdb.utils.must_close(wrap_verify = False)
 class BulkData(object):
@@ -43,6 +43,12 @@ class BulkData(object):
         # 32768 files per dir should work even on FAT32
         self.files_per_dir = 32768

+        if "initial_nrows" in kwargs:
+            self.initial_nrows = kwargs["initial_nrows"]
+        else:
+            # First row is 0
+            self.initial_nrows = 0
+
         # Make root path
         if not os.path.isdir(self.root):
             os.mkdir(self.root)
@@ -194,6 +200,9 @@ class BulkData(object):
         if oldospath == newospath:
             raise ValueError("old and new paths are the same")

+        # Remove Table object at old path from cache
+        self.getnode.cache_remove(self, oldunicodepath)
+
         # Move the table to a temporary location
         tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
         tmppath = os.path.join(tmpdir, "table")
@@ -251,7 +260,7 @@ class BulkData(object):
         path = self._encode_filename(unicodepath)
         elements = path.lstrip('/').split('/')
         ospath = os.path.join(self.root, *elements)
-        return Table(ospath)
+        return Table(ospath, self.initial_nrows)

 @nilmdb.utils.must_close(wrap_verify = False)
 class Table(object):
@@ -288,9 +297,10 @@ class Table(object):
         pickle.dump(fmt, f, 2)

     # Normal methods
-    def __init__(self, root):
+    def __init__(self, root, initial_nrows = 0):
         """'root' is the full OS path to the directory of this table"""
         self.root = root
+        self.initial_nrows = initial_nrows

         # Load the format
         with open(os.path.join(self.root, "_format"), "rb") as f:
@@ -330,7 +340,8 @@ class Table(object):

         # Find the last directory.  We sort and loop through all of them,
         # starting with the numerically greatest, because the dirs could be
-        # empty if something was deleted.
+        # empty if something was deleted but the directory was unexpectedly
+        # not deleted.
         subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                          key = lambda x: int(x, 16), reverse = True)

@@ -349,8 +360,14 @@ class Table(object):
             # Convert to row number
             return self._row_from_offset(subdir, filename, offset)

-        # No files, so no data
-        return 0
+        # No files, so no data.  We typically start at row 0 in this
+        # case, although initial_nrows is specified during some tests
+        # to exercise other parts of the code better.  Since we have
+        # no files yet, round initial_nrows up so it points to a row
+        # that would begin a new file.
+        nrows = ((self.initial_nrows + (self.rows_per_file - 1)) //
+                 self.rows_per_file) * self.rows_per_file
+        return nrows

     def _offset_from_row(self, row):
         """Return a (subdir, filename, offset, count) tuple:
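The round-up arithmetic at the end of that last hunk, worked through with made-up numbers:

    initial_nrows = 10000
    rows_per_file = 4096
    nrows = ((initial_nrows + (rows_per_file - 1)) // rows_per_file) \
            * rows_per_file
    print nrows    # 12288: the next multiple of rows_per_file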
nilmdb/server/nilmdb.py
@@ -23,7 +23,6 @@ from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
 import sqlite3
 import os
 import errno
-import bisect

 # Note about performance and transactions:
 #
@@ -83,8 +82,11 @@
 class NilmDB(object):
     verbose = 0

-    def __init__(self, basepath, max_results=None,
-                 max_removals=None, bulkdata_args=None):
+    def __init__(self, basepath,
+                 max_results=None,
+                 max_removals=None,
+                 max_int_removals=None,
+                 bulkdata_args=None):
         """Initialize NilmDB at the given basepath.
         Other arguments are for debugging / testing:
@@ -92,7 +94,10 @@ class NilmDB(object):
            stream_intervals or stream_extract response.

            'max_removals' is the max rows to delete at once
-           in stream_move.
+           in stream_remove.

+           'max_int_removals' is the max intervals to delete
+           at once in stream_remove.
+
            'bulkdata_args' is kwargs for the bulkdata module.
         """
@@ -134,6 +139,9 @@ class NilmDB(object):
         # Remove up to this many rows per call to stream_remove.
         self.max_removals = max_removals or 1048576

+        # Remove up to this many intervals per call to stream_remove.
+        self.max_int_removals = max_int_removals or 4096
+
     def get_basepath(self):
         return self.basepath

@@ -176,7 +184,7 @@ class NilmDB(object):
             raise NilmDBError("start must precede end")
         return (start, end)

-    @nilmdb.utils.lru_cache(size = 16)
+    @nilmdb.utils.lru_cache(size = 64)
     def _get_intervals(self, stream_id):
         """
         Return a mutable IntervalSet corresponding to the given stream ID.
@@ -507,6 +515,17 @@ class NilmDB(object):
         # And that's all
         return

+    def _bisect_left(self, a, x, lo, hi):
+        # Like bisect.bisect_left, but doesn't choke on large indices on
+        # 32-bit systems, like bisect's fast C implementation does.
+        while lo < hi:
+            mid = (lo + hi) / 2
+            if a[mid] < x:
+                lo = mid + 1
+            else:
+                hi = mid
+        return lo
+
     def _find_start(self, table, dbinterval):
         """
         Given a DBInterval, find the row in the database that
@@ -517,7 +536,7 @@ class NilmDB(object):
         # Optimization for the common case where an interval wasn't truncated
         if dbinterval.start == dbinterval.db_start:
             return dbinterval.db_startpos
-        return bisect.bisect_left(table,
+        return self._bisect_left(table,
                                  dbinterval.start,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)
@@ -536,7 +555,7 @@ class NilmDB(object):
         # want to include the given timestamp in the results.  This is
         # so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
         # non-overlapping data.
-        return bisect.bisect_left(table,
+        return self._bisect_left(table,
                                  dbinterval.end,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)
@@ -643,13 +662,22 @@ class NilmDB(object):
         to_remove = Interval(start, end)
         removed = 0
         remaining = self.max_removals
+        int_remaining = self.max_int_removals
         restart = None

         # Can't remove intervals from within the iterator, so we need to
         # remember what's currently in the intersection now.
         all_candidates = list(intervals.intersection(to_remove, orig = True))

+        remove_start = None
+        remove_end = None
+
         for (dbint, orig) in all_candidates:
+            # Stop if we've hit the max number of interval removals
+            if int_remaining <= 0:
+                restart = dbint.start
+                break
+
             # Find row start and end
             row_start = self._find_start(table, dbint)
             row_end = self._find_end(table, dbint)
@@ -670,14 +698,29 @@ class NilmDB(object):
             # Remove interval from the database
             self._remove_interval(stream_id, orig, dbint)

-            # Remove data from the underlying table storage
-            table.remove(row_start, row_end)
+            # Remove data from the underlying table storage,
+            # coalescing adjacent removals to reduce the number of calls
+            # to table.remove.
+            if remove_end == row_start:
+                # Extend our coalesced region
+                remove_end = row_end
+            else:
+                # Perform previous removal, then save this one
+                if remove_end is not None:
+                    table.remove(remove_start, remove_end)
+                remove_start = row_start
+                remove_end = row_end

             # Count how many were removed
             removed += row_end - row_start
             remaining -= row_end - row_start
+            int_remaining -= 1

             if restart is not None:
                 break

+        # Perform any final coalesced removal
+        if remove_end is not None:
+            table.remove(remove_start, remove_end)
+
         return (removed, restart)
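Since _bisect_left is pure Python, its behavior is easy to check in isolation (standalone copy; values made up):

    def _bisect_left(a, x, lo, hi):
        while lo < hi:
            mid = (lo + hi) / 2
            if a[mid] < x:
                lo = mid + 1
            else:
                hi = mid
        return lo

    print _bisect_left([10, 20, 20, 30], 20, 0, 4)    # 1: leftmost 20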
nilmdb/server/rocket.c
@@ -5,6 +5,9 @@
 #include <ctype.h>
 #include <stdint.h>

+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
 /* Values missing from stdint.h */
 #define UINT8_MIN 0
 #define UINT16_MIN 0
@@ -19,16 +22,9 @@

 typedef int64_t timestamp_t;

-/* This code probably needs to be double-checked for the case where
-   sizeof(long) != 8, so enforce that here with something that will
-   fail at build time.  We assume that the python integer type can
-   hold an int64_t. */
-const static char __long_ok[1 - 2*!(sizeof(int64_t) ==
-                                    sizeof(long int))] = { 0 };
-
 /* Somewhat arbitrary, just so we can use fixed sizes for strings
    etc. */
-static const int MAX_LAYOUT_COUNT = 128;
+static const int MAX_LAYOUT_COUNT = 1024;

 /* Error object and constants */
 static PyObject *ParseError;
@@ -58,7 +54,7 @@ static PyObject *raise_str(int line, int col, int code, const char *string)
 static PyObject *raise_int(int line, int col, int code, int64_t num)
 {
     PyObject *o;
-    o = Py_BuildValue("(iiil)", line, col, code, num);
+    o = Py_BuildValue("(iiiL)", line, col, code, (long long)num);
     if (o != NULL) {
         PyErr_SetObject(ParseError, o);
         Py_DECREF(o);
@@ -249,11 +245,11 @@ static PyObject *Rocket_get_file_size(Rocket *self)
 /****
  * Append from string
  */
-static inline long int strtol10(const char *nptr, char **endptr) {
-    return strtol(nptr, endptr, 10);
+static inline long int strtoll10(const char *nptr, char **endptr) {
+    return strtoll(nptr, endptr, 10);
 }
-static inline long int strtoul10(const char *nptr, char **endptr) {
-    return strtoul(nptr, endptr, 10);
+static inline long int strtoull10(const char *nptr, char **endptr) {
+    return strtoull(nptr, endptr, 10);
 }

 /* .append_string(count, data, offset, linenum, start, end, last_timestamp) */
@@ -264,6 +260,7 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
     int offset;
     const char *linestart;
     int linenum;
+    long long ll1, ll2, ll3;
     timestamp_t start;
     timestamp_t end;
     timestamp_t last_timestamp;
@@ -280,10 +277,13 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
        but we need the null termination for strto*.  If we had
        strnto* that took a length, we could use t# and not require
        a copy. */
-    if (!PyArg_ParseTuple(args, "isiilll:append_string", &count,
+    if (!PyArg_ParseTuple(args, "isiiLLL:append_string", &count,
                           &data, &offset, &linenum,
-                          &start, &end, &last_timestamp))
+                          &ll1, &ll2, &ll3))
         return NULL;
+    start = ll1;
+    end = ll2;
+    last_timestamp = ll3;

     /* Skip spaces, but don't skip over a newline. */
 #define SKIP_BLANK(buf) do { \
@@ -372,14 +372,14 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
                 goto extra_data_on_line; \
             break

-        CS(INT8, strtol10, t64.i, t8.i, t8.u, , 1);
-        CS(UINT8, strtoul10, t64.u, t8.u, t8.u, , 1);
-        CS(INT16, strtol10, t64.i, t16.i, t16.u, le16toh, 2);
-        CS(UINT16, strtoul10, t64.u, t16.u, t16.u, le16toh, 2);
-        CS(INT32, strtol10, t64.i, t32.i, t32.u, le32toh, 4);
-        CS(UINT32, strtoul10, t64.u, t32.u, t32.u, le32toh, 4);
-        CS(INT64, strtol10, t64.i, t64.i, t64.u, le64toh, 8);
-        CS(UINT64, strtoul10, t64.u, t64.u, t64.u, le64toh, 8);
+        CS(INT8, strtoll10, t64.i, t8.i, t8.u, , 1);
+        CS(UINT8, strtoull10, t64.u, t8.u, t8.u, , 1);
+        CS(INT16, strtoll10, t64.i, t16.i, t16.u, le16toh, 2);
+        CS(UINT16, strtoull10, t64.u, t16.u, t16.u, le16toh, 2);
+        CS(INT32, strtoll10, t64.i, t32.i, t32.u, le32toh, 4);
+        CS(UINT32, strtoull10, t64.u, t32.u, t32.u, le32toh, 4);
+        CS(INT64, strtoll10, t64.i, t64.i, t64.u, le64toh, 8);
+        CS(UINT64, strtoull10, t64.u, t64.u, t64.u, le64toh, 8);
         CS(FLOAT32, strtod, t64.d, t32.f, t32.u, le32toh, 4);
         CS(FLOAT64, strtod, t64.d, t64.d, t64.u, le64toh, 8);
 #undef CS
@@ -397,7 +397,8 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
     /* Build return value and return */
     offset = buf - data;
     PyObject *o;
-    o = Py_BuildValue("(iili)", written, offset, last_timestamp, linenum);
+    o = Py_BuildValue("(iiLi)", written, offset,
+                      (long long)last_timestamp, linenum);
     return o;
 err:
     PyErr_SetFromErrno(PyExc_OSError);
@@ -431,14 +432,18 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
     int data_len;
     int linenum;
     int offset;
+    long long ll1, ll2, ll3;
     timestamp_t start;
     timestamp_t end;
     timestamp_t last_timestamp;

-    if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
+    if (!PyArg_ParseTuple(args, "it#iiLLL:append_binary",
                           &count, &data, &data_len, &offset,
-                          &linenum, &start, &end, &last_timestamp))
+                          &linenum, &ll1, &ll2, &ll3))
         return NULL;
+    start = ll1;
+    end = ll2;
+    last_timestamp = ll3;

     /* Advance to offset */
     if (offset > data_len)
@@ -468,7 +473,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
     }

     /* Write binary data */
-    if (fwrite(data, data_len, 1, self->file) != 1) {
+    if (fwrite(data, self->binary_size, rows, self->file) != rows) {
         PyErr_SetFromErrno(PyExc_OSError);
         return NULL;
     }
@@ -476,8 +481,8 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)

     /* Build return value and return */
     PyObject *o;
-    o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
-                      last_timestamp, linenum);
+    o = Py_BuildValue("(iiLi)", rows, offset + rows * self->binary_size,
+                      (long long)last_timestamp, linenum);
     return o;
 }
@@ -534,7 +539,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
             if (fread(&t64.u, 8, 1, self->file) != 1)
                 goto err;
             t64.u = le64toh(t64.u);
-            ret = sprintf(&str[len], "%ld", t64.i);
+            ret = sprintf(&str[len], "%" PRId64, t64.i);
             if (ret <= 0)
                 goto err;
             len += ret;
@@ -556,14 +561,14 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
                     len += ret; \
                 } \
                 break
-            CASE(INT8, "%hhd", t8.i, t8.u, , 1);
-            CASE(UINT8, "%hhu", t8.u, t8.u, , 1);
-            CASE(INT16, "%hd", t16.i, t16.u, le16toh, 2);
-            CASE(UINT16, "%hu", t16.u, t16.u, le16toh, 2);
-            CASE(INT32, "%d", t32.i, t32.u, le32toh, 4);
-            CASE(UINT32, "%u", t32.u, t32.u, le32toh, 4);
-            CASE(INT64, "%ld", t64.i, t64.u, le64toh, 8);
-            CASE(UINT64, "%lu", t64.u, t64.u, le64toh, 8);
+            CASE(INT8, "%" PRId8, t8.i, t8.u, , 1);
+            CASE(UINT8, "%" PRIu8, t8.u, t8.u, , 1);
+            CASE(INT16, "%" PRId16, t16.i, t16.u, le16toh, 2);
+            CASE(UINT16, "%" PRIu16, t16.u, t16.u, le16toh, 2);
+            CASE(INT32, "%" PRId32, t32.i, t32.u, le32toh, 4);
+            CASE(UINT32, "%" PRIu32, t32.u, t32.u, le32toh, 4);
+            CASE(INT64, "%" PRId64, t64.i, t64.u, le64toh, 8);
+            CASE(UINT64, "%" PRIu64, t64.u, t64.u, le64toh, 8);
             /* These next two are a bit debatable.  floats
                are 6-9 significant figures, so we print 7.
                Doubles are 15-19, so we print 17.  This is
@@ -653,7 +658,7 @@ static PyObject *Rocket_extract_timestamp(Rocket *self, PyObject *args)

     /* Convert and return */
     t64.u = le64toh(t64.u);
-    return Py_BuildValue("l", t64.i);
+    return Py_BuildValue("L", (long long)t64.i);
 }

 /****
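All the "l"-to-"L" format changes above address the same problem: timestamp_t is int64_t, and the "l" (C long) format truncates it on 32-bit platforms. The magnitudes involved, in Python (microsecond-resolution timestamps are an assumption based on how timestamp_t is used):

    ts = 1361000000000000    # a 2013 timestamp in microseconds
    print ts > 2**31 - 1     # True: overflows a 32-bit long
    print ts <= 2**63 - 1    # True: fits in int64_t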
@@ -17,126 +17,26 @@ import decorator
import psutil
import traceback

from nilmdb.server.serverutil import (
    chunked_response,
    response_type,
    workaround_cp_bug_1200,
    exception_to_httperror,
    CORS_allow,
    json_to_request_params,
    json_error_page,
    cherrypy_start,
    cherrypy_stop,
    bool_param,
)

# Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)

class NilmApp(object):
    def __init__(self, db):
        self.db = db

# Decorators
def chunked_response(func):
    """Decorator to enable chunked responses."""
    # Set this to False to get better tracebacks from some requests
    # (/stream/extract, /stream/intervals).
    func._cp_config = { 'response.stream': True }
    return func

def response_type(content_type):
    """Return a decorator-generating function that sets the
    response type to the specified string."""
    def wrapper(func, *args, **kwargs):
        cherrypy.response.headers['Content-Type'] = content_type
        return func(*args, **kwargs)
    return decorator.decorator(wrapper)

@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
    """Decorator to work around CherryPy bug #1200 in a response
    generator.

    Even if chunked responses are disabled, LookupError or
    UnicodeError exceptions may still be swallowed by CherryPy due to
    bug #1200. This throws them as generic Exceptions instead so that
    they make it through.
    """
    exc_info = None
    try:
        for val in func(*args, **kwargs):
            yield val
    except (LookupError, UnicodeError):
        # Re-raise it, but maintain the original traceback
        exc_info = sys.exc_info()
        new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
        raise new_exc, None, exc_info[2]
    finally:
        del exc_info

def exception_to_httperror(*expected):
    """Return a decorator-generating function that catches expected
    errors and throws a HTTPError describing it instead.

      @exception_to_httperror(NilmDBError, ValueError)
      def foo():
          pass
    """
    def wrapper(func, *args, **kwargs):
        exc_info = None
        try:
            return func(*args, **kwargs)
        except expected:
            # Re-raise it, but maintain the original traceback
            exc_info = sys.exc_info()
            new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
            raise new_exc, None, exc_info[2]
        finally:
            del exc_info
    # We need to preserve the function's argspecs for CherryPy to
    # handle argument errors correctly. Decorator.decorator takes
    # care of that.
    return decorator.decorator(wrapper)

# Custom CherryPy tools

def CORS_allow(methods):
    """This does several things:

    Handles CORS preflight requests.
    Adds Allow: header to all requests.
    Raise 405 if request.method not in methods.

    It is similar to cherrypy.tools.allow, with the CORS stuff added.
    """
    request = cherrypy.request.headers
    response = cherrypy.response.headers

    if not isinstance(methods, (tuple, list)): # pragma: no cover
        methods = [ methods ]
    methods = [ m.upper() for m in methods if m ]
    if not methods: # pragma: no cover
        methods = [ 'GET', 'HEAD' ]
    elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
        methods.append('HEAD')
    response['Allow'] = ', '.join(methods)

    # Allow all origins
    if 'Origin' in request:
        response['Access-Control-Allow-Origin'] = request['Origin']

    # If it's a CORS request, send response.
    request_method = request.get("Access-Control-Request-Method", None)
    request_headers = request.get("Access-Control-Request-Headers", None)
    if (cherrypy.request.method == "OPTIONS" and
        request_method and request_headers):
        response['Access-Control-Allow-Headers'] = request_headers
        response['Access-Control-Allow-Methods'] = ', '.join(methods)
        # Try to stop further processing and return a 200 OK
        cherrypy.response.status = "200 OK"
        cherrypy.response.body = ""
        cherrypy.request.handler = lambda: ""
        return

    # Reject methods that were not explicitly allowed
    if cherrypy.request.method not in methods:
        raise cherrypy.HTTPError(405)

cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)

# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
    cherrypy.lib.jsontools.json_processor(body)
    if not isinstance(cherrypy.request.json, dict):
        raise cherrypy.HTTPError(415)
    cherrypy.request.params.update(cherrypy.request.json)

# CherryPy apps
class Root(NilmApp):
    """Root application for NILM database"""
@@ -147,7 +47,10 @@ class Root(NilmApp):

    # /
    @cherrypy.expose
    def index(self):
        raise cherrypy.NotFound()
        cherrypy.response.headers['Content-Type'] = 'text/plain'
        msg = sprintf("This is NilmDB version %s, running on host %s.\n",
                      nilmdb.__version__, socket.getfqdn())
        return msg

    # /favicon.ico
    @cherrypy.expose
@@ -167,9 +70,13 @@ class Root(NilmApp):
        """Return a dictionary with the database path,
        size of the database in bytes, and free disk space in bytes"""
        path = self.db.get_basepath()
        usage = psutil.disk_usage(path)
        dbsize = nilmdb.utils.du(path)
        return { "path": path,
                 "size": nilmdb.utils.du(path),
                 "free": psutil.disk_usage(path).free }
                 "size": dbsize,
                 "other": max(usage.used - dbsize, 0),
                 "reserved": max(usage.total - usage.used - usage.free, 0),
                 "free": usage.free }
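The reworked `/dbinfo` response partitions the whole volume rather than just reporting size and free space: "size" is NilmDB's own disk usage, "other" is everything else on the volume, "reserved" is space the OS withholds (total minus used minus free), and "free" is what remains available. The same arithmetic as a standalone sketch:

    import psutil  # same third-party module the server uses

    def dbinfo_fields(path, dbsize):
        # dbsize: NilmDB's own on-disk size in bytes (nilmdb.utils.du)
        usage = psutil.disk_usage(path)
        return { "path": path,
                 "size": dbsize,
                 "other": max(usage.used - dbsize, 0),
                 "reserved": max(usage.total - usage.used - usage.free, 0),
                 "free": usage.free }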

class Stream(NilmApp):
    """Stream-specific operations"""
@@ -177,10 +84,18 @@ class Stream(NilmApp):

    # Helpers
    def _get_times(self, start_param, end_param):
        (start, end) = (None, None)
        try:
            if start_param is not None:
                start = string_to_timestamp(start_param)
        except Exception:
            raise cherrypy.HTTPError("400 Bad Request", sprintf(
                "invalid start (%s): must be a numeric timestamp", start_param))
        try:
            if end_param is not None:
                end = string_to_timestamp(end_param)
        except Exception:
            raise cherrypy.HTTPError("400 Bad Request", sprintf(
                "invalid end (%s): must be a numeric timestamp", end_param))
        if start is not None and end is not None:
            if start >= end:
                raise cherrypy.HTTPError(
@@ -199,10 +114,10 @@ class Stream(NilmApp):
        layout parameter, just list streams that match the given path
        or layout.

        If extent is not given, returns a list of lists containing
        the path and layout: [ path, layout ]
        If extended is missing or zero, returns a list of lists
        containing the path and layout: [ path, layout ]

        If extended is provided, returns a list of lists containing
        If extended is true, returns a list of lists containing
        extended info: [ path, layout, extent_min, extent_max,
        total_rows, total_seconds ]. More data may be added.
        """
@@ -315,6 +230,8 @@ class Stream(NilmApp):
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        binary = bool_param(binary)

        # Important that we always read the input before throwing any
        # errors, to keep lengths happy for persistent connections.
        # Note that CherryPy 3.2.2 has a bug where this fails for GET
@@ -439,6 +356,10 @@ class Stream(NilmApp):
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        binary = bool_param(binary)
        markup = bool_param(markup)
        count = bool_param(count)

        (start, end) = self._get_times(start, end)

        # Check path and get layout
@@ -508,7 +429,7 @@ class Server(object):
        cherrypy.config.update({
            'server.socket_host': host,
            'server.socket_port': port,
            'engine.autoreload_on': False,
            'engine.autoreload.on': False,
            'server.max_request_body_size': 8*1024*1024,
        })
        if self.embedded:
@@ -566,70 +487,14 @@ class Server(object):

    def json_error_page(self, status, message, traceback, version):
        """Return a custom error page in JSON so the client can parse it"""
        errordata = { "status" : status,
                      "message" : message,
                      "traceback" : traceback }
        # Don't send a traceback if the error was 400-499 (client's fault)
        try:
            code = int(status.split()[0])
            if not self.force_traceback:
                if code >= 400 and code <= 499:
                    errordata["traceback"] = ""
        except Exception: # pragma: no cover
            pass
        # Override the response type, which was previously set to text/html
        cherrypy.serving.response.headers['Content-Type'] = (
            "application/json;charset=utf-8" )
        # Undo the HTML escaping that cherrypy's get_error_page function applies
        # (cherrypy issue 1135)
        for k, v in errordata.iteritems():
            v = v.replace("&lt;", "<")
            v = v.replace("&gt;", ">")
            v = v.replace("&amp;", "&")
            errordata[k] = v
        return json.dumps(errordata, separators=(',',':'))
        return json_error_page(status, message, traceback, version,
                               self.force_traceback)

    def start(self, blocking = False, event = None):

        if not self.embedded: # pragma: no cover
            # Handle signals nicely
            if hasattr(cherrypy.engine, "signal_handler"):
                cherrypy.engine.signal_handler.subscribe()
            if hasattr(cherrypy.engine, "console_control_handler"):
                cherrypy.engine.console_control_handler.subscribe()

        # Cherrypy stupidly calls os._exit(70) when it can't bind the
        # port. At least try to print a reasonable error and continue
        # in this case, rather than just dying silently (as we would
        # otherwise do in embedded mode)
        real_exit = os._exit
        def fake_exit(code): # pragma: no cover
            if code == os.EX_SOFTWARE:
                fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
            else:
                real_exit(code)
        os._exit = fake_exit
        cherrypy.engine.start()
        os._exit = real_exit

        # Signal that the engine has started successfully
        if event is not None:
            event.set()

        if blocking:
            try:
                cherrypy.engine.wait(cherrypy.engine.states.EXITING,
                                     interval = 0.1, channel = 'main')
            except (KeyboardInterrupt, IOError): # pragma: no cover
                cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
                cherrypy.engine.exit()
            except SystemExit: # pragma: no cover
                cherrypy.engine.log('SystemExit raised: shutting down bus')
                cherrypy.engine.exit()
                raise
        cherrypy_start(blocking, event, self.embedded)

    def stop(self):
        cherrypy.engine.exit()
        cherrypy_stop()

# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to
nilmdb/server/serverutil.py (new file, 214 lines)
@@ -0,0 +1,214 @@
"""Miscellaneous decorators and other helpers for running a CherryPy
server"""

import cherrypy
import sys
import os
import decorator
import simplejson as json

# Helper to parse parameters into booleans
def bool_param(s):
    """Return a bool indicating whether parameter 's' was True or False,
    supporting a few different types for 's'."""
    try:
        ss = s.lower()
        if ss in [ "0", "false", "f", "no", "n" ]:
            return False
        if ss in [ "1", "true", "t", "yes", "y" ]:
            return True
    except Exception:
        return bool(s)
    raise cherrypy.HTTPError("400 Bad Request",
                             "can't parse parameter: " + ss)
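In practice this maps the usual query-string spellings onto booleans, and anything unparseable becomes a client error rather than being silently truthy. Expected behavior, per the client tests later in this diff:

    bool_param("0")         # -> False (likewise "false", "f", "no", "n")
    bool_param("1")         # -> True  (likewise "true", "t", "yes", "y")
    bool_param(0)           # -> False (non-strings fall back to bool(s))
    bool_param("asdfasfd")  # -> raises 400 "can't parse parameter"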

# Decorators
def chunked_response(func):
    """Decorator to enable chunked responses."""
    # Set this to False to get better tracebacks from some requests
    # (/stream/extract, /stream/intervals).
    func._cp_config = { 'response.stream': True }
    return func

def response_type(content_type):
    """Return a decorator-generating function that sets the
    response type to the specified string."""
    def wrapper(func, *args, **kwargs):
        cherrypy.response.headers['Content-Type'] = content_type
        return func(*args, **kwargs)
    return decorator.decorator(wrapper)

@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
    """Decorator to work around CherryPy bug #1200 in a response
    generator.

    Even if chunked responses are disabled, LookupError or
    UnicodeError exceptions may still be swallowed by CherryPy due to
    bug #1200. This throws them as generic Exceptions instead so that
    they make it through.
    """
    exc_info = None
    try:
        for val in func(*args, **kwargs):
            yield val
    except (LookupError, UnicodeError):
        # Re-raise it, but maintain the original traceback
        exc_info = sys.exc_info()
        new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
        raise new_exc, None, exc_info[2]
    finally:
        del exc_info

def exception_to_httperror(*expected):
    """Return a decorator-generating function that catches expected
    errors and throws a HTTPError describing it instead.

      @exception_to_httperror(NilmDBError, ValueError)
      def foo():
          pass
    """
    def wrapper(func, *args, **kwargs):
        exc_info = None
        try:
            return func(*args, **kwargs)
        except expected:
            # Re-raise it, but maintain the original traceback
            exc_info = sys.exc_info()
            new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
            raise new_exc, None, exc_info[2]
        finally:
            del exc_info
    # We need to preserve the function's argspecs for CherryPy to
    # handle argument errors correctly. Decorator.decorator takes
    # care of that.
    return decorator.decorator(wrapper)

# Custom CherryPy tools

def CORS_allow(methods):
    """This does several things:

    Handles CORS preflight requests.
    Adds Allow: header to all requests.
    Raise 405 if request.method not in methods.

    It is similar to cherrypy.tools.allow, with the CORS stuff added.

    Add this to CherryPy with:
      cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
    """
    request = cherrypy.request.headers
    response = cherrypy.response.headers

    if not isinstance(methods, (tuple, list)): # pragma: no cover
        methods = [ methods ]
    methods = [ m.upper() for m in methods if m ]
    if not methods: # pragma: no cover
        methods = [ 'GET', 'HEAD' ]
    elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
        methods.append('HEAD')
    response['Allow'] = ', '.join(methods)

    # Allow all origins
    if 'Origin' in request:
        response['Access-Control-Allow-Origin'] = request['Origin']

    # If it's a CORS request, send response.
    request_method = request.get("Access-Control-Request-Method", None)
    request_headers = request.get("Access-Control-Request-Headers", None)
    if (cherrypy.request.method == "OPTIONS" and
        request_method and request_headers):
        response['Access-Control-Allow-Headers'] = request_headers
        response['Access-Control-Allow-Methods'] = ', '.join(methods)
        # Try to stop further processing and return a 200 OK
        cherrypy.response.status = "200 OK"
        cherrypy.response.body = ""
        cherrypy.request.handler = lambda: ""
        return

    # Reject methods that were not explicitly allowed
    if cherrypy.request.method not in methods:
        raise cherrypy.HTTPError(405)
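Once registered as `cherrypy.tools.CORS_allow`, the tool can be attached to a handler through CherryPy's standard tool-decorator mechanism; a hedged sketch (the handler and method list here are made up for illustration):

    class SomeApp(object):
        @cherrypy.expose
        @cherrypy.tools.CORS_allow(methods = ["GET", "HEAD"])
        def index(self):
            return "hello"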

# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
    cherrypy.lib.jsontools.json_processor(body)
    if not isinstance(cherrypy.request.json, dict):
        raise cherrypy.HTTPError(415)
    cherrypy.request.params.update(cherrypy.request.json)

# Used as an "error_page.default" handler
def json_error_page(status, message, traceback, version,
                    force_traceback = False):
    """Return a custom error page in JSON so the client can parse it"""
    errordata = { "status" : status,
                  "message" : message,
                  "traceback" : traceback }
    # Don't send a traceback if the error was 400-499 (client's fault)
    try:
        code = int(status.split()[0])
        if not force_traceback:
            if code >= 400 and code <= 499:
                errordata["traceback"] = ""
    except Exception: # pragma: no cover
        pass
    # Override the response type, which was previously set to text/html
    cherrypy.serving.response.headers['Content-Type'] = (
        "application/json;charset=utf-8" )
    # Undo the HTML escaping that cherrypy's get_error_page function applies
    # (cherrypy issue 1135)
    for k, v in errordata.iteritems():
        v = v.replace("&lt;", "<")
        v = v.replace("&gt;", ">")
        v = v.replace("&amp;", "&")
        errordata[k] = v
    return json.dumps(errordata, separators=(',',':'))
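Wiring this in is a one-liner in the server config, and the payload is plain JSON that the client can parse; both shown here as a sketch:

    cherrypy.config.update({ 'error_page.default': json_error_page })

    # A 4xx error then comes back to the client as, for example:
    #   {"status":"400 Bad Request",
    #    "message":"invalid start (asdf): must be a numeric timestamp",
    #    "traceback":""}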

# Start/stop CherryPy standalone server
def cherrypy_start(blocking = False, event = False, embedded = False):
    """Start the CherryPy server, handling errors and signals
    somewhat gracefully."""

    if not embedded: # pragma: no cover
        # Handle signals nicely
        if hasattr(cherrypy.engine, "signal_handler"):
            cherrypy.engine.signal_handler.subscribe()
        if hasattr(cherrypy.engine, "console_control_handler"):
            cherrypy.engine.console_control_handler.subscribe()

    # Cherrypy stupidly calls os._exit(70) when it can't bind the
    # port. At least try to print a reasonable error and continue
    # in this case, rather than just dying silently (as we would
    # otherwise do in embedded mode)
    real_exit = os._exit
    def fake_exit(code): # pragma: no cover
        if code == os.EX_SOFTWARE:
            fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
        else:
            real_exit(code)
    os._exit = fake_exit
    cherrypy.engine.start()
    os._exit = real_exit

    # Signal that the engine has started successfully
    if event is not None:
        event.set()

    if blocking:
        try:
            cherrypy.engine.wait(cherrypy.engine.states.EXITING,
                                 interval = 0.1, channel = 'main')
        except (KeyboardInterrupt, IOError): # pragma: no cover
            cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
            cherrypy.engine.exit()
        except SystemExit: # pragma: no cover
            cherrypy.engine.log('SystemExit raised: shutting down bus')
            cherrypy.engine.exit()
            raise

# Stop CherryPy server
def cherrypy_stop():
    cherrypy.engine.exit()
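(Note that `cherrypy_start` uses `fprintf`, which presumably comes from `nilmdb.utils.printf` via an import not visible in this view.) `Server.start()` above is now a thin wrapper over this helper; an embedded caller that wants to wait until the engine is really up might look like this sketch:

    import threading

    ready = threading.Event()
    cherrypy_start(blocking = False, event = ready, embedded = True)
    ready.wait()   # event.set() fires once cherrypy.engine.start() returns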
@@ -14,3 +14,4 @@ import nilmdb.utils.iterator
import nilmdb.utils.interval
import nilmdb.utils.lock
import nilmdb.utils.sort
import nilmdb.utils.unicode

@@ -21,7 +21,8 @@ def du(path):
    errors that might occur if we encounter broken symlinks or
    files in the process of being removed."""
    try:
        size = os.path.getsize(path)
        st = os.stat(path)
        size = st.st_blocks * 512
        if os.path.isdir(path):
            for thisfile in os.listdir(path):
                filepath = os.path.join(path, thisfile)
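`du` now measures allocated blocks instead of apparent size: POSIX `st_blocks` is always counted in 512-byte units, so sparse files and partially-filled blocks are accounted the way the `du` utility reports them. A quick illustration:

    import os

    st = os.stat("somefile")
    apparent = st.st_size            # what os.path.getsize() returned before
    allocated = st.st_blocks * 512   # what du() sums now
    # For a sparse file, allocated can be far smaller than apparent;
    # for a tiny file it can be larger, since a whole block is allocated.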
@@ -1,5 +1,6 @@
"""Interval. Like nilmdb.server.interval, but re-implemented here
in plain Python so clients have easier access to it.
in plain Python so clients have easier access to it, and with a few
helper functions.

Intervals are half-open, ie. they include data points with timestamps
[start, end)
@@ -34,6 +35,10 @@ class Interval:
        return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
                " -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")

    def human_string(self):
        return ("[ " + nilmdb.utils.time.timestamp_to_human(self.start) +
                " -> " + nilmdb.utils.time.timestamp_to_human(self.end) + " ]")

    def __cmp__(self, other):
        """Compare two intervals. If non-equal, order by start then end"""
        return cmp(self.start, other.start) or cmp(self.end, other.end)
@@ -53,18 +58,11 @@ class Interval:
            raise IntervalError("not a subset")
        return Interval(start, end)

def set_difference(a, b):
    """
    Compute the difference (a \\ b) between the intervals in 'a' and
    the intervals in 'b'; i.e., the ranges that are present in 'a'
    but not 'b'.

    'a' and 'b' must both be iterables.

    Returns a generator that yields each interval in turn.
    Output intervals are built as subsets of the intervals in the
    first argument (a).
    """
def _interval_math_helper(a, b, op, subset = True):
    """Helper for set_difference, intersection functions,
    to compute interval subsets based on a math operator on ranges
    present in A and B. Subsets are computed from A, or new intervals
    are generated if subset = False."""
    # Iterate through all starts and ends in sorted order. Add a
    # tag to the iterator so that we can figure out which one they
    # were, after sorting.
@@ -79,28 +77,71 @@ def set_difference(a, b):
    # At each point, evaluate which type of end it is, to determine
    # how to build up the output intervals.
    a_interval = None
    b_interval = None
    in_a = False
    in_b = False
    out_start = None
    for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
        if k == 0:
            # start a interval
            a_interval = i
            if b_interval is None:
                out_start = ts
            in_a = True
        elif k == 1:
            # start b interval
            b_interval = i
            if out_start is not None and out_start != ts:
                yield a_interval.subset(out_start, ts)
            out_start = None
            in_b = True
        elif k == 2:
            # end a interval
            if out_start is not None and out_start != ts:
                yield a_interval.subset(out_start, ts)
            out_start = None
            a_interval = None
            in_a = False
        elif k == 3:
            # end b interval
            b_interval = None
            if a_interval:
            in_b = False
        include = op(in_a, in_b)
        if include and out_start is None:
            out_start = ts
        elif not include:
            if out_start is not None and out_start != ts:
                if subset:
                    yield a_interval.subset(out_start, ts)
                else:
                    yield Interval(out_start, ts)
            out_start = None

def set_difference(a, b):
    """
    Compute the difference (a \\ b) between the intervals in 'a' and
    the intervals in 'b'; i.e., the ranges that are present in 'a'
    but not 'b'.

    'a' and 'b' must both be iterables.

    Returns a generator that yields each interval in turn.
    Output intervals are built as subsets of the intervals in the
    first argument (a).
    """
    return _interval_math_helper(a, b, (lambda a, b: a and not b))

def intersection(a, b):
    """
    Compute the intersection between the intervals in 'a' and the
    intervals in 'b'; i.e., the ranges that are present in both 'a'
    and 'b'.

    'a' and 'b' must both be iterables.

    Returns a generator that yields each interval in turn.
    Output intervals are built as subsets of the intervals in the
    first argument (a).
    """
    return _interval_math_helper(a, b, (lambda a, b: a and b))

def optimize(it):
    """
    Given an iterable 'it' with intervals, optimize them by joining
    together intervals that are adjacent in time, and return a generator
    that yields the new intervals.
    """
    saved_int = None
    for interval in it:
        if saved_int is not None:
            if saved_int.end == interval.start:
                interval.start = saved_int.start
            else:
                yield saved_int
        saved_int = interval
    if saved_int is not None:
        yield saved_int
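Taken together, the three helpers behave like this (the `optimize` case mirrors the interval test later in this diff; the other two follow directly from the lambdas above):

    from nilmdb.utils.interval import Interval, optimize, intersection, set_difference

    a = [ Interval(1, 2), Interval(2, 3), Interval(4, 5) ]
    b = [ Interval(2, 5) ]
    list(optimize(a))            # [ Interval(1, 3), Interval(4, 5) ]  (joins adjacent)
    list(intersection(a, b))     # [ Interval(2, 3), Interval(4, 5) ]
    list(set_difference(a, b))   # [ Interval(1, 2) ]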
@@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):

    @wrap_class_method
    def __del__(orig, self, *args, **kwargs):
        try:
            if "_must_close" in self.__dict__:
                fprintf(errorfile, "error: %s.close() wasn't called!\n",
                        self.__class__.__name__)
            return orig(self, *args, **kwargs)
        except: # pragma: no cover
            pass

    @wrap_class_method
    def close(orig, self, *args, **kwargs):
@@ -91,6 +91,20 @@ def serializer_proxy(obj_or_type):
            r = SerializerCallProxy(self.__call_queue, attr, self)
            return r

        # For an iterable object, on __iter__(), save the object's
        # iterator and return this proxy. On next(), call the object's
        # iterator through this proxy.
        def __iter__(self):
            attr = getattr(self.__object, "__iter__")
            self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
            return self
        def next(self):
            return SerializerCallProxy(self.__call_queue,
                                       self.__iter.next, self)()

        def __getitem__(self, key):
            return self.__getattr__("__getitem__")(key)

        def __call__(self, *args, **kwargs):
            """Call this to instantiate the type, if a type was passed
            to serializer_proxy. Otherwise, pass the call through."""
@@ -103,7 +117,10 @@ def serializer_proxy(obj_or_type):
            return ret

        def __del__(self):
            try:
                self.__call_queue.put((None, None, None, None))
                self.__thread.join()
            except: # pragma: no cover
                pass

    return SerializerObjectProxy(obj_or_type)
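With `__iter__`, `next`, and `__getitem__` proxied, a wrapped object can be consumed like the original while every call still runs on the serializer thread; this is exactly what the new `test_iter` at the end of this diff exercises:

    sp = nilmdb.utils.serializer_proxy
    i = sp(ListLike)()   # ListLike is defined in tests/test_serializer.py below
    list(i)              # [1, 2, 3, 4, 5]; each next() crosses the call queue
    i[3]                 # 3; __getitem__ is forwarded the same way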
@@ -60,7 +60,7 @@ def rate_to_period(hz, cycles = 1):
def parse_time(toparse):
    """
    Parse a free-form time string and return a nilmdb timestamp
    (integer seconds since epoch). If the string doesn't contain a
    (integer microseconds since epoch). If the string doesn't contain a
    timestamp, the current local timezone is assumed (e.g. from the TZ
    env var).
    """
@@ -87,7 +87,7 @@ def parse_time(toparse):
    try:
        return unix_to_timestamp(datetime_tz.datetime_tz.
                                 smartparse(toparse).totimestamp())
    except (ValueError, OverflowError):
    except (ValueError, OverflowError, TypeError):
        pass

    # If it's parseable as a float, treat it as a Unix or NILM
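The docstring fix reflects what the function actually returns: NilmDB timestamps are integer microseconds, not seconds. With the local timezone set to America/New_York, as in the timestamper test at the end of this diff:

    import nilmdb.utils.time

    nilmdb.utils.time.parse_time("03/24/2012")   # -> 1332561600000000
    # i.e. 1332561600 seconds since the epoch, times 10**6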
nilmdb/utils/unicode.py (new file, 29 lines)
@@ -0,0 +1,29 @@
import sys

if sys.version_info[0] >= 3: # pragma: no cover (future Python3 compat)
    text_type = str
else:
    text_type = unicode

def encode(u):
    """Try to encode something from Unicode to a string using the
    default encoding. If it fails, try encoding as UTF-8."""
    if not isinstance(u, text_type):
        return u
    try:
        return u.encode()
    except UnicodeEncodeError:
        return u.encode("utf-8")

def decode(s):
    """Try to decode something from string to Unicode using the
    default encoding. If it fails, try decoding as UTF-8."""
    if isinstance(s, text_type):
        return s
    try:
        return s.decode()
    except UnicodeDecodeError:
        try:
            return s.decode("utf-8")
        except UnicodeDecodeError:
            return s # best we can do
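Round-trip behavior under Python 2, where the default codec is typically ASCII (a sketch):

    nilmdb.utils.unicode.encode(u"caf\xe9")      # -> 'caf\xc3\xa9' (falls back to UTF-8)
    nilmdb.utils.unicode.decode('caf\xc3\xa9')   # -> u'caf\xe9'
    nilmdb.utils.unicode.decode('plain')         # -> u'plain' (default codec suffices)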
setup.py (16 lines changed)
@@ -6,15 +6,6 @@
# Then just package it up:
# python setup.py sdist

# This is supposed to be using Distribute:
#
# distutils provides a "setup" method.
# setuptools is a set of monkeypatches on top of that.
# distribute is a particular version/implementation of setuptools.
#
# So we don't really know if this is using the old setuptools or the
# Distribute-provided version of setuptools.

import traceback
import sys
import os
@@ -109,7 +100,7 @@ setup(name='nilmdb',
                        'coverage',
                        'numpy',
                      ],
      setup_requires = [ 'distribute',
      setup_requires = [ 'setuptools',
                       ],
      install_requires = [ 'decorator',
                           'cherrypy >= 3.2',
@@ -117,7 +108,8 @@ setup(name='nilmdb',
                           'python-dateutil',
                           'pytz',
                           'psutil >= 0.3.0',
                           'requests >= 1.1.0, < 2.0.0',
                           'requests >= 1.1.0',
                           'progressbar >= 2.2',
                         ],
      packages = [ 'nilmdb',
                   'nilmdb.utils',
@@ -126,11 +118,13 @@ setup(name='nilmdb',
                   'nilmdb.client',
                   'nilmdb.cmdline',
                   'nilmdb.scripts',
                   'nilmdb.fsck',
                 ],
      entry_points = {
          'console_scripts': [
              'nilmtool = nilmdb.scripts.nilmtool:main',
              'nilmdb-server = nilmdb.scripts.nilmdb_server:main',
              'nilmdb-fsck = nilmdb.scripts.nilmdb_fsck:main',
          ],
      },
      ext_modules = ext_modules,
tests/data/timestamped (new file, 8 lines)
@@ -0,0 +1,8 @@
-10000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-100000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-100000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
@@ -242,6 +242,19 @@ class TestClient(object):
        in_("400 Bad Request", str(e.exception))
        in_("start must precede end", str(e.exception))

        # Invalid times in HTTP request
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": "asdf", "end": 0 })
        in_("400 Bad Request", str(e.exception))
        in_("invalid start", str(e.exception))

        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": 0, "end": "asdf" })
        in_("400 Bad Request", str(e.exception))
        in_("invalid end", str(e.exception))

        # Good content type
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", "",
@@ -354,10 +367,6 @@ class TestClient(object):
        with assert_raises(ServerError) as e:
            client.http.get_gen("http://nosuchurl.example.com./").next()

        # Trigger a curl error in generator
        with assert_raises(ServerError) as e:
            client.http.get_gen("http://nosuchurl.example.com./").next()

        # Check 404 for missing streams
        for function in [ client.stream_intervals, client.stream_extract ]:
            with assert_raises(ClientError) as e:
@@ -396,20 +405,16 @@ class TestClient(object):
                                 headers())

        # Extract
        x = http.get("stream/extract",
                     { "path": "/newton/prep",
                       "start": "123",
                       "end": "124" })
        x = http.get("stream/extract", { "path": "/newton/prep",
                                         "start": "123", "end": "124" })
        if "transfer-encoding: chunked" not in headers():
            warnings.warn("Non-chunked HTTP response for /stream/extract")
        if "content-type: text/plain;charset=utf-8" not in headers():
            raise AssertionError("/stream/extract is not text/plain:\n" +
                                 headers())

        x = http.get("stream/extract",
                     { "path": "/newton/prep",
                       "start": "123",
                       "end": "124",
        x = http.get("stream/extract", { "path": "/newton/prep",
                                         "start": "123", "end": "124",
                                         "binary": "1" })
        if "transfer-encoding: chunked" not in headers():
            warnings.warn("Non-chunked HTTP response for /stream/extract")
@@ -417,6 +422,21 @@ class TestClient(object):
            raise AssertionError("/stream/extract is not binary:\n" +
                                 headers())

        # Make sure a binary of "0" is really off
        x = http.get("stream/extract", { "path": "/newton/prep",
                                         "start": "123", "end": "124",
                                         "binary": "0" })
        if "content-type: application/octet-stream" in headers():
            raise AssertionError("/stream/extract is not text:\n" +
                                 headers())

        # Invalid parameters
        with assert_raises(ClientError) as e:
            x = http.get("stream/extract", { "path": "/newton/prep",
                                             "start": "123", "end": "124",
                                             "binary": "asdfasfd" })
        in_("can't parse parameter", str(e.exception))

        client.close()

    def test_client_08_unicode(self):
@@ -620,8 +640,12 @@ class TestClient(object):
        with client.stream_insert_context("/empty/test", end = 950):
            pass

        # Equal start and end is OK as long as there's no data
        with client.stream_insert_context("/empty/test", start=9, end=9):
            pass

        # Try various things that might cause problems
        with client.stream_insert_context("/empty/test", 1000, 1050):
        with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
            ctx.finalize() # inserts [1000, 1050]
            ctx.finalize() # nothing
            ctx.finalize() # nothing
@@ -666,40 +690,15 @@ class TestClient(object):
        client.close()

    def test_client_12_persistent(self):
        # Check that connections are persistent when they should be.
        # This is pretty hard to test; we have to poke deep into
        # the Requests library.
        # Check that connections are NOT persistent. Rather than trying
        # to verify this at the TCP level, just make sure that the response
        # contained a "Connection: close" header.
        with nilmdb.client.Client(url = testurl) as c:
            def connections():
                try:
                    poolmanager = c.http._last_response.connection.poolmanager
                    pool = poolmanager.pools[('http','localhost',32180)]
                    return (pool.num_connections, pool.num_requests)
                except Exception:
                    raise SkipTest("can't get connection info")

            # First request makes a connection
            c.stream_create("/persist/test", "uint16_1")
            eq_(connections(), (1, 1))
            eq_(c.http._last_response.headers["Connection"], "close")

            # Non-generator
            c.stream_list("/persist/test")
            eq_(connections(), (1, 2))
            c.stream_list("/persist/test")
            eq_(connections(), (1, 3))

            # Generators
            for x in c.stream_intervals("/persist/test"):
                pass
            eq_(connections(), (1, 4))
            for x in c.stream_intervals("/persist/test"):
                pass
            eq_(connections(), (1, 5))

            # Clean up
            c.stream_remove("/persist/test")
            c.stream_destroy("/persist/test")
            eq_(connections(), (1, 7))
            eq_(c.http._last_response.headers["Connection"], "close")

    def test_client_13_timestamp_rounding(self):
        # Test potentially bad timestamps (due to floating point
@@ -21,13 +21,17 @@ from testutil.helpers import *

testdb = "tests/cmdline-testdb"

def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
def server_start(max_results = None,
                 max_removals = None,
                 max_int_removals = None,
                 bulkdata_args = {}):
    global test_server, test_db
    # Start web app on a custom port
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
        testdb,
        max_results = max_results,
        max_removals = max_removals,
        max_int_removals = max_int_removals,
        bulkdata_args = bulkdata_args)
    test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                       port = 32180, stoppable = False,
@@ -59,8 +63,7 @@ class TestCmdline(object):

    def run(self, arg_string, infile=None, outfile=None):
        """Run a cmdline client with the specified argument string,
        passing the given input. Returns a tuple with the output and
        exit code"""
        passing the given input. Save the output and exit code."""
        # printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
        os.environ['NILMDB_URL'] = "http://localhost:32180/"
        class stdio_wrapper:
@@ -88,7 +91,7 @@ class TestCmdline(object):
            sys.exit(0)
        except SystemExit as e:
            exitcode = e.code
        captured = outfile.getvalue()
        captured = nilmdb.utils.unicode.decode(outfile.getvalue())
        self.captured = captured
        self.exitcode = exitcode

@@ -160,6 +163,12 @@ class TestCmdline(object):
        self.ok("--help")
        self.contain("usage:")

        # version
        self.ok("--version")
        ver = self.captured
        self.ok("list --version")
        eq_(self.captured, ver)

        # fail for no args
        self.fail("")

@@ -245,8 +254,10 @@ class TestCmdline(object):
        self.contain("Client version: " + nilmdb.__version__)
        self.contain("Server version: " + test_server.version)
        self.contain("Server database path")
        self.contain("Server database size")
        self.contain("Server database free space")
        self.contain("Server disk space used by NilmDB")
        self.contain("Server disk space used by other")
        self.contain("Server disk space reserved")
        self.contain("Server disk space free")

    def test_04_createlist(self):
        # Basic stream tests, like those in test_client.
@@ -283,6 +294,7 @@ class TestCmdline(object):
        self.ok("create /newton/zzz/rawnotch uint16_9")
        self.ok("create /newton/prep float32_8")
        self.ok("create /newton/raw uint16_6")
        self.ok("create /newton/raw~decim-1234 uint16_6")

        # Create a stream that already exists
        self.fail("create /newton/raw uint16_6")
@@ -298,13 +310,23 @@ class TestCmdline(object):
        self.fail("create /newton/zzz float32_8")
        self.contain("subdirs of this path already exist")

        # Verify we got those 3 streams and they're returned in
        # Verify we got those 4 streams and they're returned in
        # alphabetical order.
        self.ok("list -l")
        self.match("/newton/prep float32_8\n"
                   "/newton/raw uint16_6\n"
                   "/newton/raw~decim-1234 uint16_6\n"
                   "/newton/zzz/rawnotch uint16_9\n")

        # No decimated streams if -n specified
        self.ok("list -n -l")
        self.match("/newton/prep float32_8\n"
                   "/newton/raw uint16_6\n"
                   "/newton/zzz/rawnotch uint16_9\n")

        # Delete that decimated stream
        self.ok("destroy /newton/raw~decim-1234")

        # Match just one type or one path. Also check
        # that --path is optional
        self.ok("list --layout /newton/raw")
@@ -473,6 +495,13 @@ class TestCmdline(object):
        # bad start time
        self.fail("insert -t -r 120 --start 'whatever' /newton/prep /dev/null")

        # Test negative times
        self.ok("insert --start @-10000000000 --end @1000000001 /newton/prep"
                " tests/data/timestamped")
        self.ok("extract -c /newton/prep --start min --end @1000000001")
        self.match("8\n")
        self.ok("remove /newton/prep --start min --end @1000000001")

    def test_07_detail_extended(self):
        # Just count the number of lines, it's probably fine
        self.ok("list --detail")
@@ -601,6 +630,14 @@ class TestCmdline(object):
        self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
        self.match("43200\n")

        # test binary mode
        self.fail("extract -c -B /newton/prep -s min -e max")
        self.contain("binary cannot be combined")
        self.fail("extract -m -B /newton/prep -s min -e max")
        self.contain("binary cannot be combined")
        self.ok("extract -B /newton/prep -s min -e max")
        eq_(len(self.captured), 43200 * (8 + 8*4))

        # markup for 3 intervals, plus extra markup lines whenever we had
        # a "restart" from the nilmdb.stream_extract function
        self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -797,9 +834,12 @@ class TestCmdline(object):
    def test_13_files(self):
        # Test BulkData's ability to split into multiple files,
        # by forcing the file size to be really small.
        # Also increase the initial nrows, so that start/end positions
        # in the database are very large (> 32 bit)
        server_stop()
        server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
                                       "files_per_dir" : 3 })
                                       "files_per_dir" : 3,
                                       "initial_nrows" : 2**40 })

        # Fill data
        self.ok("create /newton/prep float32_8")
@@ -847,14 +887,28 @@ class TestCmdline(object):
        self.ok("destroy -R /newton/prep") # destroy again

    def test_14_remove_files(self):
        # Test BulkData's ability to remove when data is split into
        # multiple files. Should be a fairly comprehensive test of
        # remove functionality.
        # Also limit max_removals, to cover more functionality.
        # Limit max_removals, to cover more functionality.
        server_stop()
        server_start(max_removals = 4321,
                     bulkdata_args = { "file_size" : 920, # 23 rows per file
                                       "files_per_dir" : 3 })
                                       "files_per_dir" : 3,
                                       "initial_nrows" : 2**40 })
        self.do_remove_files()
        self.ok("destroy -R /newton/prep") # destroy again

    def test_14b_remove_files_maxint(self):
        # Limit max_int_removals, to cover more functionality.
        server_stop()
        server_start(max_int_removals = 1,
                     bulkdata_args = { "file_size" : 920, # 23 rows per file
                                       "files_per_dir" : 3,
                                       "initial_nrows" : 2**40 })
        self.do_remove_files()

    def do_remove_files(self):
        # Test BulkData's ability to remove when data is split into
        # multiple files. Should be a fairly comprehensive test of
        # remove functionality.

        # Insert data. Just for fun, insert out of order
        self.ok("create /newton/prep float32_8")
@@ -994,6 +1048,18 @@ class TestCmdline(object):
        self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
                   "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")

        # optimize
        self.ok("insert -s 01-01-2002 -e 01-01-2004 /diff/1 /dev/null")
        self.ok("intervals /diff/1")
        self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
                   "> Thu, 01 Jan 2004 00:00:00.000000 +0000 ]\n"
                   "[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
                   "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
        self.ok("intervals /diff/1 --optimize")
        self.ok("intervals /diff/1 -o")
        self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
                   "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")

        self.ok("destroy -R /diff/1")
        self.ok("destroy -R /diff/2")
@@ -59,6 +59,14 @@ class TestInterval:
        self.test_interval_intersect()
        Interval = NilmdbInterval

        # Other helpers in nilmdb.utils.interval
        i = [ UtilsInterval(1,2), UtilsInterval(2,3), UtilsInterval(4,5) ]
        eq_(list(nilmdb.utils.interval.optimize(i)),
            [ UtilsInterval(1,3), UtilsInterval(4,5) ])
        eq_(UtilsInterval(1234567890123456, 1234567890654321).human_string(),
            "[ Fri, 13 Feb 2009 18:31:30.123456 -0500 -> " +
            "Fri, 13 Feb 2009 18:31:30.654321 -0500 ]")

    def test_interval(self):
        # Test Interval class
        os.environ['TZ'] = "America/New_York"
@@ -226,13 +234,16 @@ class TestInterval:
        x = makeset("[--)") & 1234

        def do_test(a, b, c, d):
            # a & b == c
            # a & b == c (using nilmdb.server.interval)
            ab = IntervalSet()
            for x in b:
                for i in (a & x):
                    ab += i
            eq_(ab,c)

            # a & b == c (using nilmdb.utils.interval)
            eq_(IntervalSet(nilmdb.utils.interval.intersection(a,b)), c)

            # a \ b == d
            eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)

@@ -302,6 +313,17 @@ class TestInterval:
        eq_(nilmdb.utils.interval.set_difference(
            a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)

        # Fill out test coverage for non-subsets
        def diff2(a,b, subset):
            return nilmdb.utils.interval._interval_math_helper(
                a, b, (lambda a, b: b and not a), subset=subset)
        with assert_raises(nilmdb.utils.interval.IntervalError):
            list(diff2(a,b,True))
        list(diff2(a,b,False))

        # Empty second set
        eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
|
@@ -157,11 +157,14 @@ class TestServer(object):
|
||||
|
||||
def test_server(self):
|
||||
# Make sure we can't force an exit, and test other 404 errors
|
||||
for url in [ "/exit", "/", "/favicon.ico" ]:
|
||||
for url in [ "/exit", "/favicon.ico" ]:
|
||||
with assert_raises(HTTPError) as e:
|
||||
geturl(url)
|
||||
eq_(e.exception.code, 404)
|
||||
|
||||
# Root page
|
||||
in_("This is NilmDB", geturl("/"))
|
||||
|
||||
# Check version
|
||||
eq_(distutils.version.LooseVersion(getjson("/version")),
|
||||
distutils.version.LooseVersion(nilmdb.__version__))
|
||||
|
@@ -28,7 +28,10 @@ def setup_module():
    recursive_unlink(testdb)

    # Start web app on a custom port
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
        testdb, bulkdata_args = { "file_size" : 16384,
                                  "files_per_dir" : 3 } )

    test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                       port = 32180, stoppable = False,
                                       fast_shutdown = True,
@@ -179,6 +182,17 @@ class TestNumpyClient(object):
            assert(np.array_equal(a,b))
            assert(np.array_equal(a,c))

        # Make sure none of the files are greater than 16384 bytes as
        # we configured with the bulkdata_args above.
        datapath = os.path.join(testdb, "data")
        for (dirpath, dirnames, filenames) in os.walk(datapath):
            for f in filenames:
                fn = os.path.join(dirpath, f)
                size = os.path.getsize(fn)
                if size > 16384:
                    raise AssertionError(sprintf("%s is too big: %d > %d\n",
                                                 fn, size, 16384))

        nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
        client.close()

@@ -295,8 +309,25 @@ class TestNumpyClient(object):
        with client.stream_insert_numpy_context("/empty/test", end = 950):
            pass

        # Equal start and end is OK as long as there's no data
        with assert_raises(ClientError) as e:
            with client.stream_insert_numpy_context("/empty/test",
                                                    start=9, end=9) as ctx:
                ctx.insert([[9, 9]])
                ctx.finalize()
        in_("have data to send, but invalid start/end times", str(e.exception))

        with client.stream_insert_numpy_context("/empty/test",
                                                start=9, end=9) as ctx:
            pass

        # reusing a context object is bad
        with assert_raises(Exception) as e:
            ctx.insert([[9, 9]])

        # Try various things that might cause problems
        with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
        with client.stream_insert_numpy_context("/empty/test",
                                                1000, 1050) as ctx:
            ctx.finalize() # inserts [1000, 1050]
            ctx.finalize() # nothing
            ctx.finalize() # nothing
@@ -62,6 +62,28 @@ class Base(object):
        eq_(self.foo.val, 20)
        eq_(self.foo.init_thread, self.foo.test_thread)

class ListLike(object):
    def __init__(self):
        self.thread = threading.current_thread().name
        self.foo = 0

    def __iter__(self):
        eq_(threading.current_thread().name, self.thread)
        self.foo = 0
        return self

    def __getitem__(self, key):
        eq_(threading.current_thread().name, self.thread)
        return key

    def next(self):
        eq_(threading.current_thread().name, self.thread)
        if self.foo < 5:
            self.foo += 1
            return self.foo
        else:
            raise StopIteration

class TestUnserialized(Base):
    def setUp(self):
        self.foo = Foo()
@@ -84,3 +106,9 @@ class TestSerializer(Base):
        sp(sp(Foo("x"))).t()
        sp(sp(Foo)("x")).t()
        sp(sp(Foo))("x").t()

    def test_iter(self):
        sp = nilmdb.utils.serializer_proxy
        i = sp(ListLike)()
        eq_(list(i), [1,2,3,4,5])
        eq_(i[3], 3)
@@ -1,5 +1,6 @@
import nilmdb
from nilmdb.utils.printf import *
from nilmdb.utils import datetime_tz

from nose.tools import *
from nose.tools import assert_raises
@@ -19,6 +20,8 @@ class TestTimestamper(object):
        def join(list):
            return "\n".join(list) + "\n"

        datetime_tz.localtz_set("America/New_York")

        start = nilmdb.utils.time.parse_time("03/24/2012")
        lines_in = [ "hello", "world", "hello world", "# commented out" ]
        lines_out = [ "1332561600000000 hello",