@@ -178,7 +178,7 @@ class Client(object): | |||
return self.http.get_gen("stream/intervals", params) | |||
def stream_extract(self, path, start = None, end = None, | |||
count = False, markup = False): | |||
count = False, markup = False, binary = False): | |||
""" | |||
Extract data from a stream. Returns a generator that yields | |||
lines of ASCII-formatted data that matches the database | |||
@@ -189,6 +189,11 @@ class Client(object): | |||
Specify markup = True to include comments in the returned data | |||
that indicate interval starts and ends. | |||
Specify binary = True to return chunks of raw binary data, | |||
rather than lines of ASCII-formatted data. Raw binary data | |||
is always little-endian and matches the database types | |||
(including a uint64 timestamp). | |||
""" | |||
params = { | |||
"path": path, | |||
@@ -201,7 +206,9 @@ class Client(object): | |||
params["count"] = 1 | |||
if markup: | |||
params["markup"] = 1 | |||
return self.http.get_gen("stream/extract", params) | |||
if binary: | |||
params["binary"] = 1 | |||
return self.http.get_gen("stream/extract", params, binary = binary) | |||
def stream_count(self, path, start = None, end = None): | |||
""" | |||
@@ -110,7 +110,8 @@ class HTTPClient(object): | |||
return self._req("PUT", url, params, data) | |||
# Generator versions that return data one line at a time. | |||
def _req_gen(self, method, url, query = None, body = None, headers = None): | |||
def _req_gen(self, method, url, query = None, body = None, | |||
headers = None, binary = False): | |||
""" | |||
Make a request and return a generator that gives back strings | |||
or JSON decoded lines of the body data, or raise an error if | |||
@@ -118,16 +119,19 @@ class HTTPClient(object): | |||
""" | |||
(response, isjson) = self._do_req(method, url, query, body, | |||
stream = True, headers = headers) | |||
if isjson: | |||
if binary: | |||
for chunk in response.iter_content(chunk_size = 65536): | |||
yield chunk | |||
elif isjson: | |||
for line in response.iter_lines(): | |||
yield json.loads(line) | |||
else: | |||
for line in response.iter_lines(): | |||
yield line | |||
def get_gen(self, url, params = None): | |||
def get_gen(self, url, params = None, binary = False): | |||
"""Simple GET (parameters in URL) returning a generator""" | |||
return self._req_gen("GET", url, params) | |||
return self._req_gen("GET", url, params, binary = binary) | |||
# Not much use for a POST or PUT generator, since they don't | |||
# return much data. |
@@ -0,0 +1,77 @@ | |||
# -*- coding: utf-8 -*- | |||
"""Provide a NumpyClient class that is based on normal Client, but has | |||
additional methods for extracting and inserting data via Numpy arrays.""" | |||
import nilmdb.utils | |||
import nilmdb.client.client | |||
import nilmdb.client.httpclient | |||
from nilmdb.client.errors import ClientError | |||
import contextlib | |||
from nilmdb.utils.time import timestamp_to_string, string_to_timestamp | |||
import numpy | |||
import cStringIO | |||
def layout_to_dtype(layout):
    """Convert a NilmDB layout string such as "float32_8" into a Numpy
    structured dtype.

    The result has a little-endian int64 'timestamp' field followed by
    a 'data' field holding `lcount` values of the base type, matching
    the raw binary format produced by stream_extract(binary = True).

    Raises ValueError if the base type is unknown or the bit width /
    count are not integers.
    """
    ltype = layout.split('_')[0]
    lcount = int(layout.split('_')[1])
    # The suffix on the base type is a width in bits; numpy type codes
    # take a width in bytes.  Use floor division so the code behaves
    # identically on Python 2 and Python 3: true division would produce
    # e.g. '<f4.0', which is not a valid dtype string.
    if ltype.startswith('int'):
        atype = '<i' + str(int(ltype[3:]) // 8)
    elif ltype.startswith('uint'):
        atype = '<u' + str(int(ltype[4:]) // 8)
    elif ltype.startswith('float'):
        atype = '<f' + str(int(ltype[5:]) // 8)
    else:
        raise ValueError("bad layout")
    return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)])
class NumpyClient(nilmdb.client.client.Client):
    """Subclass of nilmdb.client.Client that adds additional methods for
    extracting and inserting data via Numpy arrays."""

    def stream_extract_numpy(self, path, start = None, end = None,
                             layout = None, maxrows = 100000,
                             structured = False):
        """
        Extract data from a stream.  Returns a generator that yields
        Numpy arrays of up to 'maxrows' of data each.

        If 'layout' is None, it is read using stream_info.

        If 'structured' is False, all data is converted to float64
        and returned in a flat 2D array.  Otherwise, data is returned
        as a structured dtype in a 1D array.
        """
        # Determine the layout (and therefore the dtype / row size)
        # from the server if the caller didn't supply one.
        if layout is None:
            streams = self.stream_list(path)
            if len(streams) != 1:
                raise ClientError("can't get layout for path: " + path)
            # stream_list returns (path, layout, ...) tuples; take the
            # layout of the single match.
            layout = streams[0][1]
        dtype = layout_to_dtype(layout)

        def to_numpy(data):
            """Parse one binary chunk into the requested array form."""
            # NOTE(review): numpy.fromstring copies the data; it is
            # deprecated in modern numpy in favor of frombuffer (which
            # returns a read-only view) -- confirm before switching.
            a = numpy.fromstring(data, dtype)
            if structured:
                return a
            # Flatten into a 2D float array: timestamp column followed
            # by the data columns (numpy.c_ promotes to a common type).
            return numpy.c_[a['timestamp'], a['data']]

        # Accumulate raw binary chunks from the server until we have at
        # least maxrows worth of bytes, then peel off exactly maxsize
        # bytes per yielded array.  maxsize is always a whole number of
        # rows, so no row is ever split across two arrays.
        chunks = []
        total_len = 0
        maxsize = dtype.itemsize * maxrows
        for data in self.stream_extract(path, start, end, binary = True):
            # Add this block of binary data
            chunks.append(data)
            total_len += len(data)

            # See if we have enough to make the requested Numpy array
            while total_len >= maxsize:
                assembled = "".join(chunks)
                total_len -= maxsize
                # Keep the remainder beyond maxsize for the next array.
                chunks = [ assembled[maxsize:] ]
                block = assembled[:maxsize]
                yield to_numpy(block)

        # Flush whatever partial buffer remains (less than maxrows).
        if total_len:
            yield to_numpy("".join(chunks))
@@ -479,7 +479,7 @@ class Table(object): | |||
# Success, so update self.nrows accordingly | |||
self.nrows = tot_rows | |||
def get_data(self, start, stop): | |||
def get_data(self, start, stop, binary = False): | |||
"""Extract data corresponding to Python range [n:m], | |||
and returns a formatted string""" | |||
if (start is None or | |||
@@ -497,10 +497,13 @@ class Table(object): | |||
if count > remaining: | |||
count = remaining | |||
f = self.file_open(subdir, filename) | |||
ret.append(f.extract_string(offset, count)) | |||
if binary: | |||
ret.append(f.extract_binary(offset, count)) | |||
else: | |||
ret.append(f.extract_string(offset, count)) | |||
remaining -= count | |||
row += count | |||
return "".join(ret) | |||
return b"".join(ret) | |||
def __getitem__(self, row): | |||
"""Extract timestamps from a row, with table[n] notation.""" | |||
@@ -538,7 +538,7 @@ class NilmDB(object): | |||
dbinterval.db_endpos) | |||
def stream_extract(self, path, start = None, end = None, | |||
count = False, markup = False): | |||
count = False, markup = False, binary = False): | |||
""" | |||
Returns (data, restart) tuple. | |||
@@ -559,6 +559,9 @@ class NilmDB(object): | |||
'markup', if true, indicates that returned data should be | |||
marked with a comment denoting when a particular interval | |||
starts, and another comment when an interval ends. | |||
'binary', if true, means to return raw binary rather than | |||
ASCII-formatted data. | |||
""" | |||
stream_id = self._stream_id(path) | |||
table = self.data.getnode(path) | |||
@@ -569,6 +572,8 @@ class NilmDB(object): | |||
matched = 0 | |||
remaining = self.max_results | |||
restart = None | |||
if binary and (markup or count): | |||
raise NilmDBError("binary mode can't be used with markup or count") | |||
for interval in intervals.intersection(requested): | |||
# Reading single rows from the table is too slow, so | |||
# we use two bisections to find both the starting and | |||
@@ -593,7 +598,7 @@ class NilmDB(object): | |||
timestamp_to_string(interval.start) + "\n") | |||
# Gather these results up | |||
result.append(table.get_data(row_start, row_end)) | |||
result.append(table.get_data(row_start, row_end, binary)) | |||
# Count them | |||
remaining -= row_end - row_start | |||
@@ -527,6 +527,46 @@ err: | |||
return NULL; | |||
} | |||
/**** | |||
* Extract to binary string containing raw little-endian binary data | |||
*/ | |||
/****
 * Extract to binary string containing raw little-endian binary data
 *
 * Python signature: extract_binary(offset, count)
 * Reads 'count' rows of 'binary_size' bytes each from the open file,
 * starting at byte 'offset', and returns them as a bytes object.
 */
static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args)
{
	long count;
	long offset;

	if (!PyArg_ParseTuple(args, "ll", &offset, &count))
		return NULL;
	if (!self->file) {
		PyErr_SetString(PyExc_Exception, "no file");
		return NULL;
	}
	/* Reject nonsense requests instead of passing a negative size
	   to malloc/fread. */
	if (count < 0) {
		PyErr_SetString(PyExc_ValueError, "count must be >= 0");
		return NULL;
	}
	/* Nothing to read: return an empty bytes object rather than
	   calling malloc(0), whose return value is
	   implementation-defined and would raise a spurious OSError. */
	if (count == 0)
		return PyBytes_FromStringAndSize(NULL, 0);

	/* Seek to target location */
	if (fseek(self->file, offset, SEEK_SET) < 0) {
		PyErr_SetFromErrno(PyExc_OSError);
		return NULL;
	}

	/* Use size_t so a large row count can't overflow an int. */
	size_t len = (size_t)count * (size_t)self->binary_size;
	uint8_t *str = malloc(len);
	if (str == NULL) {
		PyErr_SetFromErrno(PyExc_OSError);
		return NULL;
	}

	/* Data in the file is already in the desired little-endian
	   binary format, so just read it directly.  Compare against an
	   unsigned count to avoid a signed/unsigned mismatch with
	   fread's size_t return value. */
	if (fread(str, self->binary_size, count, self->file) != (size_t)count) {
		free(str);
		PyErr_SetFromErrno(PyExc_OSError);
		return NULL;
	}

	PyObject *pystr = PyBytes_FromStringAndSize((char *)str,
						    (Py_ssize_t)len);
	free(str);
	return pystr;
}
/**** | |||
* Extract timestamp | |||
@@ -600,6 +640,12 @@ static PyMethodDef Rocket_methods[] = { | |||
"Extract count rows of data from the file at offset offset.\n" | |||
"Return an ascii formatted string according to the layout" }, | |||
{ "extract_binary", | |||
(PyCFunction)Rocket_extract_binary, METH_VARARGS, | |||
"extract_binary(self, offset, count)\n\n" | |||
"Extract count rows of data from the file at offset offset.\n" | |||
"Return a raw binary string of data matching the data layout." }, | |||
{ "extract_timestamp", | |||
(PyCFunction)Rocket_extract_timestamp, METH_VARARGS, | |||
"extract_timestamp(self, offset)\n\n" | |||
@@ -400,7 +400,7 @@ class Stream(NilmApp): | |||
@chunked_response | |||
@response_type("text/plain") | |||
def extract(self, path, start = None, end = None, | |||
count = False, markup = False): | |||
count = False, markup = False, binary = False): | |||
""" | |||
Extract data from backend database. Streams the resulting | |||
entries as ASCII text lines separated by newlines. This may | |||
@@ -411,6 +411,11 @@ class Stream(NilmApp): | |||
If 'markup' is True, adds comments to the stream denoting each | |||
interval's start and end timestamp. | |||
If 'binary' is True, return raw binary data, rather than lines | |||
of ASCII-formatted data. Raw binary data is always | |||
little-endian and matches the database types (including a | |||
uint64 timestamp). | |||
""" | |||
(start, end) = self._get_times(start, end) | |||
@@ -418,6 +423,13 @@ class Stream(NilmApp): | |||
if len(self.db.stream_list(path = path)) != 1: | |||
raise cherrypy.HTTPError("404", "No such stream: " + path) | |||
if binary: | |||
cherrypy.response.headers['Content-Type'] = ( | |||
"application/octet-stream") | |||
if markup or count: | |||
raise cherrypy.HTTPError("400", "can't mix binary and " | |||
"markup or count modes") | |||
@workaround_cp_bug_1200 | |||
def content(start, end): | |||
# Note: disable chunked responses to see tracebacks from here. | |||
@@ -429,7 +441,8 @@ class Stream(NilmApp): | |||
while True: | |||
(data, restart) = self.db.stream_extract( | |||
path, start, end, count = False, markup = markup) | |||
path, start, end, count = False, | |||
markup = markup, binary = binary) | |||
yield data | |||
if restart is None: | |||
@@ -107,6 +107,7 @@ setup(name='nilmdb', | |||
author_email = 'jim@jtan.com', | |||
tests_require = [ 'nose', | |||
'coverage', | |||
'numpy', | |||
], | |||
setup_requires = [ 'distribute', | |||
], | |||
@@ -12,6 +12,7 @@ test_interval.py | |||
test_bulkdata.py | |||
test_nilmdb.py | |||
test_client.py | |||
test_numpyclient.py | |||
test_cmdline.py | |||
test_*.py |
@@ -23,6 +23,7 @@ import warnings | |||
import resource | |||
import time | |||
import re | |||
import struct | |||
from testutil.helpers import * | |||
@@ -293,6 +294,23 @@ class TestClient(object): | |||
# Test count | |||
eq_(client.stream_count("/newton/prep"), 14400) | |||
# Test binary output | |||
with assert_raises(ClientError) as e: | |||
list(client.stream_extract("/newton/prep", | |||
markup = True, binary = True)) | |||
with assert_raises(ClientError) as e: | |||
list(client.stream_extract("/newton/prep", | |||
count = True, binary = True)) | |||
data = "".join(client.stream_extract("/newton/prep", binary = True)) | |||
# Quick check using struct | |||
unpacker = struct.Struct("<qffffffff") | |||
out = [] | |||
for i in range(14400): | |||
out.append(unpacker.unpack_from(data, i * unpacker.size)) | |||
eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375, | |||
2525.169921875, 8350.83984375, 3724.699951171875, | |||
1355.3399658203125, 2039.0)) | |||
client.close() | |||
def test_client_06_generators(self): | |||
@@ -90,13 +90,16 @@ class Test00Nilmdb(object): # named 00 so it runs first | |||
eq_(db.stream_get_metadata("/newton/prep"), meta1) | |||
eq_(db.stream_get_metadata("/newton/raw"), meta1) | |||
# fill in some test coverage for start >= end | |||
# fill in some misc. test coverage | |||
with assert_raises(nilmdb.server.NilmDBError): | |||
db.stream_remove("/newton/prep", 0, 0) | |||
with assert_raises(nilmdb.server.NilmDBError): | |||
db.stream_remove("/newton/prep", 1, 0) | |||
db.stream_remove("/newton/prep", 0, 1) | |||
with assert_raises(nilmdb.server.NilmDBError): | |||
db.stream_extract("/newton/prep", count = True, binary = True) | |||
db.close() | |||
class TestBlockingServer(object): | |||
@@ -0,0 +1,108 @@ | |||
# -*- coding: utf-8 -*- | |||
import nilmdb.server | |||
import nilmdb.client | |||
import nilmdb.client.numpyclient | |||
from nilmdb.utils.printf import * | |||
from nilmdb.utils import timestamper | |||
from nilmdb.client import ClientError, ServerError | |||
from nilmdb.utils import datetime_tz | |||
from nose.plugins.skip import SkipTest | |||
from nose.tools import * | |||
from nose.tools import assert_raises | |||
import itertools | |||
import distutils.version | |||
from testutil.helpers import * | |||
import numpy as np | |||
testdb = "tests/numpyclient-testdb" | |||
testurl = "http://localhost:32180/" | |||
def setup_module():
    """Create a clean test database and start a NilmDB web server on a
    private local port for this module's tests."""
    global test_server, test_db

    # Remove any database left over from a previous test run
    recursive_unlink(testdb)

    # Open the database behind a serializer proxy, then serve it
    make_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)
    test_db = make_db(testdb)
    test_server = nilmdb.server.Server(test_db,
                                       host = "127.0.0.1",
                                       port = 32180,
                                       stoppable = False,
                                       fast_shutdown = True,
                                       force_traceback = True)
    test_server.start(blocking = False)
def teardown_module():
    """Stop the web server started by setup_module and close its
    database."""
    global test_server, test_db
    # Shut the server down first so no requests hit a closed database
    test_server.stop()
    test_db.close()
class TestNumpyClient(object):
    """Tests for NumpyClient.stream_extract_numpy against a live test
    server (started by setup_module)."""

    def test_numpyclient_01_basic(self):
        # Basic connection: the client should report the same version
        # as the server it is talking to.
        client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
        version = client.version()
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(test_server.version))

        # Verify subclassing
        assert(isinstance(client, nilmdb.client.Client))

        # Malformed layouts fail in layout_to_dtype with ValueError
        # before any server request is made.
        for layout in "int8_t", "something_8", "integer_1":
            with assert_raises(ValueError):
                for x in client.stream_extract_numpy("/foo", layout=layout):
                    pass
        # Well-formed layouts reach the server, which rejects the
        # nonexistent path.
        for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
            with assert_raises(ClientError) as e:
                for x in client.stream_extract_numpy("/foo", layout=layout):
                    pass
            in_("No such stream", str(e.exception))

        # With no layout given, stream_list lookup fails for a missing
        # path and raises before extraction starts.
        with assert_raises(ClientError) as e:
            for x in client.stream_extract_numpy("/foo"):
                pass
        in_("can't get layout for path", str(e.exception))

        client.close()

    def test_numpyclient_02_extract(self):
        client = nilmdb.client.numpyclient.NumpyClient(url = testurl)

        # Insert some data as text
        client.stream_create("/newton/prep", "float32_8")
        testfile = "tests/data/prep-20120323T1000"
        start = nilmdb.utils.time.parse_time("20120323T1000")
        rate = 120
        data = timestamper.TimestamperRate(testfile, start, rate)
        result = client.stream_insert("/newton/prep", data,
                                      start, start + 119999777)

        # Extract Numpy arrays, stacking the chunks back into one
        # array.  14400 rows at maxrows=1000 should come back as 15
        # pieces (14 full chunks plus a final partial one).
        array = None
        pieces = 0
        for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
            pieces += 1
            if array is not None:
                array = np.vstack((array, chunk))
            else:
                array = chunk
        eq_(array.shape, (14400, 9))
        eq_(pieces, 15)

        # Try structured: with the default maxrows everything fits in
        # one structured array, which should flatten to the same data.
        s = list(client.stream_extract_numpy("/newton/prep", structured = True))
        assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))

        # Compare.  Will be close but not exact because the conversion
        # to and from ASCII was lossy.
        data = timestamper.TimestamperRate(testfile, start, rate)
        actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
        assert(np.allclose(array, actual))

        client.close()