Compare commits
33 Commits
nilmdb-1.5
...
nilmdb-1.8
Author | SHA1 | Date | |
---|---|---|---|
e275384d03 | |||
a6a67ec15c | |||
fc43107307 | |||
90633413bb | |||
c7c3aff0fb | |||
e2347c954e | |||
222a5c6c53 | |||
1ca2c143e5 | |||
b5df575c79 | |||
2768a5ad15 | |||
a105543c38 | |||
309f38d0ed | |||
9a27b6ef6a | |||
99532cf9e0 | |||
dfdd0e5c74 | |||
9a2699adfc | |||
9bbb95b18b | |||
6bbed322c5 | |||
2317894355 | |||
539c92226c | |||
77c766d85d | |||
49d04db1d6 | |||
ea838d05ae | |||
f2a48bdb2a | |||
6d14e0b8aa | |||
b31b9327b9 | |||
b98ff1331a | |||
00e6ba1124 | |||
01029230c9 | |||
ecc4e5ef9d | |||
23f31c472b | |||
a1e2746360 | |||
1c40d59a52 |
@@ -421,3 +421,20 @@ and has all of the same functions. It adds three new functions:
|
||||
It is significantly faster! It is about 20 times faster to decimate a
|
||||
stream with `nilm-decimate` when the filter code is using the new
|
||||
binary/numpy interface.
|
||||
|
||||
|
||||
WSGI interface & chunked requests
|
||||
---------------------------------
|
||||
|
||||
mod_wsgi requires "WSGIChunkedRequest On" to handle
|
||||
"Transfer-encoding: Chunked" requests. However, `/stream/insert`
|
||||
doesn't handle this correctly right now, because:
|
||||
|
||||
- The `cherrpy.request.body.read()` call needs to be fixed for chunked requests
|
||||
|
||||
- We don't want to just buffer endlessly in the server, and it will
|
||||
require some thought on how to handle data in chunks (what to do about
|
||||
interval endpoints).
|
||||
|
||||
It is probably better to just keep the endpoint management on the client
|
||||
side, so leave "WSGIChunkedRequest off" for now.
|
||||
|
@@ -19,12 +19,12 @@ Then, set up Apache with a configuration like:
|
||||
|
||||
<VirtualHost>
|
||||
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
|
||||
WSGIApplicationGroup nilmdb-appgroup
|
||||
WSGIProcessGroup nilmdb-procgroup
|
||||
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
|
||||
|
||||
# Access control example:
|
||||
<Location /nilmdb>
|
||||
WSGIProcessGroup nilmdb-procgroup
|
||||
WSGIApplicationGroup nilmdb-appgroup
|
||||
|
||||
# Access control example:
|
||||
Order deny,allow
|
||||
Deny from all
|
||||
Allow from 1.2.3.4
|
||||
|
50
extras/fix-oversize-files.py
Normal file
50
extras/fix-oversize-files.py
Normal file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import os
|
||||
import sys
|
||||
import cPickle as pickle
|
||||
import argparse
|
||||
import fcntl
|
||||
import re
|
||||
from nilmdb.client.numpyclient import layout_to_dtype
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description = """
|
||||
Fix database corruption where binary writes caused too much data to be
|
||||
written to the file. Truncates files to the correct length. This was
|
||||
fixed by b98ff1331a515ad47fd3203615e835b529b039f9.
|
||||
""")
|
||||
parser.add_argument("path", action="store", help='Database root path')
|
||||
parser.add_argument("-y", "--yes", action="store_true", help='Fix them')
|
||||
args = parser.parse_args()
|
||||
|
||||
lock = os.path.join(args.path, "data.lock")
|
||||
with open(lock, "w") as f:
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
fix = {}
|
||||
|
||||
for (path, dirs, files) in os.walk(args.path):
|
||||
if "_format" in files:
|
||||
with open(os.path.join(path, "_format")) as format:
|
||||
fmt = pickle.load(format)
|
||||
rowsize = layout_to_dtype(fmt["layout"]).itemsize
|
||||
maxsize = rowsize * fmt["rows_per_file"]
|
||||
fix[path] = maxsize
|
||||
if maxsize < 128000000: # sanity check
|
||||
raise Exception("bad maxsize " + str(maxsize))
|
||||
|
||||
for fixpath in fix:
|
||||
for (path, dirs, files) in os.walk(fixpath):
|
||||
for fn in files:
|
||||
if not re.match("^[0-9a-f]{4,}$", fn):
|
||||
continue
|
||||
fn = os.path.join(path, fn)
|
||||
size = os.path.getsize(fn)
|
||||
maxsize = fix[fixpath]
|
||||
if size > maxsize:
|
||||
diff = size - maxsize
|
||||
print diff, "too big:", fn
|
||||
if args.yes:
|
||||
with open(fn, "a+") as dbfile:
|
||||
dbfile.truncate(maxsize)
|
@@ -6,7 +6,6 @@ import nilmdb.utils
|
||||
import nilmdb.client.httpclient
|
||||
from nilmdb.client.errors import ClientError
|
||||
|
||||
import re
|
||||
import time
|
||||
import simplejson as json
|
||||
import contextlib
|
||||
@@ -66,12 +65,8 @@ class Client(object):
|
||||
params["layout"] = layout
|
||||
if extended:
|
||||
params["extended"] = 1
|
||||
def sort_streams_nicely(x):
|
||||
"""Human-friendly sort (/stream/2 before /stream/10)"""
|
||||
num = lambda t: int(t) if t.isdigit() else t
|
||||
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
|
||||
return sorted(x, key = key)
|
||||
return sort_streams_nicely(self.http.get("stream/list", params))
|
||||
streams = self.http.get("stream/list", params)
|
||||
return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
|
||||
|
||||
def stream_get_metadata(self, path, keys = None):
|
||||
params = { "path": path }
|
||||
@@ -122,7 +117,10 @@ class Client(object):
|
||||
params["start"] = timestamp_to_string(start)
|
||||
if end is not None:
|
||||
params["end"] = timestamp_to_string(end)
|
||||
return self.http.post("stream/remove", params)
|
||||
total = 0
|
||||
for count in self.http.post_gen("stream/remove", params):
|
||||
total += int(count)
|
||||
return total
|
||||
|
||||
@contextlib.contextmanager
|
||||
def stream_insert_context(self, path, start = None, end = None):
|
||||
@@ -146,6 +144,7 @@ class Client(object):
|
||||
ctx = StreamInserter(self, path, start, end)
|
||||
yield ctx
|
||||
ctx.finalize()
|
||||
ctx.destroy()
|
||||
|
||||
def stream_insert(self, path, data, start = None, end = None):
|
||||
"""Insert rows of data into a stream. data should be a string
|
||||
@@ -295,6 +294,15 @@ class StreamInserter(object):
|
||||
self._block_data = []
|
||||
self._block_len = 0
|
||||
|
||||
self.destroyed = False
|
||||
|
||||
def destroy(self):
|
||||
"""Ensure this object can't be used again without raising
|
||||
an error"""
|
||||
def error(*args, **kwargs):
|
||||
raise Exception("don't reuse this context object")
|
||||
self._send_block = self.insert = self.finalize = self.send = error
|
||||
|
||||
def insert(self, data):
|
||||
"""Insert a chunk of ASCII formatted data in string form. The
|
||||
overall data must consist of lines terminated by '\\n'."""
|
||||
@@ -441,7 +449,7 @@ class StreamInserter(object):
|
||||
self._interval_start = end_ts
|
||||
|
||||
# Double check endpoints
|
||||
if start_ts is None or end_ts is None:
|
||||
if (start_ts is None or end_ts is None) or (start_ts == end_ts):
|
||||
# If the block has no non-comment lines, it's OK
|
||||
try:
|
||||
self._get_first_noncomment(block)
|
||||
|
@@ -123,19 +123,50 @@ class HTTPClient(object):
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body,
|
||||
stream = True, headers = headers)
|
||||
|
||||
# Like the iter_lines function in Requests, but only splits on
|
||||
# the specified line ending.
|
||||
def lines(source, ending):
|
||||
pending = None
|
||||
for chunk in source:
|
||||
if pending is not None:
|
||||
chunk = pending + chunk
|
||||
tmp = chunk.split(ending)
|
||||
lines = tmp[:-1]
|
||||
if chunk.endswith(ending):
|
||||
pending = None
|
||||
else:
|
||||
pending = tmp[-1]
|
||||
for line in lines:
|
||||
yield line
|
||||
if pending is not None: # pragma: no cover (missing newline)
|
||||
yield pending
|
||||
|
||||
# Yield the chunks or lines as requested
|
||||
if binary:
|
||||
for chunk in response.iter_content(chunk_size = 65536):
|
||||
yield chunk
|
||||
elif isjson:
|
||||
for line in response.iter_lines():
|
||||
for line in lines(response.iter_content(chunk_size = 1),
|
||||
ending = '\r\n'):
|
||||
yield json.loads(line)
|
||||
else:
|
||||
for line in response.iter_lines():
|
||||
for line in lines(response.iter_content(chunk_size = 65536),
|
||||
ending = '\n'):
|
||||
yield line
|
||||
|
||||
def get_gen(self, url, params = None, binary = False):
|
||||
"""Simple GET (parameters in URL) returning a generator"""
|
||||
return self._req_gen("GET", url, params, binary = binary)
|
||||
|
||||
def post_gen(self, url, params = None):
|
||||
"""Simple POST (parameters in body) returning a generator"""
|
||||
if self.post_json:
|
||||
return self._req_gen("POST", url, None,
|
||||
json.dumps(params),
|
||||
{ 'Content-type': 'application/json' })
|
||||
else:
|
||||
return self._req_gen("POST", url, None, params)
|
||||
|
||||
# Not much use for a POST or PUT generator, since they don't
|
||||
# return much data.
|
||||
|
@@ -98,6 +98,7 @@ class NumpyClient(nilmdb.client.client.Client):
|
||||
ctx = StreamInserterNumpy(self, path, start, end, dtype)
|
||||
yield ctx
|
||||
ctx.finalize()
|
||||
ctx.destroy()
|
||||
|
||||
def stream_insert_numpy(self, path, data, start = None, end = None,
|
||||
layout = None):
|
||||
@@ -133,16 +134,8 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
|
||||
contiguous interval and may be None. 'dtype' is the Numpy
|
||||
dtype for this stream.
|
||||
"""
|
||||
self.last_response = None
|
||||
|
||||
super(StreamInserterNumpy, self).__init__(client, path, start, end)
|
||||
self._dtype = dtype
|
||||
self._client = client
|
||||
self._path = path
|
||||
|
||||
# Start and end for the overall contiguous interval we're
|
||||
# filling
|
||||
self._interval_start = start
|
||||
self._interval_end = end
|
||||
|
||||
# Max rows to send at once
|
||||
self._max_rows = self._max_data // self._dtype.itemsize
|
||||
@@ -250,9 +243,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
|
||||
# Next block continues where this one ended
|
||||
self._interval_start = end_ts
|
||||
|
||||
# If we have no endpoints, it's because we had no data to send.
|
||||
if start_ts is None or end_ts is None:
|
||||
return
|
||||
# If we have no endpoints, or equal endpoints, it's OK as long
|
||||
# as there's no data to send
|
||||
if (start_ts is None or end_ts is None) or (start_ts == end_ts):
|
||||
if len(array) == 0:
|
||||
return
|
||||
raise ClientError("have data to send, but invalid start/end times")
|
||||
|
||||
# Send it
|
||||
data = array.tostring()
|
||||
|
@@ -72,10 +72,16 @@ class Complete(object): # pragma: no cover
|
||||
path = parsed_args.path
|
||||
if not path:
|
||||
return []
|
||||
return ( self.escape(k + '=' + v)
|
||||
for (k,v) in client.stream_get_metadata(path).iteritems()
|
||||
if k.startswith(prefix) )
|
||||
|
||||
results = []
|
||||
# prefix comes in as UTF-8, but results need to be Unicode,
|
||||
# weird. Still doesn't work in all cases, but that's bugs in
|
||||
# argcomplete.
|
||||
prefix = nilmdb.utils.unicode.decode(prefix)
|
||||
for (k,v) in client.stream_get_metadata(path).iteritems():
|
||||
kv = self.escape(k + '=' + v)
|
||||
if kv.startswith(prefix):
|
||||
results.append(kv)
|
||||
return results
|
||||
|
||||
class Cmdline(object):
|
||||
|
||||
|
@@ -1,6 +1,7 @@
|
||||
from __future__ import print_function
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmdb.client
|
||||
import sys
|
||||
|
||||
def setup(self, sub):
|
||||
cmd = sub.add_parser("extract", help="Extract data",
|
||||
@@ -24,6 +25,8 @@ def setup(self, sub):
|
||||
).completer = self.complete.time
|
||||
|
||||
group = cmd.add_argument_group("Output format")
|
||||
group.add_argument("-B", "--binary", action="store_true",
|
||||
help="Raw binary output")
|
||||
group.add_argument("-b", "--bare", action="store_true",
|
||||
help="Exclude timestamps from output lines")
|
||||
group.add_argument("-a", "--annotate", action="store_true",
|
||||
@@ -42,6 +45,11 @@ def cmd_extract_verify(self):
|
||||
if self.args.start > self.args.end:
|
||||
self.parser.error("start is after end")
|
||||
|
||||
if self.args.binary:
|
||||
if (self.args.bare or self.args.annotate or self.args.markup or
|
||||
self.args.timestamp_raw or self.args.count):
|
||||
self.parser.error("--binary cannot be combined with other options")
|
||||
|
||||
def cmd_extract(self):
|
||||
streams = self.client.stream_list(self.args.path)
|
||||
if len(streams) != 1:
|
||||
@@ -60,16 +68,23 @@ def cmd_extract(self):
|
||||
printf("# end: %s\n", time_string(self.args.end))
|
||||
|
||||
printed = False
|
||||
if self.args.binary:
|
||||
printer = sys.stdout.write
|
||||
else:
|
||||
printer = print
|
||||
bare = self.args.bare
|
||||
count = self.args.count
|
||||
for dataline in self.client.stream_extract(self.args.path,
|
||||
self.args.start,
|
||||
self.args.end,
|
||||
self.args.count,
|
||||
self.args.markup):
|
||||
if self.args.bare and not self.args.count:
|
||||
self.args.markup,
|
||||
self.args.binary):
|
||||
if bare and not count:
|
||||
# Strip timestamp (first element). Doesn't make sense
|
||||
# if we are only returning a count.
|
||||
dataline = ' '.join(dataline.split(' ')[1:])
|
||||
print(dataline)
|
||||
printer(dataline)
|
||||
printed = True
|
||||
if not printed:
|
||||
if self.args.annotate:
|
||||
|
@@ -21,5 +21,8 @@ def cmd_info(self):
|
||||
printf("Server URL: %s\n", self.client.geturl())
|
||||
dbinfo = self.client.dbinfo()
|
||||
printf("Server database path: %s\n", dbinfo["path"])
|
||||
printf("Server database size: %s\n", human_size(dbinfo["size"]))
|
||||
printf("Server database free space: %s\n", human_size(dbinfo["free"]))
|
||||
for (desc, field) in [("used by NilmDB", "size"),
|
||||
("used by other", "other"),
|
||||
("reserved", "reserved"),
|
||||
("free", "free")]:
|
||||
printf("Server disk space %s: %s\n", desc, human_size(dbinfo[field]))
|
||||
|
@@ -41,10 +41,10 @@ def cmd_metadata(self):
|
||||
if self.args.set is not None or self.args.update is not None:
|
||||
# Either set, or update
|
||||
if self.args.set is not None:
|
||||
keyvals = self.args.set
|
||||
keyvals = map(nilmdb.utils.unicode.decode, self.args.set)
|
||||
handler = self.client.stream_set_metadata
|
||||
else:
|
||||
keyvals = self.args.update
|
||||
keyvals = map(nilmdb.utils.unicode.decode, self.args.update)
|
||||
handler = self.client.stream_update_metadata
|
||||
|
||||
# Extract key=value pairs
|
||||
@@ -62,7 +62,9 @@ def cmd_metadata(self):
|
||||
self.die("error setting/updating metadata: %s", str(e))
|
||||
elif self.args.delete is not None:
|
||||
# Delete (by setting values to empty strings)
|
||||
keys = self.args.delete or None
|
||||
keys = None
|
||||
if self.args.delete:
|
||||
keys = map(nilmdb.utils.unicode.decode, self.args.delete)
|
||||
try:
|
||||
data = self.client.stream_get_metadata(self.args.path, keys)
|
||||
for key in data:
|
||||
@@ -72,7 +74,9 @@ def cmd_metadata(self):
|
||||
self.die("error deleting metadata: %s", str(e))
|
||||
else:
|
||||
# Get (or unspecified)
|
||||
keys = self.args.get or None
|
||||
keys = None
|
||||
if self.args.get:
|
||||
keys = map(nilmdb.utils.unicode.decode, self.args.get)
|
||||
try:
|
||||
data = self.client.stream_get_metadata(self.args.path, keys)
|
||||
except nilmdb.client.ClientError as e:
|
||||
@@ -81,4 +85,6 @@ def cmd_metadata(self):
|
||||
# Print nonexistant keys as having empty value
|
||||
if value is None:
|
||||
value = ""
|
||||
printf("%s=%s\n", key, value)
|
||||
printf("%s=%s\n",
|
||||
nilmdb.utils.unicode.encode(key),
|
||||
nilmdb.utils.unicode.encode(value))
|
||||
|
@@ -19,8 +19,8 @@ from . import rocket
|
||||
|
||||
# Up to 256 open file descriptors at any given time.
|
||||
# These variables are global so they can be used in the decorator arguments.
|
||||
table_cache_size = 16
|
||||
fd_cache_size = 16
|
||||
table_cache_size = 32
|
||||
fd_cache_size = 8
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class BulkData(object):
|
||||
|
@@ -176,7 +176,7 @@ class NilmDB(object):
|
||||
raise NilmDBError("start must precede end")
|
||||
return (start, end)
|
||||
|
||||
@nilmdb.utils.lru_cache(size = 16)
|
||||
@nilmdb.utils.lru_cache(size = 64)
|
||||
def _get_intervals(self, stream_id):
|
||||
"""
|
||||
Return a mutable IntervalSet corresponding to the given stream ID.
|
||||
@@ -675,6 +675,7 @@ class NilmDB(object):
|
||||
|
||||
# Count how many were removed
|
||||
removed += row_end - row_start
|
||||
remaining -= row_end - row_start
|
||||
|
||||
if restart is not None:
|
||||
break
|
||||
|
@@ -5,6 +5,9 @@
|
||||
#include <ctype.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#include <inttypes.h>
|
||||
|
||||
/* Values missing from stdint.h */
|
||||
#define UINT8_MIN 0
|
||||
#define UINT16_MIN 0
|
||||
@@ -19,16 +22,9 @@
|
||||
|
||||
typedef int64_t timestamp_t;
|
||||
|
||||
/* This code probably needs to be double-checked for the case where
|
||||
sizeof(long) != 8, so enforce that here with something that will
|
||||
fail at build time. We assume that the python integer type can
|
||||
hold an int64_t. */
|
||||
const static char __long_ok[1 - 2*!(sizeof(int64_t) ==
|
||||
sizeof(long int))] = { 0 };
|
||||
|
||||
/* Somewhat arbitrary, just so we can use fixed sizes for strings
|
||||
etc. */
|
||||
static const int MAX_LAYOUT_COUNT = 128;
|
||||
static const int MAX_LAYOUT_COUNT = 1024;
|
||||
|
||||
/* Error object and constants */
|
||||
static PyObject *ParseError;
|
||||
@@ -58,7 +54,7 @@ static PyObject *raise_str(int line, int col, int code, const char *string)
|
||||
static PyObject *raise_int(int line, int col, int code, int64_t num)
|
||||
{
|
||||
PyObject *o;
|
||||
o = Py_BuildValue("(iiil)", line, col, code, num);
|
||||
o = Py_BuildValue("(iiiL)", line, col, code, (long long)num);
|
||||
if (o != NULL) {
|
||||
PyErr_SetObject(ParseError, o);
|
||||
Py_DECREF(o);
|
||||
@@ -249,11 +245,11 @@ static PyObject *Rocket_get_file_size(Rocket *self)
|
||||
/****
|
||||
* Append from string
|
||||
*/
|
||||
static inline long int strtol10(const char *nptr, char **endptr) {
|
||||
return strtol(nptr, endptr, 10);
|
||||
static inline long int strtoll10(const char *nptr, char **endptr) {
|
||||
return strtoll(nptr, endptr, 10);
|
||||
}
|
||||
static inline long int strtoul10(const char *nptr, char **endptr) {
|
||||
return strtoul(nptr, endptr, 10);
|
||||
static inline long int strtoull10(const char *nptr, char **endptr) {
|
||||
return strtoull(nptr, endptr, 10);
|
||||
}
|
||||
|
||||
/* .append_string(count, data, offset, linenum, start, end, last_timestamp) */
|
||||
@@ -264,6 +260,7 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
int offset;
|
||||
const char *linestart;
|
||||
int linenum;
|
||||
long long ll1, ll2, ll3;
|
||||
timestamp_t start;
|
||||
timestamp_t end;
|
||||
timestamp_t last_timestamp;
|
||||
@@ -280,10 +277,13 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
but we need the null termination for strto*. If we had
|
||||
strnto* that took a length, we could use t# and not require
|
||||
a copy. */
|
||||
if (!PyArg_ParseTuple(args, "isiilll:append_string", &count,
|
||||
if (!PyArg_ParseTuple(args, "isiiLLL:append_string", &count,
|
||||
&data, &offset, &linenum,
|
||||
&start, &end, &last_timestamp))
|
||||
&ll1, &ll2, &ll3))
|
||||
return NULL;
|
||||
start = ll1;
|
||||
end = ll2;
|
||||
last_timestamp = ll3;
|
||||
|
||||
/* Skip spaces, but don't skip over a newline. */
|
||||
#define SKIP_BLANK(buf) do { \
|
||||
@@ -372,14 +372,14 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
goto extra_data_on_line; \
|
||||
break
|
||||
|
||||
CS(INT8, strtol10, t64.i, t8.i, t8.u, , 1);
|
||||
CS(UINT8, strtoul10, t64.u, t8.u, t8.u, , 1);
|
||||
CS(INT16, strtol10, t64.i, t16.i, t16.u, le16toh, 2);
|
||||
CS(UINT16, strtoul10, t64.u, t16.u, t16.u, le16toh, 2);
|
||||
CS(INT32, strtol10, t64.i, t32.i, t32.u, le32toh, 4);
|
||||
CS(UINT32, strtoul10, t64.u, t32.u, t32.u, le32toh, 4);
|
||||
CS(INT64, strtol10, t64.i, t64.i, t64.u, le64toh, 8);
|
||||
CS(UINT64, strtoul10, t64.u, t64.u, t64.u, le64toh, 8);
|
||||
CS(INT8, strtoll10, t64.i, t8.i, t8.u, , 1);
|
||||
CS(UINT8, strtoull10, t64.u, t8.u, t8.u, , 1);
|
||||
CS(INT16, strtoll10, t64.i, t16.i, t16.u, le16toh, 2);
|
||||
CS(UINT16, strtoull10, t64.u, t16.u, t16.u, le16toh, 2);
|
||||
CS(INT32, strtoll10, t64.i, t32.i, t32.u, le32toh, 4);
|
||||
CS(UINT32, strtoull10, t64.u, t32.u, t32.u, le32toh, 4);
|
||||
CS(INT64, strtoll10, t64.i, t64.i, t64.u, le64toh, 8);
|
||||
CS(UINT64, strtoull10, t64.u, t64.u, t64.u, le64toh, 8);
|
||||
CS(FLOAT32, strtod, t64.d, t32.f, t32.u, le32toh, 4);
|
||||
CS(FLOAT64, strtod, t64.d, t64.d, t64.u, le64toh, 8);
|
||||
#undef CS
|
||||
@@ -397,7 +397,8 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
/* Build return value and return */
|
||||
offset = buf - data;
|
||||
PyObject *o;
|
||||
o = Py_BuildValue("(iili)", written, offset, last_timestamp, linenum);
|
||||
o = Py_BuildValue("(iiLi)", written, offset,
|
||||
(long long)last_timestamp, linenum);
|
||||
return o;
|
||||
err:
|
||||
PyErr_SetFromErrno(PyExc_OSError);
|
||||
@@ -431,14 +432,18 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
|
||||
int data_len;
|
||||
int linenum;
|
||||
int offset;
|
||||
long long ll1, ll2, ll3;
|
||||
timestamp_t start;
|
||||
timestamp_t end;
|
||||
timestamp_t last_timestamp;
|
||||
|
||||
if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
|
||||
if (!PyArg_ParseTuple(args, "it#iiLLL:append_binary",
|
||||
&count, &data, &data_len, &offset,
|
||||
&linenum, &start, &end, &last_timestamp))
|
||||
&linenum, &ll1, &ll2, &ll3))
|
||||
return NULL;
|
||||
start = ll1;
|
||||
end = ll2;
|
||||
last_timestamp = ll3;
|
||||
|
||||
/* Advance to offset */
|
||||
if (offset > data_len)
|
||||
@@ -468,7 +473,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
|
||||
}
|
||||
|
||||
/* Write binary data */
|
||||
if (fwrite(data, data_len, 1, self->file) != 1) {
|
||||
if (fwrite(data, self->binary_size, rows, self->file) != rows) {
|
||||
PyErr_SetFromErrno(PyExc_OSError);
|
||||
return NULL;
|
||||
}
|
||||
@@ -476,8 +481,8 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
|
||||
|
||||
/* Build return value and return */
|
||||
PyObject *o;
|
||||
o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
|
||||
last_timestamp, linenum);
|
||||
o = Py_BuildValue("(iiLi)", rows, offset + rows * self->binary_size,
|
||||
(long long)last_timestamp, linenum);
|
||||
return o;
|
||||
}
|
||||
|
||||
@@ -534,7 +539,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
|
||||
if (fread(&t64.u, 8, 1, self->file) != 1)
|
||||
goto err;
|
||||
t64.u = le64toh(t64.u);
|
||||
ret = sprintf(&str[len], "%ld", t64.i);
|
||||
ret = sprintf(&str[len], "%" PRId64, t64.i);
|
||||
if (ret <= 0)
|
||||
goto err;
|
||||
len += ret;
|
||||
@@ -556,14 +561,14 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
|
||||
len += ret; \
|
||||
} \
|
||||
break
|
||||
CASE(INT8, "%hhd", t8.i, t8.u, , 1);
|
||||
CASE(UINT8, "%hhu", t8.u, t8.u, , 1);
|
||||
CASE(INT16, "%hd", t16.i, t16.u, le16toh, 2);
|
||||
CASE(UINT16, "%hu", t16.u, t16.u, le16toh, 2);
|
||||
CASE(INT32, "%d", t32.i, t32.u, le32toh, 4);
|
||||
CASE(UINT32, "%u", t32.u, t32.u, le32toh, 4);
|
||||
CASE(INT64, "%ld", t64.i, t64.u, le64toh, 8);
|
||||
CASE(UINT64, "%lu", t64.u, t64.u, le64toh, 8);
|
||||
CASE(INT8, "%" PRId8, t8.i, t8.u, , 1);
|
||||
CASE(UINT8, "%" PRIu8, t8.u, t8.u, , 1);
|
||||
CASE(INT16, "%" PRId16, t16.i, t16.u, le16toh, 2);
|
||||
CASE(UINT16, "%" PRIu16, t16.u, t16.u, le16toh, 2);
|
||||
CASE(INT32, "%" PRId32, t32.i, t32.u, le32toh, 4);
|
||||
CASE(UINT32, "%" PRIu32, t32.u, t32.u, le32toh, 4);
|
||||
CASE(INT64, "%" PRId64, t64.i, t64.u, le64toh, 8);
|
||||
CASE(UINT64, "%" PRIu64, t64.u, t64.u, le64toh, 8);
|
||||
/* These next two are a bit debatable. floats
|
||||
are 6-9 significant figures, so we print 7.
|
||||
Doubles are 15-19, so we print 17. This is
|
||||
@@ -653,7 +658,7 @@ static PyObject *Rocket_extract_timestamp(Rocket *self, PyObject *args)
|
||||
|
||||
/* Convert and return */
|
||||
t64.u = le64toh(t64.u);
|
||||
return Py_BuildValue("l", t64.i);
|
||||
return Py_BuildValue("L", (long long)t64.i);
|
||||
}
|
||||
|
||||
/****
|
||||
|
@@ -17,126 +17,25 @@ import decorator
|
||||
import psutil
|
||||
import traceback
|
||||
|
||||
from nilmdb.server.serverutil import (
|
||||
chunked_response,
|
||||
response_type,
|
||||
workaround_cp_bug_1200,
|
||||
exception_to_httperror,
|
||||
CORS_allow,
|
||||
json_to_request_params,
|
||||
json_error_page,
|
||||
cherrypy_start,
|
||||
cherrypy_stop,
|
||||
)
|
||||
|
||||
# Add CORS_allow tool
|
||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||
|
||||
class NilmApp(object):
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
|
||||
# Decorators
|
||||
def chunked_response(func):
|
||||
"""Decorator to enable chunked responses."""
|
||||
# Set this to False to get better tracebacks from some requests
|
||||
# (/stream/extract, /stream/intervals).
|
||||
func._cp_config = { 'response.stream': True }
|
||||
return func
|
||||
|
||||
def response_type(content_type):
|
||||
"""Return a decorator-generating function that sets the
|
||||
response type to the specified string."""
|
||||
def wrapper(func, *args, **kwargs):
|
||||
cherrypy.response.headers['Content-Type'] = content_type
|
||||
return func(*args, **kwargs)
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
@decorator.decorator
|
||||
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
|
||||
"""Decorator to work around CherryPy bug #1200 in a response
|
||||
generator.
|
||||
|
||||
Even if chunked responses are disabled, LookupError or
|
||||
UnicodeError exceptions may still be swallowed by CherryPy due to
|
||||
bug #1200. This throws them as generic Exceptions instead so that
|
||||
they make it through.
|
||||
"""
|
||||
exc_info = None
|
||||
try:
|
||||
for val in func(*args, **kwargs):
|
||||
yield val
|
||||
except (LookupError, UnicodeError):
|
||||
# Re-raise it, but maintain the original traceback
|
||||
exc_info = sys.exc_info()
|
||||
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
|
||||
raise new_exc, None, exc_info[2]
|
||||
finally:
|
||||
del exc_info
|
||||
|
||||
def exception_to_httperror(*expected):
|
||||
"""Return a decorator-generating function that catches expected
|
||||
errors and throws a HTTPError describing it instead.
|
||||
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
def foo():
|
||||
pass
|
||||
"""
|
||||
def wrapper(func, *args, **kwargs):
|
||||
exc_info = None
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except expected:
|
||||
# Re-raise it, but maintain the original traceback
|
||||
exc_info = sys.exc_info()
|
||||
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
|
||||
raise new_exc, None, exc_info[2]
|
||||
finally:
|
||||
del exc_info
|
||||
# We need to preserve the function's argspecs for CherryPy to
|
||||
# handle argument errors correctly. Decorator.decorator takes
|
||||
# care of that.
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
# Custom CherryPy tools
|
||||
|
||||
def CORS_allow(methods):
|
||||
"""This does several things:
|
||||
|
||||
Handles CORS preflight requests.
|
||||
Adds Allow: header to all requests.
|
||||
Raise 405 if request.method not in method.
|
||||
|
||||
It is similar to cherrypy.tools.allow, with the CORS stuff added.
|
||||
"""
|
||||
request = cherrypy.request.headers
|
||||
response = cherrypy.response.headers
|
||||
|
||||
if not isinstance(methods, (tuple, list)): # pragma: no cover
|
||||
methods = [ methods ]
|
||||
methods = [ m.upper() for m in methods if m ]
|
||||
if not methods: # pragma: no cover
|
||||
methods = [ 'GET', 'HEAD' ]
|
||||
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
|
||||
methods.append('HEAD')
|
||||
response['Allow'] = ', '.join(methods)
|
||||
|
||||
# Allow all origins
|
||||
if 'Origin' in request:
|
||||
response['Access-Control-Allow-Origin'] = request['Origin']
|
||||
|
||||
# If it's a CORS request, send response.
|
||||
request_method = request.get("Access-Control-Request-Method", None)
|
||||
request_headers = request.get("Access-Control-Request-Headers", None)
|
||||
if (cherrypy.request.method == "OPTIONS" and
|
||||
request_method and request_headers):
|
||||
response['Access-Control-Allow-Headers'] = request_headers
|
||||
response['Access-Control-Allow-Methods'] = ', '.join(methods)
|
||||
# Try to stop further processing and return a 200 OK
|
||||
cherrypy.response.status = "200 OK"
|
||||
cherrypy.response.body = ""
|
||||
cherrypy.request.handler = lambda: ""
|
||||
return
|
||||
|
||||
# Reject methods that were not explicitly allowed
|
||||
if cherrypy.request.method not in methods:
|
||||
raise cherrypy.HTTPError(405)
|
||||
|
||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||
|
||||
# Helper for json_in tool to process JSON data into normal request
|
||||
# parameters.
|
||||
def json_to_request_params(body):
|
||||
cherrypy.lib.jsontools.json_processor(body)
|
||||
if not isinstance(cherrypy.request.json, dict):
|
||||
raise cherrypy.HTTPError(415)
|
||||
cherrypy.request.params.update(cherrypy.request.json)
|
||||
|
||||
# CherryPy apps
|
||||
class Root(NilmApp):
|
||||
"""Root application for NILM database"""
|
||||
@@ -147,7 +46,10 @@ class Root(NilmApp):
|
||||
# /
|
||||
@cherrypy.expose
|
||||
def index(self):
|
||||
raise cherrypy.NotFound()
|
||||
cherrypy.response.headers['Content-Type'] = 'text/plain'
|
||||
msg = sprintf("This is NilmDB version %s, running on host %s.\n",
|
||||
nilmdb.__version__, socket.getfqdn())
|
||||
return msg
|
||||
|
||||
# /favicon.ico
|
||||
@cherrypy.expose
|
||||
@@ -167,9 +69,13 @@ class Root(NilmApp):
|
||||
"""Return a dictionary with the database path,
|
||||
size of the database in bytes, and free disk space in bytes"""
|
||||
path = self.db.get_basepath()
|
||||
usage = psutil.disk_usage(path)
|
||||
dbsize = nilmdb.utils.du(path)
|
||||
return { "path": path,
|
||||
"size": nilmdb.utils.du(path),
|
||||
"free": psutil.disk_usage(path).free }
|
||||
"size": dbsize,
|
||||
"other": usage.used - dbsize,
|
||||
"reserved": usage.total - usage.used - usage.free,
|
||||
"free": usage.free }
|
||||
|
||||
class Stream(NilmApp):
|
||||
"""Stream-specific operations"""
|
||||
@@ -199,10 +105,10 @@ class Stream(NilmApp):
|
||||
layout parameter, just list streams that match the given path
|
||||
or layout.
|
||||
|
||||
If extent is not given, returns a list of lists containing
|
||||
the path and layout: [ path, layout ]
|
||||
If extended is missing or zero, returns a list of lists
|
||||
containing the path and layout: [ path, layout ]
|
||||
|
||||
If extended is provided, returns a list of lists containing
|
||||
If extended is true, returns a list of lists containing
|
||||
extended info: [ path, layout, extent_min, extent_max,
|
||||
total_rows, total_seconds ]. More data may be added.
|
||||
"""
|
||||
@@ -347,24 +253,34 @@ class Stream(NilmApp):
|
||||
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
@chunked_response
|
||||
@response_type("application/x-json-stream")
|
||||
def remove(self, path, start = None, end = None):
|
||||
"""
|
||||
Remove data from the backend database. Removes all data in
|
||||
the interval [start, end). Returns the number of data points
|
||||
removed.
|
||||
the interval [start, end).
|
||||
|
||||
Returns the number of data points removed. Since this is a potentially
|
||||
long-running operation, multiple numbers may be returned as the
|
||||
data gets removed from the backend database. The total number of
|
||||
points removed is the sum of all of these numbers.
|
||||
"""
|
||||
(start, end) = self._get_times(start, end)
|
||||
total_removed = 0
|
||||
while True:
|
||||
(removed, restart) = self.db.stream_remove(path, start, end)
|
||||
total_removed += removed
|
||||
if restart is None:
|
||||
break
|
||||
start = restart
|
||||
return total_removed
|
||||
|
||||
if len(self.db.stream_list(path = path)) != 1:
|
||||
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||
|
||||
@workaround_cp_bug_1200
|
||||
def content(start, end):
|
||||
# Note: disable chunked responses to see tracebacks from here.
|
||||
while True:
|
||||
(removed, restart) = self.db.stream_remove(path, start, end)
|
||||
yield json.dumps(removed) + "\r\n"
|
||||
if restart is None:
|
||||
break
|
||||
start = restart
|
||||
return content(start, end)
|
||||
|
||||
# /stream/intervals?path=/newton/prep
|
||||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@@ -556,70 +472,14 @@ class Server(object):
|
||||
|
||||
def json_error_page(self, status, message, traceback, version):
|
||||
"""Return a custom error page in JSON so the client can parse it"""
|
||||
errordata = { "status" : status,
|
||||
"message" : message,
|
||||
"traceback" : traceback }
|
||||
# Don't send a traceback if the error was 400-499 (client's fault)
|
||||
try:
|
||||
code = int(status.split()[0])
|
||||
if not self.force_traceback:
|
||||
if code >= 400 and code <= 499:
|
||||
errordata["traceback"] = ""
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
# Override the response type, which was previously set to text/html
|
||||
cherrypy.serving.response.headers['Content-Type'] = (
|
||||
"application/json;charset=utf-8" )
|
||||
# Undo the HTML escaping that cherrypy's get_error_page function applies
|
||||
# (cherrypy issue 1135)
|
||||
for k, v in errordata.iteritems():
|
||||
v = v.replace("<","<")
|
||||
v = v.replace(">",">")
|
||||
v = v.replace("&","&")
|
||||
errordata[k] = v
|
||||
return json.dumps(errordata, separators=(',',':'))
|
||||
return json_error_page(status, message, traceback, version,
|
||||
self.force_traceback)
|
||||
|
||||
def start(self, blocking = False, event = None):
|
||||
|
||||
if not self.embedded: # pragma: no cover
|
||||
# Handle signals nicely
|
||||
if hasattr(cherrypy.engine, "signal_handler"):
|
||||
cherrypy.engine.signal_handler.subscribe()
|
||||
if hasattr(cherrypy.engine, "console_control_handler"):
|
||||
cherrypy.engine.console_control_handler.subscribe()
|
||||
|
||||
# Cherrypy stupidly calls os._exit(70) when it can't bind the
|
||||
# port. At least try to print a reasonable error and continue
|
||||
# in this case, rather than just dying silently (as we would
|
||||
# otherwise do in embedded mode)
|
||||
real_exit = os._exit
|
||||
def fake_exit(code): # pragma: no cover
|
||||
if code == os.EX_SOFTWARE:
|
||||
fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
|
||||
else:
|
||||
real_exit(code)
|
||||
os._exit = fake_exit
|
||||
cherrypy.engine.start()
|
||||
os._exit = real_exit
|
||||
|
||||
# Signal that the engine has started successfully
|
||||
if event is not None:
|
||||
event.set()
|
||||
|
||||
if blocking:
|
||||
try:
|
||||
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
|
||||
interval = 0.1, channel = 'main')
|
||||
except (KeyboardInterrupt, IOError): # pragma: no cover
|
||||
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
|
||||
cherrypy.engine.exit()
|
||||
except SystemExit: # pragma: no cover
|
||||
cherrypy.engine.log('SystemExit raised: shutting down bus')
|
||||
cherrypy.engine.exit()
|
||||
raise
|
||||
cherrypy_start(blocking, event, self.embedded)
|
||||
|
||||
def stop(self):
|
||||
cherrypy.engine.exit()
|
||||
cherrypy_stop()
|
||||
|
||||
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
|
||||
# instance since the database can only be opened once. For this to
|
||||
|
199
nilmdb/server/serverutil.py
Normal file
199
nilmdb/server/serverutil.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""Miscellaneous decorators and other helpers for running a CherryPy
|
||||
server"""
|
||||
|
||||
import cherrypy
|
||||
import sys
|
||||
import os
|
||||
import decorator
|
||||
import simplejson as json
|
||||
|
||||
# Decorators
|
||||
def chunked_response(func):
|
||||
"""Decorator to enable chunked responses."""
|
||||
# Set this to False to get better tracebacks from some requests
|
||||
# (/stream/extract, /stream/intervals).
|
||||
func._cp_config = { 'response.stream': True }
|
||||
return func
|
||||
|
||||
def response_type(content_type):
|
||||
"""Return a decorator-generating function that sets the
|
||||
response type to the specified string."""
|
||||
def wrapper(func, *args, **kwargs):
|
||||
cherrypy.response.headers['Content-Type'] = content_type
|
||||
return func(*args, **kwargs)
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
@decorator.decorator
|
||||
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
|
||||
"""Decorator to work around CherryPy bug #1200 in a response
|
||||
generator.
|
||||
|
||||
Even if chunked responses are disabled, LookupError or
|
||||
UnicodeError exceptions may still be swallowed by CherryPy due to
|
||||
bug #1200. This throws them as generic Exceptions instead so that
|
||||
they make it through.
|
||||
"""
|
||||
exc_info = None
|
||||
try:
|
||||
for val in func(*args, **kwargs):
|
||||
yield val
|
||||
except (LookupError, UnicodeError):
|
||||
# Re-raise it, but maintain the original traceback
|
||||
exc_info = sys.exc_info()
|
||||
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
|
||||
raise new_exc, None, exc_info[2]
|
||||
finally:
|
||||
del exc_info
|
||||
|
||||
def exception_to_httperror(*expected):
|
||||
"""Return a decorator-generating function that catches expected
|
||||
errors and throws a HTTPError describing it instead.
|
||||
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
def foo():
|
||||
pass
|
||||
"""
|
||||
def wrapper(func, *args, **kwargs):
|
||||
exc_info = None
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except expected:
|
||||
# Re-raise it, but maintain the original traceback
|
||||
exc_info = sys.exc_info()
|
||||
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
|
||||
raise new_exc, None, exc_info[2]
|
||||
finally:
|
||||
del exc_info
|
||||
# We need to preserve the function's argspecs for CherryPy to
|
||||
# handle argument errors correctly. Decorator.decorator takes
|
||||
# care of that.
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
# Custom CherryPy tools
|
||||
|
||||
def CORS_allow(methods):
|
||||
"""This does several things:
|
||||
|
||||
Handles CORS preflight requests.
|
||||
Adds Allow: header to all requests.
|
||||
Raise 405 if request.method not in method.
|
||||
|
||||
It is similar to cherrypy.tools.allow, with the CORS stuff added.
|
||||
|
||||
Add this to CherryPy with:
|
||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||
"""
|
||||
request = cherrypy.request.headers
|
||||
response = cherrypy.response.headers
|
||||
|
||||
if not isinstance(methods, (tuple, list)): # pragma: no cover
|
||||
methods = [ methods ]
|
||||
methods = [ m.upper() for m in methods if m ]
|
||||
if not methods: # pragma: no cover
|
||||
methods = [ 'GET', 'HEAD' ]
|
||||
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
|
||||
methods.append('HEAD')
|
||||
response['Allow'] = ', '.join(methods)
|
||||
|
||||
# Allow all origins
|
||||
if 'Origin' in request:
|
||||
response['Access-Control-Allow-Origin'] = request['Origin']
|
||||
|
||||
# If it's a CORS request, send response.
|
||||
request_method = request.get("Access-Control-Request-Method", None)
|
||||
request_headers = request.get("Access-Control-Request-Headers", None)
|
||||
if (cherrypy.request.method == "OPTIONS" and
|
||||
request_method and request_headers):
|
||||
response['Access-Control-Allow-Headers'] = request_headers
|
||||
response['Access-Control-Allow-Methods'] = ', '.join(methods)
|
||||
# Try to stop further processing and return a 200 OK
|
||||
cherrypy.response.status = "200 OK"
|
||||
cherrypy.response.body = ""
|
||||
cherrypy.request.handler = lambda: ""
|
||||
return
|
||||
|
||||
# Reject methods that were not explicitly allowed
|
||||
if cherrypy.request.method not in methods:
|
||||
raise cherrypy.HTTPError(405)
|
||||
|
||||
|
||||
# Helper for json_in tool to process JSON data into normal request
|
||||
# parameters.
|
||||
def json_to_request_params(body):
|
||||
cherrypy.lib.jsontools.json_processor(body)
|
||||
if not isinstance(cherrypy.request.json, dict):
|
||||
raise cherrypy.HTTPError(415)
|
||||
cherrypy.request.params.update(cherrypy.request.json)
|
||||
|
||||
# Used as an "error_page.default" handler
|
||||
def json_error_page(status, message, traceback, version,
|
||||
force_traceback = False):
|
||||
"""Return a custom error page in JSON so the client can parse it"""
|
||||
errordata = { "status" : status,
|
||||
"message" : message,
|
||||
"traceback" : traceback }
|
||||
# Don't send a traceback if the error was 400-499 (client's fault)
|
||||
try:
|
||||
code = int(status.split()[0])
|
||||
if not force_traceback:
|
||||
if code >= 400 and code <= 499:
|
||||
errordata["traceback"] = ""
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
# Override the response type, which was previously set to text/html
|
||||
cherrypy.serving.response.headers['Content-Type'] = (
|
||||
"application/json;charset=utf-8" )
|
||||
# Undo the HTML escaping that cherrypy's get_error_page function applies
|
||||
# (cherrypy issue 1135)
|
||||
for k, v in errordata.iteritems():
|
||||
v = v.replace("<","<")
|
||||
v = v.replace(">",">")
|
||||
v = v.replace("&","&")
|
||||
errordata[k] = v
|
||||
return json.dumps(errordata, separators=(',',':'))
|
||||
|
||||
# Start/stop CherryPy standalone server
|
||||
def cherrypy_start(blocking = False, event = False, embedded = False):
|
||||
"""Start the CherryPy server, handling errors and signals
|
||||
somewhat gracefully."""
|
||||
|
||||
if not embedded: # pragma: no cover
|
||||
# Handle signals nicely
|
||||
if hasattr(cherrypy.engine, "signal_handler"):
|
||||
cherrypy.engine.signal_handler.subscribe()
|
||||
if hasattr(cherrypy.engine, "console_control_handler"):
|
||||
cherrypy.engine.console_control_handler.subscribe()
|
||||
|
||||
# Cherrypy stupidly calls os._exit(70) when it can't bind the
|
||||
# port. At least try to print a reasonable error and continue
|
||||
# in this case, rather than just dying silently (as we would
|
||||
# otherwise do in embedded mode)
|
||||
real_exit = os._exit
|
||||
def fake_exit(code): # pragma: no cover
|
||||
if code == os.EX_SOFTWARE:
|
||||
fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
|
||||
else:
|
||||
real_exit(code)
|
||||
os._exit = fake_exit
|
||||
cherrypy.engine.start()
|
||||
os._exit = real_exit
|
||||
|
||||
# Signal that the engine has started successfully
|
||||
if event is not None:
|
||||
event.set()
|
||||
|
||||
if blocking:
|
||||
try:
|
||||
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
|
||||
interval = 0.1, channel = 'main')
|
||||
except (KeyboardInterrupt, IOError): # pragma: no cover
|
||||
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
|
||||
cherrypy.engine.exit()
|
||||
except SystemExit: # pragma: no cover
|
||||
cherrypy.engine.log('SystemExit raised: shutting down bus')
|
||||
cherrypy.engine.exit()
|
||||
raise
|
||||
|
||||
# Stop CherryPy server
|
||||
def cherrypy_stop():
|
||||
cherrypy.engine.exit()
|
@@ -13,3 +13,5 @@ import nilmdb.utils.time
|
||||
import nilmdb.utils.iterator
|
||||
import nilmdb.utils.interval
|
||||
import nilmdb.utils.lock
|
||||
import nilmdb.utils.sort
|
||||
import nilmdb.utils.unicode
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import errno
|
||||
from math import log
|
||||
|
||||
def human_size(num):
|
||||
@@ -16,10 +17,17 @@ def human_size(num):
|
||||
return '1 byte'
|
||||
|
||||
def du(path):
|
||||
"""Like du -sb, returns total size of path in bytes."""
|
||||
size = os.path.getsize(path)
|
||||
if os.path.isdir(path):
|
||||
for thisfile in os.listdir(path):
|
||||
filepath = os.path.join(path, thisfile)
|
||||
size += du(filepath)
|
||||
return size
|
||||
"""Like du -sb, returns total size of path in bytes. Ignore
|
||||
errors that might occur if we encounter broken symlinks or
|
||||
files in the process of being removed."""
|
||||
try:
|
||||
size = os.path.getsize(path)
|
||||
if os.path.isdir(path):
|
||||
for thisfile in os.listdir(path):
|
||||
filepath = os.path.join(path, thisfile)
|
||||
size += du(filepath)
|
||||
return size
|
||||
except OSError as e: # pragma: no cover
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
return 0
|
||||
|
@@ -1,5 +1,6 @@
|
||||
"""Interval. Like nilmdb.server.interval, but re-implemented here
|
||||
in plain Python so clients have easier access to it.
|
||||
in plain Python so clients have easier access to it, and with a few
|
||||
helper functions.
|
||||
|
||||
Intervals are half-open, ie. they include data points with timestamps
|
||||
[start, end)
|
||||
@@ -34,6 +35,10 @@ class Interval:
|
||||
return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
|
||||
" -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
|
||||
|
||||
def human_string(self):
|
||||
return ("[ " + nilmdb.utils.time.timestamp_to_human(self.start) +
|
||||
" -> " + nilmdb.utils.time.timestamp_to_human(self.end) + " ]")
|
||||
|
||||
def __cmp__(self, other):
|
||||
"""Compare two intervals. If non-equal, order by start then end"""
|
||||
return cmp(self.start, other.start) or cmp(self.end, other.end)
|
||||
@@ -104,3 +109,20 @@ def set_difference(a, b):
|
||||
b_interval = None
|
||||
if a_interval:
|
||||
out_start = ts
|
||||
|
||||
def optimize(it):
|
||||
"""
|
||||
Given an iterable 'it' with intervals, optimize them by joining
|
||||
together intervals that are adjacent in time, and return a generator
|
||||
that yields the new intervals.
|
||||
"""
|
||||
saved_int = None
|
||||
for interval in it:
|
||||
if saved_int is not None:
|
||||
if saved_int.end == interval.start:
|
||||
interval.start = saved_int.start
|
||||
else:
|
||||
yield saved_int
|
||||
saved_int = interval
|
||||
if saved_int is not None:
|
||||
yield saved_int
|
||||
|
18
nilmdb/utils/sort.py
Normal file
18
nilmdb/utils/sort.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import re
|
||||
|
||||
def sort_human(items, key = None):
|
||||
"""Human-friendly sort (/stream/2 before /stream/10)"""
|
||||
def to_num(val):
|
||||
try:
|
||||
return int(val)
|
||||
except Exception:
|
||||
return val
|
||||
|
||||
def human_key(text):
|
||||
if key:
|
||||
text = key(text)
|
||||
# Break into character and numeric chunks.
|
||||
chunks = re.split(r'([0-9]+)', text)
|
||||
return [ to_num(c) for c in chunks ]
|
||||
|
||||
return sorted(items, key = human_key)
|
22
nilmdb/utils/unicode.py
Normal file
22
nilmdb/utils/unicode.py
Normal file
@@ -0,0 +1,22 @@
|
||||
def encode(u):
|
||||
"""Try to encode something from Unicode to a string using the
|
||||
default encoding. If it fails, try encoding as UTF-8."""
|
||||
if not isinstance(u, unicode):
|
||||
return u
|
||||
try:
|
||||
return u.encode()
|
||||
except UnicodeEncodeError:
|
||||
return u.encode("utf-8")
|
||||
|
||||
def decode(s):
|
||||
"""Try to decode someting from string to Unicode using the
|
||||
default encoding. If it fails, try decoding as UTF-8."""
|
||||
if isinstance(s, unicode):
|
||||
return s
|
||||
try:
|
||||
return s.decode()
|
||||
except UnicodeDecodeError:
|
||||
try:
|
||||
return s.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
return s # best we can do
|
8
tests/data/timestamped
Normal file
8
tests/data/timestamped
Normal file
@@ -0,0 +1,8 @@
|
||||
-10000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
-100000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
-100000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
-1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
1 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
||||
1000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
|
@@ -105,16 +105,19 @@ class TestClient(object):
|
||||
client.http.post("/stream/list")
|
||||
client = nilmdb.client.Client(url = testurl)
|
||||
|
||||
# Create three streams
|
||||
# Create four streams
|
||||
client.stream_create("/newton/prep", "float32_8")
|
||||
client.stream_create("/newton/raw", "uint16_6")
|
||||
client.stream_create("/newton/zzz/rawnotch", "uint16_9")
|
||||
client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
|
||||
client.stream_create("/newton/zzz/rawnotch11", "uint16_9")
|
||||
|
||||
# Verify we got 3 streams
|
||||
# Verify we got 4 streams in the right order
|
||||
eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
|
||||
["/newton/raw", "uint16_6"],
|
||||
["/newton/zzz/rawnotch", "uint16_9"]
|
||||
["/newton/zzz/rawnotch2", "uint16_9"],
|
||||
["/newton/zzz/rawnotch11", "uint16_9"]
|
||||
])
|
||||
|
||||
# Match just one type or one path
|
||||
eq_(client.stream_list(layout="uint16_6"),
|
||||
[ ["/newton/raw", "uint16_6"] ])
|
||||
@@ -327,6 +330,10 @@ class TestClient(object):
|
||||
2525.169921875, 8350.83984375, 3724.699951171875,
|
||||
1355.3399658203125, 2039.0))
|
||||
|
||||
# Just get some coverage
|
||||
with assert_raises(ClientError) as e:
|
||||
client.http.post("/stream/remove", { "path": "/none" })
|
||||
|
||||
client.close()
|
||||
|
||||
def test_client_06_generators(self):
|
||||
@@ -613,8 +620,12 @@ class TestClient(object):
|
||||
with client.stream_insert_context("/empty/test", end = 950):
|
||||
pass
|
||||
|
||||
# Equal start and end is OK as long as there's no data
|
||||
with client.stream_insert_context("/empty/test", start=9, end=9):
|
||||
pass
|
||||
|
||||
# Try various things that might cause problems
|
||||
with client.stream_insert_context("/empty/test", 1000, 1050):
|
||||
with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
|
||||
ctx.finalize() # inserts [1000, 1050]
|
||||
ctx.finalize() # nothing
|
||||
ctx.finalize() # nothing
|
||||
|
@@ -88,7 +88,7 @@ class TestCmdline(object):
|
||||
sys.exit(0)
|
||||
except SystemExit as e:
|
||||
exitcode = e.code
|
||||
captured = outfile.getvalue()
|
||||
captured = nilmdb.utils.unicode.decode(outfile.getvalue())
|
||||
self.captured = captured
|
||||
self.exitcode = exitcode
|
||||
|
||||
@@ -245,8 +245,10 @@ class TestCmdline(object):
|
||||
self.contain("Client version: " + nilmdb.__version__)
|
||||
self.contain("Server version: " + test_server.version)
|
||||
self.contain("Server database path")
|
||||
self.contain("Server database size")
|
||||
self.contain("Server database free space")
|
||||
self.contain("Server disk space used by NilmDB")
|
||||
self.contain("Server disk space used by other")
|
||||
self.contain("Server disk space reserved")
|
||||
self.contain("Server disk space free")
|
||||
|
||||
def test_04_createlist(self):
|
||||
# Basic stream tests, like those in test_client.
|
||||
@@ -473,6 +475,13 @@ class TestCmdline(object):
|
||||
# bad start time
|
||||
self.fail("insert -t -r 120 --start 'whatever' /newton/prep /dev/null")
|
||||
|
||||
# Test negative times
|
||||
self.ok("insert --start @-10000000000 --end @1000000001 /newton/prep"
|
||||
" tests/data/timestamped")
|
||||
self.ok("extract -c /newton/prep --start min --end @1000000001")
|
||||
self.match("8\n")
|
||||
self.ok("remove /newton/prep --start min --end @1000000001")
|
||||
|
||||
def test_07_detail_extended(self):
|
||||
# Just count the number of lines, it's probably fine
|
||||
self.ok("list --detail")
|
||||
@@ -601,6 +610,14 @@ class TestCmdline(object):
|
||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("43200\n")
|
||||
|
||||
# test binary mode
|
||||
self.fail("extract -c -B /newton/prep -s min -e max")
|
||||
self.contain("binary cannot be combined")
|
||||
self.fail("extract -m -B /newton/prep -s min -e max")
|
||||
self.contain("binary cannot be combined")
|
||||
self.ok("extract -B /newton/prep -s min -e max")
|
||||
eq_(len(self.captured), 43200 * (8 + 8*4))
|
||||
|
||||
# markup for 3 intervals, plus extra markup lines whenever we had
|
||||
# a "restart" from the nilmdb.stream_extract function
|
||||
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
|
@@ -59,6 +59,14 @@ class TestInterval:
|
||||
self.test_interval_intersect()
|
||||
Interval = NilmdbInterval
|
||||
|
||||
# Other helpers in nilmdb.utils.interval
|
||||
i = [ UtilsInterval(1,2), UtilsInterval(2,3), UtilsInterval(4,5) ]
|
||||
eq_(list(nilmdb.utils.interval.optimize(i)),
|
||||
[ UtilsInterval(1,3), UtilsInterval(4,5) ])
|
||||
eq_(UtilsInterval(1234567890123456, 1234567890654321).human_string(),
|
||||
"[ Fri, 13 Feb 2009 18:31:30.123456 -0500 -> " +
|
||||
"Fri, 13 Feb 2009 18:31:30.654321 -0500 ]")
|
||||
|
||||
def test_interval(self):
|
||||
# Test Interval class
|
||||
os.environ['TZ'] = "America/New_York"
|
||||
|
@@ -157,11 +157,14 @@ class TestServer(object):
|
||||
|
||||
def test_server(self):
|
||||
# Make sure we can't force an exit, and test other 404 errors
|
||||
for url in [ "/exit", "/", "/favicon.ico" ]:
|
||||
for url in [ "/exit", "/favicon.ico" ]:
|
||||
with assert_raises(HTTPError) as e:
|
||||
geturl(url)
|
||||
eq_(e.exception.code, 404)
|
||||
|
||||
# Root page
|
||||
in_("This is NilmDB", geturl("/"))
|
||||
|
||||
# Check version
|
||||
eq_(distutils.version.LooseVersion(getjson("/version")),
|
||||
distutils.version.LooseVersion(nilmdb.__version__))
|
||||
|
@@ -28,7 +28,10 @@ def setup_module():
|
||||
recursive_unlink(testdb)
|
||||
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
|
||||
testdb, bulkdata_args = { "file_size" : 16384,
|
||||
"files_per_dir" : 3 } )
|
||||
|
||||
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
|
||||
port = 32180, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
@@ -179,6 +182,17 @@ class TestNumpyClient(object):
|
||||
assert(np.array_equal(a,b))
|
||||
assert(np.array_equal(a,c))
|
||||
|
||||
# Make sure none of the files are greater than 16384 bytes as
|
||||
# we configured with the bulkdata_args above.
|
||||
datapath = os.path.join(testdb, "data")
|
||||
for (dirpath, dirnames, filenames) in os.walk(datapath):
|
||||
for f in filenames:
|
||||
fn = os.path.join(dirpath, f)
|
||||
size = os.path.getsize(fn)
|
||||
if size > 16384:
|
||||
raise AssertionError(sprintf("%s is too big: %d > %d\n",
|
||||
fn, size, 16384))
|
||||
|
||||
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
|
||||
client.close()
|
||||
|
||||
@@ -295,8 +309,25 @@ class TestNumpyClient(object):
|
||||
with client.stream_insert_numpy_context("/empty/test", end = 950):
|
||||
pass
|
||||
|
||||
# Equal start and end is OK as long as there's no data
|
||||
with assert_raises(ClientError) as e:
|
||||
with client.stream_insert_numpy_context("/empty/test",
|
||||
start=9, end=9) as ctx:
|
||||
ctx.insert([[9, 9]])
|
||||
ctx.finalize()
|
||||
in_("have data to send, but invalid start/end times", str(e.exception))
|
||||
|
||||
with client.stream_insert_numpy_context("/empty/test",
|
||||
start=9, end=9) as ctx:
|
||||
pass
|
||||
|
||||
# reusing a context object is bad
|
||||
with assert_raises(Exception) as e:
|
||||
ctx.insert([[9, 9]])
|
||||
|
||||
# Try various things that might cause problems
|
||||
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
|
||||
with client.stream_insert_numpy_context("/empty/test",
|
||||
1000, 1050) as ctx:
|
||||
ctx.finalize() # inserts [1000, 1050]
|
||||
ctx.finalize() # nothing
|
||||
ctx.finalize() # nothing
|
||||
|
Reference in New Issue
Block a user