
Merge branch 'binary'

tags/nilmdb-1.5.0
Jim Paris 11 years ago
commit b4d6aad6de
10 changed files with 669 additions and 92 deletions:

  1. nilmdb/client/client.py        +39   -22
  2. nilmdb/client/httpclient.py    +6    -2
  3. nilmdb/client/numpyclient.py   +188  -6
  4. nilmdb/server/bulkdata.py      +20   -7
  5. nilmdb/server/nilmdb.py        +6    -2
  6. nilmdb/server/rocket.c         +94   -6
  7. nilmdb/server/server.py        +21   -7
  8. tests/test_bulkdata.py         +4    -4
  9. tests/test_client.py           +66   -36
  10. tests/test_numpyclient.py     +225  -0

nilmdb/client/client.py  (+39 -22)

@@ -127,10 +127,11 @@ class Client(object):
@contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None):
"""Return a context manager that allows data to be efficiently
- inserted into a stream in a piecewise manner. Data is be provided
- as single lines, and is aggregated and sent to the server in larger
- chunks as necessary. Data lines must match the database layout for
- the given path, and end with a newline.
+ inserted into a stream in a piecewise manner. Data is
+ provided as ASCII lines, and is aggregated and sent to the
+ server in larger or smaller chunks as necessary. Data lines
+ must match the database layout for the given path, and end
+ with a newline.

Example:
with client.stream_insert_context('/path', start, end) as ctx:
@@ -142,15 +143,16 @@ class Client(object):
This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
- ctx = StreamInserter(self.http, path, start, end)
+ ctx = StreamInserter(self, path, start, end)
yield ctx
ctx.finalize()

def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be a string
or iterable that provides ASCII data that matches the database
- layout for path. See stream_insert_context for details on the
- 'start' and 'end' parameters."""
+ layout for path. Data is passed through stream_insert_context,
+ so it will be broken into reasonably-sized chunks and
+ start/end will be deduced if missing."""
with self.stream_insert_context(path, start, end) as ctx:
if isinstance(data, basestring):
ctx.insert(data)
@@ -159,11 +161,28 @@ class Client(object):
ctx.insert(chunk)
return ctx.last_response

def stream_insert_block(self, path, data, start, end, binary = False):
"""Insert a single fixed block of data into the stream. It is
sent directly to the server in one block with no further
processing.

If 'binary' is True, provide raw binary data in little-endian
format matching the path layout, including an int64 timestamp.
Otherwise, provide ASCII data matching the layout."""
params = {
"path": path,
"start": timestamp_to_string(start),
"end": timestamp_to_string(end),
}
if binary:
params["binary"] = 1
return self.http.put("stream/insert", data, params, binary = binary)

def stream_intervals(self, path, start = None, end = None, diffpath = None):
"""
Return a generator that yields each stream interval.

- If diffpath is not None, yields only interval ranges that are
+ If 'diffpath' is not None, yields only interval ranges that are
present in 'path' but not in 'diffpath'.
"""
params = {
@@ -184,16 +203,16 @@ class Client(object):
lines of ASCII-formatted data that matches the database
layout for the given path.

- Specify count = True to return a count of matching data points
+ If 'count' is True, return a count of matching data points
rather than the actual data. The output format is unchanged.

- Specify markup = True to include comments in the returned data
+ If 'markup' is True, include comments in the returned data
that indicate interval starts and ends.

- Specify binary = True to return chunks of raw binary data,
- rather than lines of ASCII-formatted data. Raw binary data
- is always little-endian and matches the database types
- (including a uint64 timestamp).
+ If 'binary' is True, return chunks of raw binary data, rather
+ than lines of ASCII-formatted data. Raw binary data is
+ little-endian and matches the database types (including an
+ int64 timestamp).
"""
params = {
"path": path,
@@ -257,13 +276,13 @@ class StreamInserter(object):
_max_data = 2 * 1024 * 1024
_max_data_after_send = 64 * 1024

- def __init__(self, http, path, start = None, end = None):
- """'http' is the httpclient object. 'path' is the database
+ def __init__(self, client, path, start, end):
+ """'client' is the client object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
- contiguous interval."""
+ contiguous interval and may be None."""
self.last_response = None

- self._http = http
+ self._client = client
self._path = path

# Start and end for the overall contiguous interval we're
@@ -431,9 +450,7 @@ class StreamInserter(object):
raise ClientError("have data to send, but no start/end times")

# Send it
params = { "path": self._path,
"start": timestamp_to_string(start_ts),
"end": timestamp_to_string(end_ts) }
self.last_response = self._http.put("stream/insert", block, params)
self.last_response = self._client.stream_insert_block(
self._path, block, start_ts, end_ts, binary = False)

return
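
The client-side change is easiest to see in use. A minimal sketch, not part of the commit: the URL, the /demo/raw path, and its int32_1 layout are assumptions, and the "<qi" struct format packs the little-endian int64 timestamp plus one int32 column described in the docstrings above.

import struct
import nilmdb.client

client = nilmdb.client.Client("http://localhost:12380/")  # hypothetical URL

# ASCII path: the context manager chunks the data and deduces start/end.
with client.stream_insert_context("/demo/raw") as ctx:
    ctx.insert("100 1\n")
    ctx.insert("101 2\n")

# Binary path: one pre-packed block, sent as-is with no further processing.
# Each row is a little-endian int64 timestamp followed by the data columns
# (a single int32 here, for the assumed int32_1 layout).
rows = [(200, 1), (201, 2)]
block = b"".join(struct.pack("<qi", ts, val) for ts, val in rows)
client.stream_insert_block("/demo/raw", block, 200, 202, binary=True)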

nilmdb/client/httpclient.py  (+6 -2)

@@ -105,9 +105,13 @@ class HTTPClient(object):
else:
return self._req("POST", url, None, params)

- def put(self, url, data, params = None):
+ def put(self, url, data, params = None, binary = False):
"""Simple PUT (parameters in URL, data in body)"""
- return self._req("PUT", url, params, data)
+ if binary:
+ h = { 'Content-type': 'application/octet-stream' }
+ else:
+ h = { 'Content-type': 'text/plain; charset=utf-8' }
+ return self._req("PUT", url, query = params, body = data, headers = h)

# Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None,
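
This header is what the server-side check added later in this commit validates. As an illustration of the request on the wire (not of HTTPClient's internals), a sketch using the requests library; the URL, path, and layout are assumptions:

import struct
import requests

# One packed row: little-endian int64 timestamp + one int32 value.
block = struct.pack("<qi", 200, 1)
resp = requests.put("http://localhost:12380/stream/insert",
                    params={"path": "/demo/raw", "start": "200",
                            "end": "201", "binary": "1"},
                    data=block,
                    headers={"Content-Type": "application/octet-stream"})
resp.raise_for_status()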


nilmdb/client/numpyclient.py  (+188 -6)

@@ -31,6 +31,14 @@ class NumpyClient(nilmdb.client.client.Client):
"""Subclass of nilmdb.client.Client that adds additional methods for
extracting and inserting data via Numpy arrays."""

def _get_dtype(self, path, layout):
if layout is None:
streams = self.stream_list(path)
if len(streams) != 1:
raise ClientError("can't get layout for path: " + path)
layout = streams[0][1]
return layout_to_dtype(layout)

def stream_extract_numpy(self, path, start = None, end = None,
layout = None, maxrows = 100000,
structured = False):
@@ -44,12 +52,7 @@ class NumpyClient(nilmdb.client.client.Client):
and returned in a flat 2D array. Otherwise, data is returned
as a structured dtype in a 1D array.
"""
- if layout is None:
- streams = self.stream_list(path)
- if len(streams) != 1:
- raise ClientError("can't get layout for path: " + path)
- layout = streams[0][1]
- dtype = layout_to_dtype(layout)
+ dtype = self._get_dtype(path, layout)

def to_numpy(data):
a = numpy.fromstring(data, dtype)
@@ -75,3 +78,182 @@ class NumpyClient(nilmdb.client.client.Client):

if total_len:
yield to_numpy("".join(chunks))

@contextlib.contextmanager
def stream_insert_numpy_context(self, path, start = None, end = None,
layout = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is
provided as Numpy arrays, and is aggregated and sent to the
server in larger or smaller chunks as necessary. Data format
must match the database layout for the given path.

For more details, see help for
nilmdb.client.numpyclient.StreamInserterNumpy

If 'layout' is not None, use it as the layout rather than
querying the database.
"""
dtype = self._get_dtype(path, layout)
ctx = StreamInserterNumpy(self, path, start, end, dtype)
yield ctx
ctx.finalize()

def stream_insert_numpy(self, path, data, start = None, end = None,
layout = None):
"""Insert data into a stream. data should be a Numpy array
which will be passed through stream_insert_numpy_context to
break it into chunks etc. See the help for that function
for details."""
with self.stream_insert_numpy_context(path, start, end, layout) as ctx:
if isinstance(data, numpy.ndarray):
ctx.insert(data)
else:
for chunk in data:
ctx.insert(chunk)
return ctx.last_response

class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
"""Object returned by stream_insert_numpy_context() that manages
the insertion of rows of data into a particular path.

See help for nilmdb.client.client.StreamInserter for details.
The only difference is that, instead of ASCII formatted data,
this context manager can take Numpy arrays, which are either
structured (1D with complex dtype) or flat (2D with simple dtype).
"""

# Soft limit of how many bytes to send per HTTP request.
_max_data = 2 * 1024 * 1024

def __init__(self, client, path, start, end, dtype):
"""
'client' is the client object. 'path' is the database path
to insert to. 'start' and 'end' are used for the first
contiguous interval and may be None. 'dtype' is the Numpy
dtype for this stream.
"""
self.last_response = None

self._dtype = dtype
self._client = client
self._path = path

# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end

# Max rows to send at once
self._max_rows = self._max_data // self._dtype.itemsize

# List of the current arrays we're building up to send
self._block_arrays = []
self._block_rows = 0

def insert(self, array):
"""Insert Numpy data, which must match the layout type."""
if type(array) != numpy.ndarray:
array = numpy.array(array)
if array.ndim == 1:
# Already a structured array; just verify the type
if array.dtype != self._dtype:
raise ValueError("wrong dtype for 1D (structured) array")
elif array.ndim == 2:
# Convert to structured array
sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
sarray['timestamp'] = array[:,0]
# Need the squeeze in case sarray['data'] is 1 dimensional
sarray['data'] = numpy.squeeze(array[:,1:])
array = sarray
else:
raise ValueError("wrong number of dimensions in array")

length = len(array)
maxrows = self._max_rows

if length == 0:
return
if length > maxrows:
# This is more than twice what we wanted to send, so split
# it up. This is a bit inefficient, but the user really
# shouldn't be providing this much data at once.
for cut in range(0, length, maxrows):
self.insert(array[cut:(cut + maxrows)])
return

# Add this array to our list
self._block_arrays.append(array)
self._block_rows += length

# Send if it's too long
if self._block_rows >= maxrows:
self._send_block(final = False)

def _send_block(self, final = False):
"""Send the data current stored up. One row might be left
over if we need its timestamp saved."""

# Build the full array to send
if self._block_rows == 0:
array = numpy.zeros(0, dtype = self._dtype)
else:
array = numpy.hstack(self._block_arrays)

# Get starting timestamp
start_ts = self._interval_start
if start_ts is None:
# Pull start from the first row
try:
start_ts = array['timestamp'][0]
except IndexError:
pass # no timestamp is OK, if we have no data

# Get ending timestamp
if final:
# For a final block, the timestamp is either the
# user-provided end, or the timestamp of the last line
# plus epsilon.
end_ts = self._interval_end
if end_ts is None:
try:
end_ts = array['timestamp'][-1]
end_ts += nilmdb.utils.time.epsilon
except IndexError:
pass # no timestamp is OK, if we have no data
self._block_arrays = []
self._block_rows = 0

# Next block is completely fresh
self._interval_start = None
self._interval_end = None
else:
# An intermediate block. We need to save the last row
# for the next block, and use its timestamp as the ending
# timestamp for this one.
if len(array) < 2:
# Not enough data to send an intermediate block
return
end_ts = array['timestamp'][-1]
if self._interval_end is not None and end_ts > self._interval_end:
# User gave us bad endpoints; send it anyway, and let
# the server complain so that the error is the same
# as if we hadn't done this chunking.
end_ts = self._interval_end
self._block_arrays = [ array[-1:] ]
self._block_rows = 1
array = array[:-1]

# Next block continues where this one ended
self._interval_start = end_ts

# If we have no endpoints, it's because we had no data to send.
if start_ts is None or end_ts is None:
return

# Send it
data = array.tostring()
self.last_response = self._client.stream_insert_block(
self._path, data, start_ts, end_ts, binary = True)

return
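
A usage sketch for the new Numpy entry points; the paths and layouts are assumptions, and the structured dtype shown is what layout_to_dtype is expected to produce for a float32_8 stream (little-endian int64 timestamp plus eight float32 columns):

import numpy as np
import nilmdb.client.numpyclient

client = nilmdb.client.numpyclient.NumpyClient("http://localhost:12380/")

# Flat 2D input: column 0 is the timestamp, the rest are data columns
# (assumes /demo/raw has a float32_1 layout).
flat = np.array([[1000.0, 1.0],
                 [1001.0, 2.0],
                 [1002.0, 3.0]])
client.stream_insert_numpy("/demo/raw", flat)

# Structured 1D input must match the stream's dtype exactly.
dtype = np.dtype([("timestamp", "<i8"), ("data", "<f4", (8,))])
sdata = np.zeros(3, dtype=dtype)
sdata["timestamp"] = [2000, 2001, 2002]
sdata["data"] = np.arange(24).reshape(3, 8)
with client.stream_insert_numpy_context("/demo/prep",
                                        layout="float32_8") as ctx:
    ctx.insert(sdata)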

nilmdb/server/bulkdata.py  (+20 -7)

@@ -413,12 +413,16 @@ class Table(object):
return rocket.Rocket(self.layout,
os.path.join(self.root, subdir, filename))

- def append_string(self, data, start, end):
+ def append_data(self, data, start, end, binary = False):
"""Parse the formatted string in 'data', according to the
current layout, and append it to the table. If any timestamps
are non-monotonic, or don't fall between 'start' and 'end',
a ValueError is raised.

If 'binary' is True, the data should be in raw binary format
instead: little-endian, matching the current table's layout,
including the int64 timestamp.

If this function succeeds, it returns normally. Otherwise,
the table is reverted back to its original state by truncating
or deleting files as necessary."""
@@ -437,17 +441,26 @@ class Table(object):
# Ask the rocket object to parse and append up to "count"
# rows of data, verifying things along the way.
try:
+ if binary:
+ appender = f.append_binary
+ else:
+ appender = f.append_string
(added_rows, data_offset, last_timestamp, linenum
- ) = f.append_string(count, data, data_offset, linenum,
- start, end, last_timestamp)
+ ) = appender(count, data, data_offset, linenum,
+ start, end, last_timestamp)
except rocket.ParseError as e:
(linenum, colnum, errtype, obj) = e.args
where = "line %d, column %d: " % (linenum, colnum)
if binary:
where = "byte %d: " % (linenum)
else:
where = "line %d, column %d: " % (linenum, colnum)
# Extract out the error line, add column marker
try:
+ if binary:
+ raise IndexError
bad = data.splitlines()[linenum-1]
- badptr = ' ' * (colnum - 1) + '^'
- except IndexError: # pragma: no cover
+ bad += '\n' + ' ' * (colnum - 1) + '^'
+ except IndexError:
bad = ""
if errtype == rocket.ERR_NON_MONOTONIC:
err = "timestamp is not monotonically increasing"
@@ -463,7 +476,7 @@ class Table(object):
else:
err = str(obj)
raise ValueError("error parsing input data: " +
- where + err + "\n" + bad + "\n" + badptr)
+ where + err + "\n" + bad)
tot_rows += added_rows
except Exception:
# Some failure, so try to roll things back by truncating or
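
For reference, the size of one binary row follows directly from the layout string; a small sketch of that bookkeeping, with the per-type column sizes an assumption that must match the rocket definitions:

# Bytes per data column for each layout type (assumed to match rocket).
COLUMN_SIZE = {"int8": 1, "uint8": 1, "int16": 2, "uint16": 2,
               "int32": 4, "uint32": 4, "int64": 8, "uint64": 8,
               "float32": 4, "float64": 8}

def binary_row_size(layout):
    """One row = little-endian int64 timestamp + N data columns."""
    ltype, count = layout.split("_")
    return 8 + COLUMN_SIZE[ltype] * int(count)

assert binary_row_size("uint16_1") == 10   # 8 + 2*1
assert binary_row_size("float32_8") == 40  # 8 + 4*8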


nilmdb/server/nilmdb.py  (+6 -2)

@@ -475,12 +475,16 @@ class NilmDB(object):
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

- def stream_insert(self, path, start, end, data):
+ def stream_insert(self, path, start, end, data, binary = False):
"""Insert new data into the database.
path: Path at which to add the data
start: Starting timestamp
end: Ending timestamp
data: Textual data, formatted according to the layout of path

'binary', if True, means that 'data' is raw binary:
little-endian, matching the current table's layout,
including the int64 timestamp.
"""
# First check for basic overlap using timestamp info given.
stream_id = self._stream_id(path)
@@ -494,7 +498,7 @@ class NilmDB(object):
# there are any parse errors.
table = self.data.getnode(path)
row_start = table.nrows
- table.append_string(data, start, end)
+ table.append_data(data, start, end, binary)
row_end = table.nrows

# Insert the record into the sql database.


nilmdb/server/rocket.c  (+94 -6)

@@ -419,6 +419,68 @@ extra_data_on_line:
ERR_OTHER, "extra data on line");
}

/****
* Append from binary data
*/

/* .append_binary(count, data, offset, linenum, start, end, last_timestamp) */
static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
{
int count;
const uint8_t *data;
int data_len;
int linenum;
int offset;
timestamp_t start;
timestamp_t end;
timestamp_t last_timestamp;

if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
&count, &data, &data_len, &offset,
&linenum, &start, &end, &last_timestamp))
return NULL;

/* Advance to offset */
if (offset > data_len)
return raise_str(0, 0, ERR_OTHER, "bad offset");
data += offset;
data_len -= offset;

/* Figure out max number of rows to insert */
int rows = data_len / self->binary_size;
if (rows > count)
rows = count;

/* Check timestamps */
timestamp_t ts;
int i;
for (i = 0; i < rows; i++) {
/* Read raw timestamp, byteswap if needed */
memcpy(&ts, &data[i * self->binary_size], 8);
ts = le64toh(ts);

/* Check limits */
if (ts <= last_timestamp)
return raise_int(i, 0, ERR_NON_MONOTONIC, ts);
last_timestamp = ts;
if (ts < start || ts >= end)
return raise_int(i, 0, ERR_OUT_OF_INTERVAL, ts);
}

/* Write binary data */
if (fwrite(data, data_len, 1, self->file) != 1) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
fflush(self->file);

/* Build return value and return */
PyObject *o;
o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
last_timestamp, linenum);
return o;
}

/****
* Extract to string
*/
@@ -484,7 +546,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
/* read and format in a loop */ \
for (i = 0; i < self->layout_count; i++) { \
if (fread(&disktype, bytes, \
- 1, self->file) < 0) \
+ 1, self->file) != 1) \
goto err; \
disktype = letoh(disktype); \
ret = sprintf(&str[len], " " fmt, \
@@ -611,11 +673,13 @@ static PyMemberDef Rocket_members[] = {
};

static PyMethodDef Rocket_methods[] = {
{ "close", (PyCFunction)Rocket_close, METH_NOARGS,
{ "close",
(PyCFunction)Rocket_close, METH_NOARGS,
"close(self)\n\n"
"Close file handle" },

{ "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS,
{ "append_string",
(PyCFunction)Rocket_append_string, METH_VARARGS,
"append_string(self, count, data, offset, line, start, end, ts)\n\n"
"Parse string and append data.\n"
"\n"
@@ -630,12 +694,36 @@ static PyMethodDef Rocket_methods[] = {
"Raises ParseError if timestamps are non-monotonic, outside\n"
"the start/end interval etc.\n"
"\n"
"On success, return a tuple with three values:\n"
"On success, return a tuple:\n"
" added_rows: how many rows were added from the file\n"
" data_offset: current offset into the data string\n"
" last_timestamp: last timestamp we parsed\n"
" linenum: current line number" },

{ "append_binary",
(PyCFunction)Rocket_append_binary, METH_VARARGS,
"append_binary(self, count, data, offset, line, start, end, ts)\n\n"
"Append binary data, which must match the data layout.\n"
"\n"
" count: maximum number of rows to add\n"
" data: binary data\n"
" offset: byte offset into data to start adding\n"
" line: current line number (unused)\n"
" start: starting timestamp for interval\n"
" end: end timestamp for interval\n"
" ts: last timestamp that was previously parsed\n"
"\n"
"Raises ParseError if timestamps are non-monotonic, outside\n"
"the start/end interval etc.\n"
"\n"
"On success, return a tuple:\n"
" added_rows: how many rows were added from the file\n"
" data_offset: current offset into the data string\n"
" last_timestamp: last timestamp we parsed" },
" last_timestamp: last timestamp we parsed\n"
" linenum: current line number (copied from argument)" },

{ "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS,
{ "extract_string",
(PyCFunction)Rocket_extract_string, METH_VARARGS,
"extract_string(self, offset, count)\n\n"
"Extract count rows of data from the file at offset offset.\n"
"Return an ascii formatted string according to the layout" },


nilmdb/server/server.py  (+21 -7)

@@ -305,10 +305,15 @@ class Stream(NilmApp):
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["PUT"])
- def insert(self, path, start, end):
+ def insert(self, path, start, end, binary = False):
"""
Insert new data into the database. Provide textual data
(matching the path's layout) as a HTTP PUT.

If 'binary' is True, expect raw binary data, rather than lines
of ASCII-formatted data. Raw binary data is always
little-endian and matches the database types (including an
int64 timestamp).
"""
# Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections.
@@ -316,6 +321,14 @@ class Stream(NilmApp):
# requests, if we ever want to handle those (issue #1134)
body = cherrypy.request.body.read()

# Verify content type for binary data
content_type = cherrypy.request.headers.get('content-type')
if binary and content_type:
if content_type != "application/octet-stream":
raise cherrypy.HTTPError("400", "Content type must be "
"application/octet-stream for "
"binary data, not " + content_type)

# Check path and get layout
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -325,7 +338,7 @@ class Stream(NilmApp):

# Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems.
self.db.stream_insert(path, start, end, body)
self.db.stream_insert(path, start, end, body, binary)

# Done
return
@@ -398,7 +411,6 @@ class Stream(NilmApp):
# /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@chunked_response
@response_type("text/plain")
def extract(self, path, start = None, end = None,
count = False, markup = False, binary = False):
"""
@@ -414,8 +426,8 @@ class Stream(NilmApp):

If 'binary' is True, return raw binary data, rather than lines
of ASCII-formatted data. Raw binary data is always
- little-endian and matches the database types (including a
- uint64 timestamp).
+ little-endian and matches the database types (including an
+ int64 timestamp).
"""
(start, end) = self._get_times(start, end)

@@ -424,11 +436,13 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("404", "No such stream: " + path)

if binary:
- cherrypy.response.headers['Content-Type'] = (
- "application/octet-stream")
+ content_type = "application/octet-stream"
if markup or count:
raise cherrypy.HTTPError("400", "can't mix binary and "
"markup or count modes")
+ else:
+ content_type = "text/plain"
+ cherrypy.response.headers['Content-Type'] = content_type

@workaround_cp_bug_1200
def content(start, end):
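
On the extract side, a binary=1 response body is just the packed rows, so a client can decode it directly. A sketch with requests and numpy; the URL and the float32_8 layout are assumptions:

import numpy as np
import requests

resp = requests.get("http://localhost:12380/stream/extract",
                    params={"path": "/demo/prep", "start": "0",
                            "end": "2000000", "binary": "1"})
resp.raise_for_status()

# float32_8: little-endian int64 timestamp + eight float32 columns per row.
dtype = np.dtype([("timestamp", "<i8"), ("data", "<f4", (8,))])
array = np.frombuffer(resp.content, dtype=dtype)
print(array["timestamp"][:5])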


tests/test_bulkdata.py  (+4 -4)

@@ -69,9 +69,9 @@ class TestBulkData(object):
raw = []
for i in range(1000):
raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
node.append_string("".join(raw[0:1]), 0, 50000)
node.append_string("".join(raw[1:100]), 0, 50000)
node.append_string("".join(raw[100:]), 0, 50000)
node.append_data("".join(raw[0:1]), 0, 50000)
node.append_data("".join(raw[1:100]), 0, 50000)
node.append_data("".join(raw[100:]), 0, 50000)

misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
slice(5,10), slice(3,None), slice(3,-3),
@@ -85,7 +85,7 @@ class TestBulkData(object):
# Extract misc slices while appending, to make sure the
# data isn't being added in the middle of the file
for s in [2, slice(1,5), 2, slice(1,5)]:
node.append_string("0 0 0 0 0 0 0 0 0\n", 0, 50000)
node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000)
raw.append("0 0 0 0 0 0 0 0 0\n")
eq_(get_node_slice(s), raw[s])



tests/test_client.py  (+66 -36)

@@ -239,6 +239,22 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("start must precede end", str(e.exception))

# Good content type
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "",
{ "path": "xxxx", "start": 0, "end": 1,
"binary": 1 },
binary = True)
in_("No such stream", str(e.exception))

# Bad content type
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "",
{ "path": "xxxx", "start": 0, "end": 1,
"binary": 1 },
binary = False)
in_("Content type must be application/octet-stream", str(e.exception))

# Specify start/end (starts too late)
data = timestamper.TimestamperRate(testfile, start, 120)
with assert_raises(ClientError) as e:
@@ -383,6 +399,17 @@ class TestClient(object):
raise AssertionError("/stream/extract is not text/plain:\n" +
headers())

x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124",
"binary": "1" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: application/octet-stream" not in headers():
raise AssertionError("/stream/extract is not binary:\n" +
headers())

client.close()

def test_client_08_unicode(self):
@@ -459,72 +486,75 @@ class TestClient(object):
# override _max_data to trigger frequent server updates
ctx._max_data = 15

ctx.insert("100 1\n")
ctx.insert("1000 1\n")

ctx.insert("101 ")
ctx.insert("1\n102 1")
ctx.insert("1010 ")
ctx.insert("1\n1020 1")
ctx.insert("")
ctx.insert("\n103 1\n")
ctx.insert("\n1030 1\n")

ctx.insert("104 1\n")
ctx.insert("1040 1\n")
ctx.insert("# hello\n")
ctx.insert(" # hello\n")
ctx.insert(" 105 1\n")
ctx.insert(" 1050 1\n")
ctx.finalize()

ctx.insert("107 1\n")
ctx.update_end(108)
ctx.insert("1070 1\n")
ctx.update_end(1080)
ctx.finalize()
ctx.update_start(109)
ctx.insert("110 1\n")
ctx.insert("111 1\n")
ctx.update_start(1090)
ctx.insert("1100 1\n")
ctx.insert("1110 1\n")
ctx.send()
ctx.insert("112 1\n")
ctx.insert("113 1\n")
ctx.insert("114 1\n")
ctx.update_end(116)
ctx.insert("115 1\n")
ctx.update_end(117)
ctx.insert("116 1\n")
ctx.update_end(118)
ctx.insert("117 1" +
ctx.insert("1120 1\n")
ctx.insert("1130 1\n")
ctx.insert("1140 1\n")
ctx.update_end(1160)
ctx.insert("1150 1\n")
ctx.update_end(1170)
ctx.insert("1160 1\n")
ctx.update_end(1180)
ctx.insert("1170 1" +
" # this is super long" * 100 +
"\n")
ctx.finalize()
ctx.insert("# this is super long" * 100)

with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 100, 200) as ctx:
ctx.insert("118 1\n")
with client.stream_insert_context("/context/test",
1000, 2000) as ctx:
ctx.insert("1180 1\n")

with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 200, 300) as ctx:
ctx.insert("118 1\n")
with client.stream_insert_context("/context/test",
2000, 3000) as ctx:
ctx.insert("1180 1\n")

with assert_raises(ClientError):
with client.stream_insert_context("/context/test") as ctx:
ctx.insert("bogus data\n")

with client.stream_insert_context("/context/test", 200, 300) as ctx:
with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_data, 15)
ctx.insert("225 1\n")
ctx.insert("2250 1\n")
ctx.finalize()

with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 300, 400) as ctx:
ctx.insert("301 1\n")
ctx.insert("302 2\n")
ctx.insert("303 3\n")
ctx.insert("304 4\n")
ctx.insert("304 4\n") # non-monotonic after a few lines
with client.stream_insert_context("/context/test",
3000, 4000) as ctx:
ctx.insert("3010 1\n")
ctx.insert("3020 2\n")
ctx.insert("3030 3\n")
ctx.insert("3040 4\n")
ctx.insert("3040 4\n") # non-monotonic after a few lines
ctx.finalize()

eq_(list(client.stream_intervals("/context/test")),
[ [ 100, 106 ],
[ 107, 108 ],
[ 109, 118 ],
[ 200, 300 ] ])
[ [ 1000, 1051 ],
[ 1070, 1080 ],
[ 1090, 1180 ],
[ 2000, 3000 ] ])

# destroy stream (try without removing data first)
with assert_raises(ClientError):


tests/test_numpyclient.py  (+225 -0)

@@ -106,3 +106,228 @@ class TestNumpyClient(object):
assert(np.allclose(array, actual))

client.close()

def test_numpyclient_03_insert(self):
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)

# Limit _max_data just to get better coverage
old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000

client.stream_create("/test/1", "uint16_1")
client.stream_insert_numpy("/test/1",
np.array([[0, 1],
[1, 2],
[2, 3],
[3, 4]]))

# Wrong number of dimensions
with assert_raises(ValueError) as e:
client.stream_insert_numpy("/test/1",
np.array([[[0, 1],
[1, 2]],
[[3, 4],
[4, 5]]]))
in_("wrong number of dimensions", str(e.exception))

# Unstructured
client.stream_create("/test/2", "float32_8")
client.stream_insert_numpy(
"/test/2",
client.stream_extract_numpy(
"/newton/prep", structured = False, maxrows = 1000))

# Structured, and specifying layout
client.stream_create("/test/3", "float32_8")
client.stream_insert_numpy(
path = "/test/3", layout = "float32_8",
data = client.stream_extract_numpy(
"/newton/prep", structured = True, maxrows = 1000))

# Structured, specifying wrong layout
client.stream_create("/test/4", "float32_8")
with assert_raises(ValueError) as e:
client.stream_insert_numpy(
"/test/4", layout = "uint16_1",
data = client.stream_extract_numpy(
"/newton/prep", structured = True, maxrows = 1000))
in_("wrong dtype", str(e.exception))

# Unstructured, and specifying wrong layout
client.stream_create("/test/5", "float32_8")
with assert_raises(ClientError) as e:
client.stream_insert_numpy(
"/test/5", layout = "uint16_8",
data = client.stream_extract_numpy(
"/newton/prep", structured = False, maxrows = 1000))
# timestamps will be screwy here, because data will be parsed wrong
in_("error parsing input data", str(e.exception))

# Make sure the /newton/prep copies are identical
a = np.vstack(client.stream_extract_numpy("/newton/prep"))
b = np.vstack(client.stream_extract_numpy("/test/2"))
c = np.vstack(client.stream_extract_numpy("/test/3"))
assert(np.array_equal(a,b))
assert(np.array_equal(a,c))

nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
client.close()

def test_numpyclient_04_context(self):
# Like test_client_context, but with Numpy data
client = nilmdb.client.numpyclient.NumpyClient(testurl)

client.stream_create("/context/test", "uint16_1")
with client.stream_insert_numpy_context("/context/test") as ctx:
# override _max_rows to trigger frequent server updates
ctx._max_rows = 2
ctx.insert([[1000, 1]])
ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
ctx.insert([[1040, 1], [1050, 1]])
ctx.finalize()
ctx.insert([[1070, 1]])
ctx.update_end(1080)
ctx.finalize()
ctx.update_start(1090)
ctx.insert([[1100, 1]])
ctx.insert([[1110, 1]])
ctx.send()
ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
ctx.update_end(1160)
ctx.insert([[1150, 1]])
ctx.update_end(1170)
ctx.insert([[1160, 1]])
ctx.update_end(1180)
ctx.insert([[1170, 123456789.0]])
ctx.finalize()
ctx.insert(np.zeros((0,2)))

with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
1000, 2000) as ctx:
ctx.insert([[1180, 1]])

with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
2000, 3000) as ctx:
ctx._max_rows = 2
ctx.insert([[3180, 1]])
ctx.insert([[3181, 1]])

with client.stream_insert_numpy_context("/context/test",
2000, 3000) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_rows, 2)
ctx.insert([[2250, 1]])
ctx.finalize()

with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
3000, 4000) as ctx:
ctx.insert([[3010, 1]])
ctx.insert([[3020, 2]])
ctx.insert([[3030, 3]])
ctx.insert([[3040, 4]])
ctx.insert([[3040, 4]]) # non-monotonic after a few lines
ctx.finalize()

eq_(list(client.stream_intervals("/context/test")),
[ [ 1000, 1051 ],
[ 1070, 1080 ],
[ 1090, 1180 ],
[ 2000, 3000 ] ])

client.stream_remove("/context/test")
client.stream_destroy("/context/test")
client.close()

def test_numpyclient_05_emptyintervals(self):
# Like test_client_emptyintervals, with insert_numpy_context
client = nilmdb.client.numpyclient.NumpyClient(testurl)
client.stream_create("/empty/test", "uint16_1")
def info():
result = []
for interval in list(client.stream_intervals("/empty/test")):
result.append((client.stream_count("/empty/test", *interval),
interval))
return result
eq_(info(), [])

# Insert a region with just a few points
with client.stream_insert_numpy_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert([[140, 1]])
ctx.insert([[150, 1]])
ctx.insert([[160, 1]])
ctx.update_end(200)
ctx.finalize()
eq_(info(), [(3, [100, 200])])

# Delete chunk, which will leave one data point and two intervals
client.stream_remove("/empty/test", 145, 175)
eq_(info(), [(1, [100, 145]),
(0, [175, 200])])

# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_numpy_context("/empty/test", 500, 550):
pass

# If enough timestamps aren't provided, empty streams won't be created.
client.stream_insert("/empty/test", [])
with client.stream_insert_numpy_context("/empty/test"):
pass
client.stream_insert("/empty/test", [], start = 600)
with client.stream_insert_numpy_context("/empty/test", start = 700):
pass
client.stream_insert("/empty/test", [], end = 850)
with client.stream_insert_numpy_context("/empty/test", end = 950):
pass

# Try various things that might cause problems
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert([[1100, 1]])
ctx.finalize() # inserts [1100, 1101]
ctx.update_start(1199)
ctx.insert([[1200, 1]])
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)
ctx.finalize() # nothing
ctx.update_end(1350)
ctx.finalize() # nothing
ctx.update_start(1400)
ctx.insert(np.zeros((0,2)))
ctx.update_end(1450)
ctx.finalize()
ctx.update_start(1500)
ctx.insert(np.zeros((0,2)))
ctx.update_end(1550)
ctx.finalize()
ctx.insert(np.zeros((0,2)))
ctx.insert(np.zeros((0,2)))
ctx.insert(np.zeros((0,2)))
ctx.finalize()

# Check everything
eq_(info(), [(1, [100, 145]),
(0, [175, 200]),
(0, [300, 350]),
(0, [400, 450]),
(0, [500, 550]),
(0, [1000, 1050]),
(1, [1100, 1101]),
(1, [1199, 1250]),
(0, [1400, 1450]),
(0, [1500, 1550]),
])

# Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test")
client.close()
