Compare commits
11 Commits
nilmdb-1.4
...
nilmdb-1.4
Author | SHA1 | Date | |
---|---|---|---|
f0304b4c00 | |||
60594ca58e | |||
c7f2df4abc | |||
5b7409f802 | |||
06038062a2 | |||
ae9fe89759 | |||
04def60021 | |||
9ce0f69dff | |||
90c3be91c4 | |||
ebccfb3531 | |||
e006f1d02e |
@@ -6,6 +6,7 @@ import nilmdb.utils
|
|||||||
import nilmdb.client.httpclient
|
import nilmdb.client.httpclient
|
||||||
from nilmdb.client.errors import ClientError
|
from nilmdb.client.errors import ClientError
|
||||||
|
|
||||||
|
import re
|
||||||
import time
|
import time
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
import contextlib
|
import contextlib
|
||||||
@@ -65,7 +66,12 @@ class Client(object):
|
|||||||
params["layout"] = layout
|
params["layout"] = layout
|
||||||
if extended:
|
if extended:
|
||||||
params["extended"] = 1
|
params["extended"] = 1
|
||||||
return self.http.get("stream/list", params)
|
def sort_streams_nicely(x):
|
||||||
|
"""Human-friendly sort (/stream/2 before /stream/10)"""
|
||||||
|
num = lambda t: int(t) if t.isdigit() else t
|
||||||
|
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
|
||||||
|
return sorted(x, key = key)
|
||||||
|
return sort_streams_nicely(self.http.get("stream/list", params))
|
||||||
|
|
||||||
def stream_get_metadata(self, path, keys = None):
|
def stream_get_metadata(self, path, keys = None):
|
||||||
params = { "path": path }
|
params = { "path": path }
|
||||||
@@ -172,7 +178,7 @@ class Client(object):
|
|||||||
return self.http.get_gen("stream/intervals", params)
|
return self.http.get_gen("stream/intervals", params)
|
||||||
|
|
||||||
def stream_extract(self, path, start = None, end = None,
|
def stream_extract(self, path, start = None, end = None,
|
||||||
count = False, markup = False):
|
count = False, markup = False, binary = False):
|
||||||
"""
|
"""
|
||||||
Extract data from a stream. Returns a generator that yields
|
Extract data from a stream. Returns a generator that yields
|
||||||
lines of ASCII-formatted data that matches the database
|
lines of ASCII-formatted data that matches the database
|
||||||
@@ -183,6 +189,11 @@ class Client(object):
|
|||||||
|
|
||||||
Specify markup = True to include comments in the returned data
|
Specify markup = True to include comments in the returned data
|
||||||
that indicate interval starts and ends.
|
that indicate interval starts and ends.
|
||||||
|
|
||||||
|
Specify binary = True to return chunks of raw binary data,
|
||||||
|
rather than lines of ASCII-formatted data. Raw binary data
|
||||||
|
is always little-endian and matches the database types
|
||||||
|
(including a uint64 timestamp).
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
"path": path,
|
"path": path,
|
||||||
@@ -195,7 +206,9 @@ class Client(object):
|
|||||||
params["count"] = 1
|
params["count"] = 1
|
||||||
if markup:
|
if markup:
|
||||||
params["markup"] = 1
|
params["markup"] = 1
|
||||||
return self.http.get_gen("stream/extract", params)
|
if binary:
|
||||||
|
params["binary"] = 1
|
||||||
|
return self.http.get_gen("stream/extract", params, binary = binary)
|
||||||
|
|
||||||
def stream_count(self, path, start = None, end = None):
|
def stream_count(self, path, start = None, end = None):
|
||||||
"""
|
"""
|
||||||
|
@@ -110,7 +110,8 @@ class HTTPClient(object):
|
|||||||
return self._req("PUT", url, params, data)
|
return self._req("PUT", url, params, data)
|
||||||
|
|
||||||
# Generator versions that return data one line at a time.
|
# Generator versions that return data one line at a time.
|
||||||
def _req_gen(self, method, url, query = None, body = None, headers = None):
|
def _req_gen(self, method, url, query = None, body = None,
|
||||||
|
headers = None, binary = False):
|
||||||
"""
|
"""
|
||||||
Make a request and return a generator that gives back strings
|
Make a request and return a generator that gives back strings
|
||||||
or JSON decoded lines of the body data, or raise an error if
|
or JSON decoded lines of the body data, or raise an error if
|
||||||
@@ -118,16 +119,19 @@ class HTTPClient(object):
|
|||||||
"""
|
"""
|
||||||
(response, isjson) = self._do_req(method, url, query, body,
|
(response, isjson) = self._do_req(method, url, query, body,
|
||||||
stream = True, headers = headers)
|
stream = True, headers = headers)
|
||||||
if isjson:
|
if binary:
|
||||||
|
for chunk in response.iter_content(chunk_size = 65536):
|
||||||
|
yield chunk
|
||||||
|
elif isjson:
|
||||||
for line in response.iter_lines():
|
for line in response.iter_lines():
|
||||||
yield json.loads(line)
|
yield json.loads(line)
|
||||||
else:
|
else:
|
||||||
for line in response.iter_lines():
|
for line in response.iter_lines():
|
||||||
yield line
|
yield line
|
||||||
|
|
||||||
def get_gen(self, url, params = None):
|
def get_gen(self, url, params = None, binary = False):
|
||||||
"""Simple GET (parameters in URL) returning a generator"""
|
"""Simple GET (parameters in URL) returning a generator"""
|
||||||
return self._req_gen("GET", url, params)
|
return self._req_gen("GET", url, params, binary = binary)
|
||||||
|
|
||||||
# Not much use for a POST or PUT generator, since they don't
|
# Not much use for a POST or PUT generator, since they don't
|
||||||
# return much data.
|
# return much data.
|
||||||
|
77
nilmdb/client/numpyclient.py
Normal file
77
nilmdb/client/numpyclient.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
"""Provide a NumpyClient class that is based on normal Client, but has
|
||||||
|
additional methods for extracting and inserting data via Numpy arrays."""
|
||||||
|
|
||||||
|
import nilmdb.utils
|
||||||
|
import nilmdb.client.client
|
||||||
|
import nilmdb.client.httpclient
|
||||||
|
from nilmdb.client.errors import ClientError
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
from nilmdb.utils.time import timestamp_to_string, string_to_timestamp
|
||||||
|
|
||||||
|
import numpy
|
||||||
|
import cStringIO
|
||||||
|
|
||||||
|
def layout_to_dtype(layout):
|
||||||
|
ltype = layout.split('_')[0]
|
||||||
|
lcount = int(layout.split('_')[1])
|
||||||
|
if ltype.startswith('int'):
|
||||||
|
atype = '<i' + str(int(ltype[3:]) / 8)
|
||||||
|
elif ltype.startswith('uint'):
|
||||||
|
atype = '<u' + str(int(ltype[4:]) / 8)
|
||||||
|
elif ltype.startswith('float'):
|
||||||
|
atype = '<f' + str(int(ltype[5:]) / 8)
|
||||||
|
else:
|
||||||
|
raise ValueError("bad layout")
|
||||||
|
return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)])
|
||||||
|
|
||||||
|
class NumpyClient(nilmdb.client.client.Client):
|
||||||
|
"""Subclass of nilmdb.client.Client that adds additional methods for
|
||||||
|
extracting and inserting data via Numpy arrays."""
|
||||||
|
|
||||||
|
def stream_extract_numpy(self, path, start = None, end = None,
|
||||||
|
layout = None, maxrows = 100000,
|
||||||
|
structured = False):
|
||||||
|
"""
|
||||||
|
Extract data from a stream. Returns a generator that yields
|
||||||
|
Numpy arrays of up to 'maxrows' of data each.
|
||||||
|
|
||||||
|
If 'layout' is None, it is read using stream_info.
|
||||||
|
|
||||||
|
If 'structured' is False, all data is converted to float64
|
||||||
|
and returned in a flat 2D array. Otherwise, data is returned
|
||||||
|
as a structured dtype in a 1D array.
|
||||||
|
"""
|
||||||
|
if layout is None:
|
||||||
|
streams = self.stream_list(path)
|
||||||
|
if len(streams) != 1:
|
||||||
|
raise ClientError("can't get layout for path: " + path)
|
||||||
|
layout = streams[0][1]
|
||||||
|
dtype = layout_to_dtype(layout)
|
||||||
|
|
||||||
|
def to_numpy(data):
|
||||||
|
a = numpy.fromstring(data, dtype)
|
||||||
|
if structured:
|
||||||
|
return a
|
||||||
|
return numpy.c_[a['timestamp'], a['data']]
|
||||||
|
|
||||||
|
chunks = []
|
||||||
|
total_len = 0
|
||||||
|
maxsize = dtype.itemsize * maxrows
|
||||||
|
for data in self.stream_extract(path, start, end, binary = True):
|
||||||
|
# Add this block of binary data
|
||||||
|
chunks.append(data)
|
||||||
|
total_len += len(data)
|
||||||
|
|
||||||
|
# See if we have enough to make the requested Numpy array
|
||||||
|
while total_len >= maxsize:
|
||||||
|
assembled = "".join(chunks)
|
||||||
|
total_len -= maxsize
|
||||||
|
chunks = [ assembled[maxsize:] ]
|
||||||
|
block = assembled[:maxsize]
|
||||||
|
yield to_numpy(block)
|
||||||
|
|
||||||
|
if total_len:
|
||||||
|
yield to_numpy("".join(chunks))
|
@@ -81,7 +81,7 @@ class Cmdline(object):
|
|||||||
def __init__(self, argv = None):
|
def __init__(self, argv = None):
|
||||||
self.argv = argv or sys.argv[1:]
|
self.argv = argv or sys.argv[1:]
|
||||||
self.client = None
|
self.client = None
|
||||||
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380")
|
self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
|
||||||
self.subcmd = {}
|
self.subcmd = {}
|
||||||
self.complete = Complete()
|
self.complete = Complete()
|
||||||
|
|
||||||
|
@@ -9,7 +9,8 @@ def setup(self, sub):
|
|||||||
a stream.
|
a stream.
|
||||||
""",
|
""",
|
||||||
usage="%(prog)s path [-g [key ...] | "
|
usage="%(prog)s path [-g [key ...] | "
|
||||||
"-s key=value [...] | -u key=value [...]]")
|
"-s key=value [...] | -u key=value [...]] | "
|
||||||
|
"-d [key ...]")
|
||||||
cmd.set_defaults(handler = cmd_metadata)
|
cmd.set_defaults(handler = cmd_metadata)
|
||||||
|
|
||||||
group = cmd.add_argument_group("Required arguments")
|
group = cmd.add_argument_group("Required arguments")
|
||||||
@@ -30,6 +31,9 @@ def setup(self, sub):
|
|||||||
help="Update metadata using provided "
|
help="Update metadata using provided "
|
||||||
"key=value pairs",
|
"key=value pairs",
|
||||||
).completer = self.complete.meta_keyval
|
).completer = self.complete.meta_keyval
|
||||||
|
exc.add_argument("-d", "--delete", nargs="*", metavar="key",
|
||||||
|
help="Delete metadata for specified keys (default all)",
|
||||||
|
).completer = self.complete.meta_key
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
def cmd_metadata(self):
|
def cmd_metadata(self):
|
||||||
@@ -56,6 +60,16 @@ def cmd_metadata(self):
|
|||||||
handler(self.args.path, data)
|
handler(self.args.path, data)
|
||||||
except nilmdb.client.ClientError as e:
|
except nilmdb.client.ClientError as e:
|
||||||
self.die("error setting/updating metadata: %s", str(e))
|
self.die("error setting/updating metadata: %s", str(e))
|
||||||
|
elif self.args.delete is not None:
|
||||||
|
# Delete (by setting values to empty strings)
|
||||||
|
keys = self.args.delete or None
|
||||||
|
try:
|
||||||
|
data = self.client.stream_get_metadata(self.args.path, keys)
|
||||||
|
for key in data:
|
||||||
|
data[key] = ""
|
||||||
|
self.client.stream_update_metadata(self.args.path, data)
|
||||||
|
except nilmdb.client.ClientError as e:
|
||||||
|
self.die("error deleting metadata: %s", str(e))
|
||||||
else:
|
else:
|
||||||
# Get (or unspecified)
|
# Get (or unspecified)
|
||||||
keys = self.args.get or None
|
keys = self.args.get or None
|
||||||
@@ -64,7 +78,7 @@ def cmd_metadata(self):
|
|||||||
except nilmdb.client.ClientError as e:
|
except nilmdb.client.ClientError as e:
|
||||||
self.die("error getting metadata: %s", str(e))
|
self.die("error getting metadata: %s", str(e))
|
||||||
for key, value in sorted(data.items()):
|
for key, value in sorted(data.items()):
|
||||||
# Omit nonexistant keys
|
# Print nonexistant keys as having empty value
|
||||||
if value is None:
|
if value is None:
|
||||||
value = ""
|
value = ""
|
||||||
printf("%s=%s\n", key, value)
|
printf("%s=%s\n", key, value)
|
||||||
|
@@ -79,7 +79,12 @@ class BulkData(object):
|
|||||||
if Table.exists(ospath):
|
if Table.exists(ospath):
|
||||||
raise ValueError("stream already exists at this path")
|
raise ValueError("stream already exists at this path")
|
||||||
if os.path.isdir(ospath):
|
if os.path.isdir(ospath):
|
||||||
raise ValueError("subdirs of this path already exist")
|
# Look for any files in subdirectories. Fully empty subdirectories
|
||||||
|
# are OK; they might be there during a rename
|
||||||
|
for (root, dirs, files) in os.walk(ospath):
|
||||||
|
if len(files):
|
||||||
|
raise ValueError(
|
||||||
|
"non-empty subdirs of this path already exist")
|
||||||
|
|
||||||
def _create_parents(self, unicodepath):
|
def _create_parents(self, unicodepath):
|
||||||
"""Verify the path name, and create parent directories if they
|
"""Verify the path name, and create parent directories if they
|
||||||
@@ -188,7 +193,6 @@ class BulkData(object):
|
|||||||
# Basic checks
|
# Basic checks
|
||||||
if oldospath == newospath:
|
if oldospath == newospath:
|
||||||
raise ValueError("old and new paths are the same")
|
raise ValueError("old and new paths are the same")
|
||||||
self._create_check_ospath(newospath)
|
|
||||||
|
|
||||||
# Move the table to a temporary location
|
# Move the table to a temporary location
|
||||||
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
|
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
|
||||||
@@ -196,6 +200,9 @@ class BulkData(object):
|
|||||||
os.rename(oldospath, tmppath)
|
os.rename(oldospath, tmppath)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Check destination path
|
||||||
|
self._create_check_ospath(newospath)
|
||||||
|
|
||||||
# Create parent dirs for new location
|
# Create parent dirs for new location
|
||||||
self._create_parents(newunicodepath)
|
self._create_parents(newunicodepath)
|
||||||
|
|
||||||
@@ -472,7 +479,7 @@ class Table(object):
|
|||||||
# Success, so update self.nrows accordingly
|
# Success, so update self.nrows accordingly
|
||||||
self.nrows = tot_rows
|
self.nrows = tot_rows
|
||||||
|
|
||||||
def get_data(self, start, stop):
|
def get_data(self, start, stop, binary = False):
|
||||||
"""Extract data corresponding to Python range [n:m],
|
"""Extract data corresponding to Python range [n:m],
|
||||||
and returns a formatted string"""
|
and returns a formatted string"""
|
||||||
if (start is None or
|
if (start is None or
|
||||||
@@ -490,10 +497,13 @@ class Table(object):
|
|||||||
if count > remaining:
|
if count > remaining:
|
||||||
count = remaining
|
count = remaining
|
||||||
f = self.file_open(subdir, filename)
|
f = self.file_open(subdir, filename)
|
||||||
ret.append(f.extract_string(offset, count))
|
if binary:
|
||||||
|
ret.append(f.extract_binary(offset, count))
|
||||||
|
else:
|
||||||
|
ret.append(f.extract_string(offset, count))
|
||||||
remaining -= count
|
remaining -= count
|
||||||
row += count
|
row += count
|
||||||
return "".join(ret)
|
return b"".join(ret)
|
||||||
|
|
||||||
def __getitem__(self, row):
|
def __getitem__(self, row):
|
||||||
"""Extract timestamps from a row, with table[n] notation."""
|
"""Extract timestamps from a row, with table[n] notation."""
|
||||||
|
@@ -538,7 +538,7 @@ class NilmDB(object):
|
|||||||
dbinterval.db_endpos)
|
dbinterval.db_endpos)
|
||||||
|
|
||||||
def stream_extract(self, path, start = None, end = None,
|
def stream_extract(self, path, start = None, end = None,
|
||||||
count = False, markup = False):
|
count = False, markup = False, binary = False):
|
||||||
"""
|
"""
|
||||||
Returns (data, restart) tuple.
|
Returns (data, restart) tuple.
|
||||||
|
|
||||||
@@ -559,6 +559,9 @@ class NilmDB(object):
|
|||||||
'markup', if true, indicates that returned data should be
|
'markup', if true, indicates that returned data should be
|
||||||
marked with a comment denoting when a particular interval
|
marked with a comment denoting when a particular interval
|
||||||
starts, and another comment when an interval ends.
|
starts, and another comment when an interval ends.
|
||||||
|
|
||||||
|
'binary', if true, means to return raw binary rather than
|
||||||
|
ASCII-formatted data.
|
||||||
"""
|
"""
|
||||||
stream_id = self._stream_id(path)
|
stream_id = self._stream_id(path)
|
||||||
table = self.data.getnode(path)
|
table = self.data.getnode(path)
|
||||||
@@ -569,6 +572,8 @@ class NilmDB(object):
|
|||||||
matched = 0
|
matched = 0
|
||||||
remaining = self.max_results
|
remaining = self.max_results
|
||||||
restart = None
|
restart = None
|
||||||
|
if binary and (markup or count):
|
||||||
|
raise NilmDBError("binary mode can't be used with markup or count")
|
||||||
for interval in intervals.intersection(requested):
|
for interval in intervals.intersection(requested):
|
||||||
# Reading single rows from the table is too slow, so
|
# Reading single rows from the table is too slow, so
|
||||||
# we use two bisections to find both the starting and
|
# we use two bisections to find both the starting and
|
||||||
@@ -593,7 +598,7 @@ class NilmDB(object):
|
|||||||
timestamp_to_string(interval.start) + "\n")
|
timestamp_to_string(interval.start) + "\n")
|
||||||
|
|
||||||
# Gather these results up
|
# Gather these results up
|
||||||
result.append(table.get_data(row_start, row_end))
|
result.append(table.get_data(row_start, row_end, binary))
|
||||||
|
|
||||||
# Count them
|
# Count them
|
||||||
remaining -= row_end - row_start
|
remaining -= row_end - row_start
|
||||||
|
@@ -527,6 +527,46 @@ err:
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/****
|
||||||
|
* Extract to binary string containing raw little-endian binary data
|
||||||
|
*/
|
||||||
|
static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args)
|
||||||
|
{
|
||||||
|
long count;
|
||||||
|
long offset;
|
||||||
|
|
||||||
|
if (!PyArg_ParseTuple(args, "ll", &offset, &count))
|
||||||
|
return NULL;
|
||||||
|
if (!self->file) {
|
||||||
|
PyErr_SetString(PyExc_Exception, "no file");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/* Seek to target location */
|
||||||
|
if (fseek(self->file, offset, SEEK_SET) < 0) {
|
||||||
|
PyErr_SetFromErrno(PyExc_OSError);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t *str;
|
||||||
|
int len = count * self->binary_size;
|
||||||
|
str = malloc(len);
|
||||||
|
if (str == NULL) {
|
||||||
|
PyErr_SetFromErrno(PyExc_OSError);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Data in the file is already in the desired little-endian
|
||||||
|
binary format, so just read it directly. */
|
||||||
|
if (fread(str, self->binary_size, count, self->file) != count) {
|
||||||
|
free(str);
|
||||||
|
PyErr_SetFromErrno(PyExc_OSError);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
PyObject *pystr = PyBytes_FromStringAndSize((char *)str, len);
|
||||||
|
free(str);
|
||||||
|
return pystr;
|
||||||
|
}
|
||||||
|
|
||||||
/****
|
/****
|
||||||
* Extract timestamp
|
* Extract timestamp
|
||||||
@@ -600,6 +640,12 @@ static PyMethodDef Rocket_methods[] = {
|
|||||||
"Extract count rows of data from the file at offset offset.\n"
|
"Extract count rows of data from the file at offset offset.\n"
|
||||||
"Return an ascii formatted string according to the layout" },
|
"Return an ascii formatted string according to the layout" },
|
||||||
|
|
||||||
|
{ "extract_binary",
|
||||||
|
(PyCFunction)Rocket_extract_binary, METH_VARARGS,
|
||||||
|
"extract_binary(self, offset, count)\n\n"
|
||||||
|
"Extract count rows of data from the file at offset offset.\n"
|
||||||
|
"Return a raw binary string of data matching the data layout." },
|
||||||
|
|
||||||
{ "extract_timestamp",
|
{ "extract_timestamp",
|
||||||
(PyCFunction)Rocket_extract_timestamp, METH_VARARGS,
|
(PyCFunction)Rocket_extract_timestamp, METH_VARARGS,
|
||||||
"extract_timestamp(self, offset)\n\n"
|
"extract_timestamp(self, offset)\n\n"
|
||||||
|
@@ -317,9 +317,8 @@ class Stream(NilmApp):
|
|||||||
body = cherrypy.request.body.read()
|
body = cherrypy.request.body.read()
|
||||||
|
|
||||||
# Check path and get layout
|
# Check path and get layout
|
||||||
streams = self.db.stream_list(path = path)
|
if len(self.db.stream_list(path = path)) != 1:
|
||||||
if len(streams) != 1:
|
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||||
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
|
||||||
|
|
||||||
# Check limits
|
# Check limits
|
||||||
(start, end) = self._get_times(start, end)
|
(start, end) = self._get_times(start, end)
|
||||||
@@ -401,7 +400,7 @@ class Stream(NilmApp):
|
|||||||
@chunked_response
|
@chunked_response
|
||||||
@response_type("text/plain")
|
@response_type("text/plain")
|
||||||
def extract(self, path, start = None, end = None,
|
def extract(self, path, start = None, end = None,
|
||||||
count = False, markup = False):
|
count = False, markup = False, binary = False):
|
||||||
"""
|
"""
|
||||||
Extract data from backend database. Streams the resulting
|
Extract data from backend database. Streams the resulting
|
||||||
entries as ASCII text lines separated by newlines. This may
|
entries as ASCII text lines separated by newlines. This may
|
||||||
@@ -412,13 +411,24 @@ class Stream(NilmApp):
|
|||||||
|
|
||||||
If 'markup' is True, adds comments to the stream denoting each
|
If 'markup' is True, adds comments to the stream denoting each
|
||||||
interval's start and end timestamp.
|
interval's start and end timestamp.
|
||||||
|
|
||||||
|
If 'binary' is True, return raw binary data, rather than lines
|
||||||
|
of ASCII-formatted data. Raw binary data is always
|
||||||
|
little-endian and matches the database types (including a
|
||||||
|
uint64 timestamp).
|
||||||
"""
|
"""
|
||||||
(start, end) = self._get_times(start, end)
|
(start, end) = self._get_times(start, end)
|
||||||
|
|
||||||
# Check path and get layout
|
# Check path and get layout
|
||||||
streams = self.db.stream_list(path = path)
|
if len(self.db.stream_list(path = path)) != 1:
|
||||||
if len(streams) != 1:
|
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||||
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
|
||||||
|
if binary:
|
||||||
|
cherrypy.response.headers['Content-Type'] = (
|
||||||
|
"application/octet-stream")
|
||||||
|
if markup or count:
|
||||||
|
raise cherrypy.HTTPError("400", "can't mix binary and "
|
||||||
|
"markup or count modes")
|
||||||
|
|
||||||
@workaround_cp_bug_1200
|
@workaround_cp_bug_1200
|
||||||
def content(start, end):
|
def content(start, end):
|
||||||
@@ -431,7 +441,8 @@ class Stream(NilmApp):
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
(data, restart) = self.db.stream_extract(
|
(data, restart) = self.db.stream_extract(
|
||||||
path, start, end, count = False, markup = markup)
|
path, start, end, count = False,
|
||||||
|
markup = markup, binary = binary)
|
||||||
yield data
|
yield data
|
||||||
|
|
||||||
if restart is None:
|
if restart is None:
|
||||||
|
@@ -65,6 +65,14 @@ def parse_time(toparse):
|
|||||||
if toparse == "max":
|
if toparse == "max":
|
||||||
return max_timestamp
|
return max_timestamp
|
||||||
|
|
||||||
|
# If it starts with @, treat it as a NILM timestamp
|
||||||
|
# (integer microseconds since epoch)
|
||||||
|
try:
|
||||||
|
if toparse[0] == '@':
|
||||||
|
return int(toparse[1:])
|
||||||
|
except (ValueError, KeyError, IndexError):
|
||||||
|
pass
|
||||||
|
|
||||||
# If string isn't "now" and doesn't contain at least 4 digits,
|
# If string isn't "now" and doesn't contain at least 4 digits,
|
||||||
# consider it invalid. smartparse might otherwise accept
|
# consider it invalid. smartparse might otherwise accept
|
||||||
# empty strings and strings with just separators.
|
# empty strings and strings with just separators.
|
||||||
@@ -78,14 +86,6 @@ def parse_time(toparse):
|
|||||||
except (ValueError, OverflowError):
|
except (ValueError, OverflowError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# If it starts with @, treat it as a NILM timestamp
|
|
||||||
# (integer microseconds since epoch)
|
|
||||||
try:
|
|
||||||
if toparse[0] == '@':
|
|
||||||
return int(toparse[1:])
|
|
||||||
except (ValueError, KeyError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# If it's parseable as a float, treat it as a Unix or NILM
|
# If it's parseable as a float, treat it as a Unix or NILM
|
||||||
# timestamp based on its range.
|
# timestamp based on its range.
|
||||||
try:
|
try:
|
||||||
|
1
setup.py
1
setup.py
@@ -107,6 +107,7 @@ setup(name='nilmdb',
|
|||||||
author_email = 'jim@jtan.com',
|
author_email = 'jim@jtan.com',
|
||||||
tests_require = [ 'nose',
|
tests_require = [ 'nose',
|
||||||
'coverage',
|
'coverage',
|
||||||
|
'numpy',
|
||||||
],
|
],
|
||||||
setup_requires = [ 'distribute',
|
setup_requires = [ 'distribute',
|
||||||
],
|
],
|
||||||
|
@@ -12,6 +12,7 @@ test_interval.py
|
|||||||
test_bulkdata.py
|
test_bulkdata.py
|
||||||
test_nilmdb.py
|
test_nilmdb.py
|
||||||
test_client.py
|
test_client.py
|
||||||
|
test_numpyclient.py
|
||||||
test_cmdline.py
|
test_cmdline.py
|
||||||
|
|
||||||
test_*.py
|
test_*.py
|
||||||
|
@@ -23,6 +23,7 @@ import warnings
|
|||||||
import resource
|
import resource
|
||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
|
import struct
|
||||||
|
|
||||||
from testutil.helpers import *
|
from testutil.helpers import *
|
||||||
|
|
||||||
@@ -293,6 +294,23 @@ class TestClient(object):
|
|||||||
# Test count
|
# Test count
|
||||||
eq_(client.stream_count("/newton/prep"), 14400)
|
eq_(client.stream_count("/newton/prep"), 14400)
|
||||||
|
|
||||||
|
# Test binary output
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
list(client.stream_extract("/newton/prep",
|
||||||
|
markup = True, binary = True))
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
list(client.stream_extract("/newton/prep",
|
||||||
|
count = True, binary = True))
|
||||||
|
data = "".join(client.stream_extract("/newton/prep", binary = True))
|
||||||
|
# Quick check using struct
|
||||||
|
unpacker = struct.Struct("<qffffffff")
|
||||||
|
out = []
|
||||||
|
for i in range(14400):
|
||||||
|
out.append(unpacker.unpack_from(data, i * unpacker.size))
|
||||||
|
eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
|
||||||
|
2525.169921875, 8350.83984375, 3724.699951171875,
|
||||||
|
1355.3399658203125, 2039.0))
|
||||||
|
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
def test_client_06_generators(self):
|
def test_client_06_generators(self):
|
||||||
|
@@ -369,6 +369,8 @@ class TestCmdline(object):
|
|||||||
self.contain("No stream at path")
|
self.contain("No stream at path")
|
||||||
self.fail("metadata /newton/nosuchstream --set foo=bar")
|
self.fail("metadata /newton/nosuchstream --set foo=bar")
|
||||||
self.contain("No stream at path")
|
self.contain("No stream at path")
|
||||||
|
self.fail("metadata /newton/nosuchstream --delete")
|
||||||
|
self.contain("No stream at path")
|
||||||
|
|
||||||
self.ok("metadata /newton/prep")
|
self.ok("metadata /newton/prep")
|
||||||
self.match("description=The Data\nv_scale=1.234\n")
|
self.match("description=The Data\nv_scale=1.234\n")
|
||||||
@@ -394,6 +396,19 @@ class TestCmdline(object):
|
|||||||
self.fail("metadata /newton/nosuchpath")
|
self.fail("metadata /newton/nosuchpath")
|
||||||
self.contain("No stream at path /newton/nosuchpath")
|
self.contain("No stream at path /newton/nosuchpath")
|
||||||
|
|
||||||
|
self.ok("metadata /newton/prep --delete")
|
||||||
|
self.ok("metadata /newton/prep --get")
|
||||||
|
self.match("")
|
||||||
|
self.ok("metadata /newton/prep --set "
|
||||||
|
"'description=The Data' "
|
||||||
|
"v_scale=1.234")
|
||||||
|
self.ok("metadata /newton/prep --delete v_scale")
|
||||||
|
self.ok("metadata /newton/prep --get")
|
||||||
|
self.match("description=The Data\n")
|
||||||
|
self.ok("metadata /newton/prep --set description=")
|
||||||
|
self.ok("metadata /newton/prep --get")
|
||||||
|
self.match("")
|
||||||
|
|
||||||
def test_06_insert(self):
|
def test_06_insert(self):
|
||||||
self.ok("insert --help")
|
self.ok("insert --help")
|
||||||
|
|
||||||
@@ -1038,10 +1053,12 @@ class TestCmdline(object):
|
|||||||
self.contain("old and new paths are the same")
|
self.contain("old and new paths are the same")
|
||||||
check_path("newton", "prep")
|
check_path("newton", "prep")
|
||||||
self.fail("rename /newton/prep /newton")
|
self.fail("rename /newton/prep /newton")
|
||||||
self.contain("subdirs of this path already exist")
|
self.contain("path must contain at least one folder")
|
||||||
self.fail("rename /newton/prep /newton/prep/")
|
self.fail("rename /newton/prep /newton/prep/")
|
||||||
self.contain("invalid path")
|
self.contain("invalid path")
|
||||||
self.ok("rename /newton/prep /newton/foo")
|
self.ok("rename /newton/prep /newton/foo/1")
|
||||||
|
check_path("newton", "foo", "1")
|
||||||
|
self.ok("rename /newton/foo/1 /newton/foo")
|
||||||
check_path("newton", "foo")
|
check_path("newton", "foo")
|
||||||
self.ok("rename /newton/foo /totally/different/thing")
|
self.ok("rename /newton/foo /totally/different/thing")
|
||||||
check_path("totally", "different", "thing")
|
check_path("totally", "different", "thing")
|
||||||
|
@@ -90,13 +90,16 @@ class Test00Nilmdb(object): # named 00 so it runs first
|
|||||||
eq_(db.stream_get_metadata("/newton/prep"), meta1)
|
eq_(db.stream_get_metadata("/newton/prep"), meta1)
|
||||||
eq_(db.stream_get_metadata("/newton/raw"), meta1)
|
eq_(db.stream_get_metadata("/newton/raw"), meta1)
|
||||||
|
|
||||||
# fill in some test coverage for start >= end
|
# fill in some misc. test coverage
|
||||||
with assert_raises(nilmdb.server.NilmDBError):
|
with assert_raises(nilmdb.server.NilmDBError):
|
||||||
db.stream_remove("/newton/prep", 0, 0)
|
db.stream_remove("/newton/prep", 0, 0)
|
||||||
with assert_raises(nilmdb.server.NilmDBError):
|
with assert_raises(nilmdb.server.NilmDBError):
|
||||||
db.stream_remove("/newton/prep", 1, 0)
|
db.stream_remove("/newton/prep", 1, 0)
|
||||||
db.stream_remove("/newton/prep", 0, 1)
|
db.stream_remove("/newton/prep", 0, 1)
|
||||||
|
|
||||||
|
with assert_raises(nilmdb.server.NilmDBError):
|
||||||
|
db.stream_extract("/newton/prep", count = True, binary = True)
|
||||||
|
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
class TestBlockingServer(object):
|
class TestBlockingServer(object):
|
||||||
|
108
tests/test_numpyclient.py
Normal file
108
tests/test_numpyclient.py
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import nilmdb.server
|
||||||
|
import nilmdb.client
|
||||||
|
import nilmdb.client.numpyclient
|
||||||
|
|
||||||
|
from nilmdb.utils.printf import *
|
||||||
|
from nilmdb.utils import timestamper
|
||||||
|
from nilmdb.client import ClientError, ServerError
|
||||||
|
from nilmdb.utils import datetime_tz
|
||||||
|
|
||||||
|
from nose.plugins.skip import SkipTest
|
||||||
|
from nose.tools import *
|
||||||
|
from nose.tools import assert_raises
|
||||||
|
import itertools
|
||||||
|
import distutils.version
|
||||||
|
|
||||||
|
from testutil.helpers import *
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
testdb = "tests/numpyclient-testdb"
|
||||||
|
testurl = "http://localhost:32180/"
|
||||||
|
|
||||||
|
def setup_module():
|
||||||
|
global test_server, test_db
|
||||||
|
# Clear out DB
|
||||||
|
recursive_unlink(testdb)
|
||||||
|
|
||||||
|
# Start web app on a custom port
|
||||||
|
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
|
||||||
|
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
|
||||||
|
port = 32180, stoppable = False,
|
||||||
|
fast_shutdown = True,
|
||||||
|
force_traceback = True)
|
||||||
|
test_server.start(blocking = False)
|
||||||
|
|
||||||
|
def teardown_module():
|
||||||
|
global test_server, test_db
|
||||||
|
# Close web app
|
||||||
|
test_server.stop()
|
||||||
|
test_db.close()
|
||||||
|
|
||||||
|
class TestNumpyClient(object):
|
||||||
|
|
||||||
|
def test_numpyclient_01_basic(self):
|
||||||
|
# Test basic connection
|
||||||
|
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
|
||||||
|
version = client.version()
|
||||||
|
eq_(distutils.version.LooseVersion(version),
|
||||||
|
distutils.version.LooseVersion(test_server.version))
|
||||||
|
|
||||||
|
# Verify subclassing
|
||||||
|
assert(isinstance(client, nilmdb.client.Client))
|
||||||
|
|
||||||
|
# Layouts
|
||||||
|
for layout in "int8_t", "something_8", "integer_1":
|
||||||
|
with assert_raises(ValueError):
|
||||||
|
for x in client.stream_extract_numpy("/foo", layout=layout):
|
||||||
|
pass
|
||||||
|
for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
for x in client.stream_extract_numpy("/foo", layout=layout):
|
||||||
|
pass
|
||||||
|
in_("No such stream", str(e.exception))
|
||||||
|
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
for x in client.stream_extract_numpy("/foo"):
|
||||||
|
pass
|
||||||
|
in_("can't get layout for path", str(e.exception))
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
def test_numpyclient_02_extract(self):
|
||||||
|
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
|
||||||
|
|
||||||
|
# Insert some data as text
|
||||||
|
client.stream_create("/newton/prep", "float32_8")
|
||||||
|
testfile = "tests/data/prep-20120323T1000"
|
||||||
|
start = nilmdb.utils.time.parse_time("20120323T1000")
|
||||||
|
rate = 120
|
||||||
|
data = timestamper.TimestamperRate(testfile, start, rate)
|
||||||
|
result = client.stream_insert("/newton/prep", data,
|
||||||
|
start, start + 119999777)
|
||||||
|
|
||||||
|
# Extract Numpy arrays
|
||||||
|
array = None
|
||||||
|
pieces = 0
|
||||||
|
for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
|
||||||
|
pieces += 1
|
||||||
|
if array is not None:
|
||||||
|
array = np.vstack((array, chunk))
|
||||||
|
else:
|
||||||
|
array = chunk
|
||||||
|
eq_(array.shape, (14400, 9))
|
||||||
|
eq_(pieces, 15)
|
||||||
|
|
||||||
|
# Try structured
|
||||||
|
s = list(client.stream_extract_numpy("/newton/prep", structured = True))
|
||||||
|
assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))
|
||||||
|
|
||||||
|
# Compare. Will be close but not exact because the conversion
|
||||||
|
# to and from ASCII was lossy.
|
||||||
|
data = timestamper.TimestamperRate(testfile, start, rate)
|
||||||
|
actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
|
||||||
|
assert(np.allclose(array, actual))
|
||||||
|
|
||||||
|
client.close()
|
Reference in New Issue
Block a user