@@ -178,7 +178,7 @@ class Client(object):
         return self.http.get_gen("stream/intervals", params)
 
     def stream_extract(self, path, start = None, end = None,
-                       count = False, markup = False):
+                       count = False, markup = False, binary = False):
         """
         Extract data from a stream.  Returns a generator that yields
         lines of ASCII-formatted data that matches the database
@@ -189,6 +189,11 @@ class Client(object):
         Specify markup = True to include comments in the returned data
         that indicate interval starts and ends.
+
+        Specify binary = True to return chunks of raw binary data,
+        rather than lines of ASCII-formatted data.  Raw binary data
+        is always little-endian and matches the database types
+        (including a uint64 timestamp).
         """
         params = {
             "path": path,
@@ -201,7 +206,9 @@ class Client(object):
             params["count"] = 1
         if markup:
             params["markup"] = 1
-        return self.http.get_gen("stream/extract", params)
+        if binary:
+            params["binary"] = 1
+        return self.http.get_gen("stream/extract", params, binary = binary)
 
     def stream_count(self, path, start = None, end = None):
         """
@@ -110,7 +110,8 @@ class HTTPClient(object):
         return self._req("PUT", url, params, data)
 
     # Generator versions that return data one line at a time.
-    def _req_gen(self, method, url, query = None, body = None, headers = None):
+    def _req_gen(self, method, url, query = None, body = None,
+                 headers = None, binary = False):
         """
         Make a request and return a generator that gives back strings
         or JSON decoded lines of the body data, or raise an error if
@@ -118,16 +119,19 @@ class HTTPClient(object):
         """
         (response, isjson) = self._do_req(method, url, query, body,
                                           stream = True, headers = headers)
-        if isjson:
+        if binary:
+            for chunk in response.iter_content(chunk_size = 65536):
+                yield chunk
+        elif isjson:
             for line in response.iter_lines():
                 yield json.loads(line)
         else:
             for line in response.iter_lines():
                 yield line
 
-    def get_gen(self, url, params = None):
+    def get_gen(self, url, params = None, binary = False):
         """Simple GET (parameters in URL) returning a generator"""
-        return self._req_gen("GET", url, params)
+        return self._req_gen("GET", url, params, binary = binary)
 
     # Not much use for a POST or PUT generator, since they don't
     # return much data.
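
A caveat worth noting (illustration only, not part of the change): chunk boundaries from iter_content need not fall on row boundaries, so a consumer that wants whole rows has to re-buffer.  A sketch, again assuming the int64-plus-eight-float32 row format of the test stream:

    import struct

    ROW = struct.Struct("<q" + "f" * 8)

    def rows(chunks):
        """Yield unpacked rows from an iterable of raw binary chunks."""
        buf = b""
        for chunk in chunks:
            buf += chunk
            whole = (len(buf) // ROW.size) * ROW.size
            for offset in range(0, whole, ROW.size):
                yield ROW.unpack_from(buf, offset)
            buf = buf[whole:]

    # e.g.  for fields in rows(client.stream_extract(path, binary = True)): ...
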
@@ -479,7 +479,7 @@ class Table(object):
         # Success, so update self.nrows accordingly
         self.nrows = tot_rows
 
-    def get_data(self, start, stop):
+    def get_data(self, start, stop, binary = False):
         """Extract data corresponding to Python range [n:m],
         and returns a formatted string"""
         if (start is None or
@@ -497,10 +497,13 @@ class Table(object):
             if count > remaining:
                 count = remaining
             f = self.file_open(subdir, filename)
-            ret.append(f.extract_string(offset, count))
+            if binary:
+                ret.append(f.extract_binary(offset, count))
+            else:
+                ret.append(f.extract_string(offset, count))
             remaining -= count
             row += count
-        return "".join(ret)
+        return b"".join(ret)
 
     def __getitem__(self, row):
         """Extract timestamps from a row, with table[n] notation."""
@@ -538,7 +538,7 @@ class NilmDB(object):
                                    dbinterval.db_endpos)
 
     def stream_extract(self, path, start = None, end = None,
-                       count = False, markup = False):
+                       count = False, markup = False, binary = False):
         """
         Returns (data, restart) tuple.
@@ -559,6 +559,9 @@ class NilmDB(object):
         'markup', if true, indicates that returned data should be
         marked with a comment denoting when a particular interval
         starts, and another comment when an interval ends.
+
+        'binary', if true, means to return raw binary rather than
+        ASCII-formatted data.
         """
         stream_id = self._stream_id(path)
         table = self.data.getnode(path)
@@ -569,6 +572,8 @@ class NilmDB(object):
         matched = 0
         remaining = self.max_results
         restart = None
+        if binary and (markup or count):
+            raise NilmDBError("binary mode can't be used with markup or count")
         for interval in intervals.intersection(requested):
             # Reading single rows from the table is too slow, so
             # we use two bisections to find both the starting and
@@ -593,7 +598,7 @@ class NilmDB(object):
                               timestamp_to_string(interval.start) + "\n")
 
             # Gather these results up
-            result.append(table.get_data(row_start, row_end))
+            result.append(table.get_data(row_start, row_end, binary))
 
             # Count them
             remaining -= row_end - row_start
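
A sketch of exercising the new flag through the embedded NilmDB API, along the lines of the unit tests further below.  The database directory and stream path are placeholders, and the constructor call is assumed to match the test suite's usage rather than documented here.

    import nilmdb.server

    db = nilmdb.server.NilmDB("testdb")   # placeholder database directory
    try:
        (data, restart) = db.stream_extract("/newton/prep", binary = True)
        # 'data' is raw little-endian bytes; 'restart' is non-None when the
        # result was truncated at max_results rows and extraction should be
        # resumed from that timestamp.
    finally:
        db.close()
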
@@ -527,6 +527,46 @@ err:
         return NULL;
 }
 
+/****
+ * Extract to binary string containing raw little-endian binary data
+ */
+static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args)
+{
+        long count;
+        long offset;
+
+        if (!PyArg_ParseTuple(args, "ll", &offset, &count))
+                return NULL;
+        if (!self->file) {
+                PyErr_SetString(PyExc_Exception, "no file");
+                return NULL;
+        }
+
+        /* Seek to target location */
+        if (fseek(self->file, offset, SEEK_SET) < 0) {
+                PyErr_SetFromErrno(PyExc_OSError);
+                return NULL;
+        }
+
+        uint8_t *str;
+        int len = count * self->binary_size;
+        str = malloc(len);
+        if (str == NULL) {
+                PyErr_SetFromErrno(PyExc_OSError);
+                return NULL;
+        }
+
+        /* Data in the file is already in the desired little-endian
+           binary format, so just read it directly. */
+        if (fread(str, self->binary_size, count, self->file) != count) {
+                free(str);
+                PyErr_SetFromErrno(PyExc_OSError);
+                return NULL;
+        }
+
+        PyObject *pystr = PyBytes_FromStringAndSize((char *)str, len);
+        free(str);
+        return pystr;
+}
+
 /****
  * Extract timestamp
@@ -600,6 +640,12 @@ static PyMethodDef Rocket_methods[] = {
           "Extract count rows of data from the file at offset offset.\n"
           "Return an ascii formatted string according to the layout" },
 
+        { "extract_binary",
+          (PyCFunction)Rocket_extract_binary, METH_VARARGS,
+          "extract_binary(self, offset, count)\n\n"
+          "Extract count rows of data from the file at offset offset.\n"
+          "Return a raw binary string of data matching the data layout." },
+
         { "extract_timestamp",
           (PyCFunction)Rocket_extract_timestamp, METH_VARARGS,
           "extract_timestamp(self, offset)\n\n"
@@ -400,7 +400,7 @@ class Stream(NilmApp):
     @chunked_response
     @response_type("text/plain")
     def extract(self, path, start = None, end = None,
-                count = False, markup = False):
+                count = False, markup = False, binary = False):
         """
         Extract data from backend database.  Streams the resulting
         entries as ASCII text lines separated by newlines.  This may
@@ -411,6 +411,11 @@ class Stream(NilmApp):
         If 'markup' is True, adds comments to the stream denoting each
         interval's start and end timestamp.
+
+        If 'binary' is True, return raw binary data, rather than lines
+        of ASCII-formatted data.  Raw binary data is always
+        little-endian and matches the database types (including a
+        uint64 timestamp).
         """
         (start, end) = self._get_times(start, end)
@@ -418,6 +423,13 @@ class Stream(NilmApp):
         if len(self.db.stream_list(path = path)) != 1:
             raise cherrypy.HTTPError("404", "No such stream: " + path)
 
+        if binary:
+            cherrypy.response.headers['Content-Type'] = (
+                "application/octet-stream")
+            if markup or count:
+                raise cherrypy.HTTPError("400", "can't mix binary and "
+                                         "markup or count modes")
+
         @workaround_cp_bug_1200
         def content(start, end):
             # Note: disable chunked responses to see tracebacks from here.
@@ -429,7 +441,8 @@ class Stream(NilmApp):
             while True:
                 (data, restart) = self.db.stream_extract(
-                    path, start, end, count = False, markup = markup)
+                    path, start, end, count = False,
+                    markup = markup, binary = binary)
                 yield data
 
                 if restart is None:
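
At the HTTP level the new behavior can be exercised directly.  A sketch using the requests library, with a placeholder server URL: binary extraction is served as application/octet-stream, and mixing it with markup or count is rejected with a 400.

    import requests

    base = "http://localhost/nilmdb/"   # placeholder server URL

    r = requests.get(base + "stream/extract", stream = True,
                     params = { "path": "/newton/prep", "binary": 1 })
    assert r.headers["Content-Type"].startswith("application/octet-stream")
    payload = b"".join(r.iter_content(chunk_size = 65536))

    bad = requests.get(base + "stream/extract",
                       params = { "path": "/newton/prep", "binary": 1, "markup": 1 })
    assert bad.status_code == 400
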
@@ -23,6 +23,7 @@ import warnings
 import resource
 import time
 import re
+import struct
 
 from testutil.helpers import *
@@ -293,6 +294,23 @@ class TestClient(object):
         # Test count
         eq_(client.stream_count("/newton/prep"), 14400)
 
+        # Test binary output
+        with assert_raises(ClientError) as e:
+            list(client.stream_extract("/newton/prep",
+                                        markup = True, binary = True))
+        with assert_raises(ClientError) as e:
+            list(client.stream_extract("/newton/prep",
+                                        count = True, binary = True))
+        data = "".join(client.stream_extract("/newton/prep", binary = True))
+        # Quick check using struct
+        unpacker = struct.Struct("<qffffffff")
+        out = []
+        for i in range(14400):
+            out.append(unpacker.unpack_from(data, i * unpacker.size))
+        eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
+                     2525.169921875, 8350.83984375, 3724.699951171875,
+                     1355.3399658203125, 2039.0))
+
         client.close()
 
     def test_client_06_generators(self):
@@ -90,13 +90,16 @@ class Test00Nilmdb(object):  # named 00 so it runs first
         eq_(db.stream_get_metadata("/newton/prep"), meta1)
         eq_(db.stream_get_metadata("/newton/raw"), meta1)
 
-        # fill in some test coverage for start >= end
+        # fill in some misc. test coverage
         with assert_raises(nilmdb.server.NilmDBError):
             db.stream_remove("/newton/prep", 0, 0)
         with assert_raises(nilmdb.server.NilmDBError):
             db.stream_remove("/newton/prep", 1, 0)
         db.stream_remove("/newton/prep", 0, 1)
+        with assert_raises(nilmdb.server.NilmDBError):
+            db.stream_extract("/newton/prep", count = True, binary = True)
+
         db.close()
 
 
 class TestBlockingServer(object):