Compare commits
17 Commits
nilmdb-1.4
...
nilmdb-1.5
Author | SHA1 | Date | |
---|---|---|---|
99a4228285 | |||
230ec72609 | |||
d36ece3767 | |||
231963538e | |||
b4d6aad6de | |||
e95142eabf | |||
d21c3470bc | |||
7576883f49 | |||
cc211542f8 | |||
8292dcf70b | |||
b362fd37f6 | |||
41ec13ee17 | |||
efa9aa9097 | |||
d9afb48f45 | |||
d1140e0f16 | |||
6091e44561 | |||
e233ba790f |
@@ -10,6 +10,9 @@ Prerequisites:
|
|||||||
sudo apt-get install python-cherrypy3 python-decorator python-simplejson
|
sudo apt-get install python-cherrypy3 python-decorator python-simplejson
|
||||||
sudo apt-get install python-requests python-dateutil python-tz python-psutil
|
sudo apt-get install python-requests python-dateutil python-tz python-psutil
|
||||||
|
|
||||||
|
# Other dependencies (required by some modules)
|
||||||
|
sudo apt-get install python-numpy
|
||||||
|
|
||||||
# Tools for running tests
|
# Tools for running tests
|
||||||
sudo apt-get install python-nose python-coverage
|
sudo apt-get install python-nose python-coverage
|
||||||
|
|
||||||
|
@@ -389,3 +389,35 @@ Possible solutions:
|
|||||||
are always printed as int64 values, and a new format
|
are always printed as int64 values, and a new format
|
||||||
"@1234567890123456" is added to the parser for specifying them
|
"@1234567890123456" is added to the parser for specifying them
|
||||||
exactly.
|
exactly.
|
||||||
|
|
||||||
|
Binary interface
|
||||||
|
----------------
|
||||||
|
|
||||||
|
The ASCII interface is too slow for high-bandwidth processing, like
|
||||||
|
sinefits, prep, etc. A binary interface was added so that you can
|
||||||
|
extract the raw binary out of the bulkdata storage. This binary is
|
||||||
|
a little-endian format, e.g. in C a uint16_6 stream would be:
|
||||||
|
|
||||||
|
#include <endian.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
struct {
|
||||||
|
int64_t timestamp_le;
|
||||||
|
uint16_t data_le[6];
|
||||||
|
} __attribute__((packed));
|
||||||
|
|
||||||
|
Remember to byteswap (with e.g. `letoh` in C)!
|
||||||
|
|
||||||
|
This interface is used by the new `nilmdb.client.numpyclient.NumpyClient`
|
||||||
|
class, which is a subclass of the normal `nilmcb.client.client.Client`
|
||||||
|
and has all of the same functions. It adds three new functions:
|
||||||
|
|
||||||
|
- `stream_extract_numpy` to extract data as a Numpy array
|
||||||
|
|
||||||
|
- `stream_insert_numpy` to insert data as a Numpy array
|
||||||
|
|
||||||
|
- `stream_insert_numpy_context` is the context manager for
|
||||||
|
incrementally inserting data
|
||||||
|
|
||||||
|
It is significantly faster! It is about 20 times faster to decimate a
|
||||||
|
stream with `nilm-decimate` when the filter code is using the new
|
||||||
|
binary/numpy interface.
|
||||||
|
@@ -127,10 +127,11 @@ class Client(object):
|
|||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def stream_insert_context(self, path, start = None, end = None):
|
def stream_insert_context(self, path, start = None, end = None):
|
||||||
"""Return a context manager that allows data to be efficiently
|
"""Return a context manager that allows data to be efficiently
|
||||||
inserted into a stream in a piecewise manner. Data is be provided
|
inserted into a stream in a piecewise manner. Data is
|
||||||
as single lines, and is aggregated and sent to the server in larger
|
provided as ASCII lines, and is aggregated and sent to the
|
||||||
chunks as necessary. Data lines must match the database layout for
|
server in larger or smaller chunks as necessary. Data lines
|
||||||
the given path, and end with a newline.
|
must match the database layout for the given path, and end
|
||||||
|
with a newline.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
with client.stream_insert_context('/path', start, end) as ctx:
|
with client.stream_insert_context('/path', start, end) as ctx:
|
||||||
@@ -142,15 +143,16 @@ class Client(object):
|
|||||||
This may make multiple requests to the server, if the data is
|
This may make multiple requests to the server, if the data is
|
||||||
large enough or enough time has passed between insertions.
|
large enough or enough time has passed between insertions.
|
||||||
"""
|
"""
|
||||||
ctx = StreamInserter(self.http, path, start, end)
|
ctx = StreamInserter(self, path, start, end)
|
||||||
yield ctx
|
yield ctx
|
||||||
ctx.finalize()
|
ctx.finalize()
|
||||||
|
|
||||||
def stream_insert(self, path, data, start = None, end = None):
|
def stream_insert(self, path, data, start = None, end = None):
|
||||||
"""Insert rows of data into a stream. data should be a string
|
"""Insert rows of data into a stream. data should be a string
|
||||||
or iterable that provides ASCII data that matches the database
|
or iterable that provides ASCII data that matches the database
|
||||||
layout for path. See stream_insert_context for details on the
|
layout for path. Data is passed through stream_insert_context,
|
||||||
'start' and 'end' parameters."""
|
so it will be broken into reasonably-sized chunks and
|
||||||
|
start/end will be deduced if missing."""
|
||||||
with self.stream_insert_context(path, start, end) as ctx:
|
with self.stream_insert_context(path, start, end) as ctx:
|
||||||
if isinstance(data, basestring):
|
if isinstance(data, basestring):
|
||||||
ctx.insert(data)
|
ctx.insert(data)
|
||||||
@@ -159,11 +161,28 @@ class Client(object):
|
|||||||
ctx.insert(chunk)
|
ctx.insert(chunk)
|
||||||
return ctx.last_response
|
return ctx.last_response
|
||||||
|
|
||||||
|
def stream_insert_block(self, path, data, start, end, binary = False):
|
||||||
|
"""Insert a single fixed block of data into the stream. It is
|
||||||
|
sent directly to the server in one block with no further
|
||||||
|
processing.
|
||||||
|
|
||||||
|
If 'binary' is True, provide raw binary data in little-endian
|
||||||
|
format matching the path layout, including an int64 timestamp.
|
||||||
|
Otherwise, provide ASCII data matching the layout."""
|
||||||
|
params = {
|
||||||
|
"path": path,
|
||||||
|
"start": timestamp_to_string(start),
|
||||||
|
"end": timestamp_to_string(end),
|
||||||
|
}
|
||||||
|
if binary:
|
||||||
|
params["binary"] = 1
|
||||||
|
return self.http.put("stream/insert", data, params, binary = binary)
|
||||||
|
|
||||||
def stream_intervals(self, path, start = None, end = None, diffpath = None):
|
def stream_intervals(self, path, start = None, end = None, diffpath = None):
|
||||||
"""
|
"""
|
||||||
Return a generator that yields each stream interval.
|
Return a generator that yields each stream interval.
|
||||||
|
|
||||||
If diffpath is not None, yields only interval ranges that are
|
If 'diffpath' is not None, yields only interval ranges that are
|
||||||
present in 'path' but not in 'diffpath'.
|
present in 'path' but not in 'diffpath'.
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
@@ -184,16 +203,16 @@ class Client(object):
|
|||||||
lines of ASCII-formatted data that matches the database
|
lines of ASCII-formatted data that matches the database
|
||||||
layout for the given path.
|
layout for the given path.
|
||||||
|
|
||||||
Specify count = True to return a count of matching data points
|
If 'count' is True, return a count of matching data points
|
||||||
rather than the actual data. The output format is unchanged.
|
rather than the actual data. The output format is unchanged.
|
||||||
|
|
||||||
Specify markup = True to include comments in the returned data
|
If 'markup' is True, include comments in the returned data
|
||||||
that indicate interval starts and ends.
|
that indicate interval starts and ends.
|
||||||
|
|
||||||
Specify binary = True to return chunks of raw binary data,
|
If 'binary' is True, return chunks of raw binary data, rather
|
||||||
rather than lines of ASCII-formatted data. Raw binary data
|
than lines of ASCII-formatted data. Raw binary data is
|
||||||
is always little-endian and matches the database types
|
little-endian and matches the database types (including an
|
||||||
(including a uint64 timestamp).
|
int64 timestamp).
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
"path": path,
|
"path": path,
|
||||||
@@ -257,13 +276,13 @@ class StreamInserter(object):
|
|||||||
_max_data = 2 * 1024 * 1024
|
_max_data = 2 * 1024 * 1024
|
||||||
_max_data_after_send = 64 * 1024
|
_max_data_after_send = 64 * 1024
|
||||||
|
|
||||||
def __init__(self, http, path, start = None, end = None):
|
def __init__(self, client, path, start, end):
|
||||||
"""'http' is the httpclient object. 'path' is the database
|
"""'client' is the client object. 'path' is the database
|
||||||
path to insert to. 'start' and 'end' are used for the first
|
path to insert to. 'start' and 'end' are used for the first
|
||||||
contiguous interval."""
|
contiguous interval and may be None."""
|
||||||
self.last_response = None
|
self.last_response = None
|
||||||
|
|
||||||
self._http = http
|
self._client = client
|
||||||
self._path = path
|
self._path = path
|
||||||
|
|
||||||
# Start and end for the overall contiguous interval we're
|
# Start and end for the overall contiguous interval we're
|
||||||
@@ -431,9 +450,7 @@ class StreamInserter(object):
|
|||||||
raise ClientError("have data to send, but no start/end times")
|
raise ClientError("have data to send, but no start/end times")
|
||||||
|
|
||||||
# Send it
|
# Send it
|
||||||
params = { "path": self._path,
|
self.last_response = self._client.stream_insert_block(
|
||||||
"start": timestamp_to_string(start_ts),
|
self._path, block, start_ts, end_ts, binary = False)
|
||||||
"end": timestamp_to_string(end_ts) }
|
|
||||||
self.last_response = self._http.put("stream/insert", block, params)
|
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@@ -105,9 +105,13 @@ class HTTPClient(object):
|
|||||||
else:
|
else:
|
||||||
return self._req("POST", url, None, params)
|
return self._req("POST", url, None, params)
|
||||||
|
|
||||||
def put(self, url, data, params = None):
|
def put(self, url, data, params = None, binary = False):
|
||||||
"""Simple PUT (parameters in URL, data in body)"""
|
"""Simple PUT (parameters in URL, data in body)"""
|
||||||
return self._req("PUT", url, params, data)
|
if binary:
|
||||||
|
h = { 'Content-type': 'application/octet-stream' }
|
||||||
|
else:
|
||||||
|
h = { 'Content-type': 'text/plain; charset=utf-8' }
|
||||||
|
return self._req("PUT", url, query = params, body = data, headers = h)
|
||||||
|
|
||||||
# Generator versions that return data one line at a time.
|
# Generator versions that return data one line at a time.
|
||||||
def _req_gen(self, method, url, query = None, body = None,
|
def _req_gen(self, method, url, query = None, body = None,
|
||||||
|
@@ -31,6 +31,14 @@ class NumpyClient(nilmdb.client.client.Client):
|
|||||||
"""Subclass of nilmdb.client.Client that adds additional methods for
|
"""Subclass of nilmdb.client.Client that adds additional methods for
|
||||||
extracting and inserting data via Numpy arrays."""
|
extracting and inserting data via Numpy arrays."""
|
||||||
|
|
||||||
|
def _get_dtype(self, path, layout):
|
||||||
|
if layout is None:
|
||||||
|
streams = self.stream_list(path)
|
||||||
|
if len(streams) != 1:
|
||||||
|
raise ClientError("can't get layout for path: " + path)
|
||||||
|
layout = streams[0][1]
|
||||||
|
return layout_to_dtype(layout)
|
||||||
|
|
||||||
def stream_extract_numpy(self, path, start = None, end = None,
|
def stream_extract_numpy(self, path, start = None, end = None,
|
||||||
layout = None, maxrows = 100000,
|
layout = None, maxrows = 100000,
|
||||||
structured = False):
|
structured = False):
|
||||||
@@ -44,12 +52,7 @@ class NumpyClient(nilmdb.client.client.Client):
|
|||||||
and returned in a flat 2D array. Otherwise, data is returned
|
and returned in a flat 2D array. Otherwise, data is returned
|
||||||
as a structured dtype in a 1D array.
|
as a structured dtype in a 1D array.
|
||||||
"""
|
"""
|
||||||
if layout is None:
|
dtype = self._get_dtype(path, layout)
|
||||||
streams = self.stream_list(path)
|
|
||||||
if len(streams) != 1:
|
|
||||||
raise ClientError("can't get layout for path: " + path)
|
|
||||||
layout = streams[0][1]
|
|
||||||
dtype = layout_to_dtype(layout)
|
|
||||||
|
|
||||||
def to_numpy(data):
|
def to_numpy(data):
|
||||||
a = numpy.fromstring(data, dtype)
|
a = numpy.fromstring(data, dtype)
|
||||||
@@ -75,3 +78,182 @@ class NumpyClient(nilmdb.client.client.Client):
|
|||||||
|
|
||||||
if total_len:
|
if total_len:
|
||||||
yield to_numpy("".join(chunks))
|
yield to_numpy("".join(chunks))
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def stream_insert_numpy_context(self, path, start = None, end = None,
|
||||||
|
layout = None):
|
||||||
|
"""Return a context manager that allows data to be efficiently
|
||||||
|
inserted into a stream in a piecewise manner. Data is
|
||||||
|
provided as Numpy arrays, and is aggregated and sent to the
|
||||||
|
server in larger or smaller chunks as necessary. Data format
|
||||||
|
must match the database layout for the given path.
|
||||||
|
|
||||||
|
For more details, see help for
|
||||||
|
nilmdb.client.numpyclient.StreamInserterNumpy
|
||||||
|
|
||||||
|
If 'layout' is not None, use it as the layout rather than
|
||||||
|
querying the database.
|
||||||
|
"""
|
||||||
|
dtype = self._get_dtype(path, layout)
|
||||||
|
ctx = StreamInserterNumpy(self, path, start, end, dtype)
|
||||||
|
yield ctx
|
||||||
|
ctx.finalize()
|
||||||
|
|
||||||
|
def stream_insert_numpy(self, path, data, start = None, end = None,
|
||||||
|
layout = None):
|
||||||
|
"""Insert data into a stream. data should be a Numpy array
|
||||||
|
which will be passed through stream_insert_numpy_context to
|
||||||
|
break it into chunks etc. See the help for that function
|
||||||
|
for details."""
|
||||||
|
with self.stream_insert_numpy_context(path, start, end, layout) as ctx:
|
||||||
|
if isinstance(data, numpy.ndarray):
|
||||||
|
ctx.insert(data)
|
||||||
|
else:
|
||||||
|
for chunk in data:
|
||||||
|
ctx.insert(chunk)
|
||||||
|
return ctx.last_response
|
||||||
|
|
||||||
|
class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
|
||||||
|
"""Object returned by stream_insert_numpy_context() that manages
|
||||||
|
the insertion of rows of data into a particular path.
|
||||||
|
|
||||||
|
See help for nilmdb.client.client.StreamInserter for details.
|
||||||
|
The only difference is that, instead of ASCII formatted data,
|
||||||
|
this context manager can take Numpy arrays, which are either
|
||||||
|
structured (1D with complex dtype) or flat (2D with simple dtype).
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Soft limit of how many bytes to send per HTTP request.
|
||||||
|
_max_data = 2 * 1024 * 1024
|
||||||
|
|
||||||
|
def __init__(self, client, path, start, end, dtype):
|
||||||
|
"""
|
||||||
|
'client' is the client object. 'path' is the database path
|
||||||
|
to insert to. 'start' and 'end' are used for the first
|
||||||
|
contiguous interval and may be None. 'dtype' is the Numpy
|
||||||
|
dtype for this stream.
|
||||||
|
"""
|
||||||
|
self.last_response = None
|
||||||
|
|
||||||
|
self._dtype = dtype
|
||||||
|
self._client = client
|
||||||
|
self._path = path
|
||||||
|
|
||||||
|
# Start and end for the overall contiguous interval we're
|
||||||
|
# filling
|
||||||
|
self._interval_start = start
|
||||||
|
self._interval_end = end
|
||||||
|
|
||||||
|
# Max rows to send at once
|
||||||
|
self._max_rows = self._max_data // self._dtype.itemsize
|
||||||
|
|
||||||
|
# List of the current arrays we're building up to send
|
||||||
|
self._block_arrays = []
|
||||||
|
self._block_rows = 0
|
||||||
|
|
||||||
|
def insert(self, array):
|
||||||
|
"""Insert Numpy data, which must match the layout type."""
|
||||||
|
if type(array) != numpy.ndarray:
|
||||||
|
array = numpy.array(array)
|
||||||
|
if array.ndim == 1:
|
||||||
|
# Already a structured array; just verify the type
|
||||||
|
if array.dtype != self._dtype:
|
||||||
|
raise ValueError("wrong dtype for 1D (structured) array")
|
||||||
|
elif array.ndim == 2:
|
||||||
|
# Convert to structured array
|
||||||
|
sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
|
||||||
|
sarray['timestamp'] = array[:,0]
|
||||||
|
# Need the squeeze in case sarray['data'] is 1 dimensional
|
||||||
|
sarray['data'] = numpy.squeeze(array[:,1:])
|
||||||
|
array = sarray
|
||||||
|
else:
|
||||||
|
raise ValueError("wrong number of dimensions in array")
|
||||||
|
|
||||||
|
length = len(array)
|
||||||
|
maxrows = self._max_rows
|
||||||
|
|
||||||
|
if length == 0:
|
||||||
|
return
|
||||||
|
if length > maxrows:
|
||||||
|
# This is more than twice what we wanted to send, so split
|
||||||
|
# it up. This is a bit inefficient, but the user really
|
||||||
|
# shouldn't be providing this much data at once.
|
||||||
|
for cut in range(0, length, maxrows):
|
||||||
|
self.insert(array[cut:(cut + maxrows)])
|
||||||
|
return
|
||||||
|
|
||||||
|
# Add this array to our list
|
||||||
|
self._block_arrays.append(array)
|
||||||
|
self._block_rows += length
|
||||||
|
|
||||||
|
# Send if it's too long
|
||||||
|
if self._block_rows >= maxrows:
|
||||||
|
self._send_block(final = False)
|
||||||
|
|
||||||
|
def _send_block(self, final = False):
|
||||||
|
"""Send the data current stored up. One row might be left
|
||||||
|
over if we need its timestamp saved."""
|
||||||
|
|
||||||
|
# Build the full array to send
|
||||||
|
if self._block_rows == 0:
|
||||||
|
array = numpy.zeros(0, dtype = self._dtype)
|
||||||
|
else:
|
||||||
|
array = numpy.hstack(self._block_arrays)
|
||||||
|
|
||||||
|
# Get starting timestamp
|
||||||
|
start_ts = self._interval_start
|
||||||
|
if start_ts is None:
|
||||||
|
# Pull start from the first row
|
||||||
|
try:
|
||||||
|
start_ts = array['timestamp'][0]
|
||||||
|
except IndexError:
|
||||||
|
pass # no timestamp is OK, if we have no data
|
||||||
|
|
||||||
|
# Get ending timestamp
|
||||||
|
if final:
|
||||||
|
# For a final block, the timestamp is either the
|
||||||
|
# user-provided end, or the timestamp of the last line
|
||||||
|
# plus epsilon.
|
||||||
|
end_ts = self._interval_end
|
||||||
|
if end_ts is None:
|
||||||
|
try:
|
||||||
|
end_ts = array['timestamp'][-1]
|
||||||
|
end_ts += nilmdb.utils.time.epsilon
|
||||||
|
except IndexError:
|
||||||
|
pass # no timestamp is OK, if we have no data
|
||||||
|
self._block_arrays = []
|
||||||
|
self._block_rows = 0
|
||||||
|
|
||||||
|
# Next block is completely fresh
|
||||||
|
self._interval_start = None
|
||||||
|
self._interval_end = None
|
||||||
|
else:
|
||||||
|
# An intermediate block. We need to save the last row
|
||||||
|
# for the next block, and use its timestamp as the ending
|
||||||
|
# timestamp for this one.
|
||||||
|
if len(array) < 2:
|
||||||
|
# Not enough data to send an intermediate block
|
||||||
|
return
|
||||||
|
end_ts = array['timestamp'][-1]
|
||||||
|
if self._interval_end is not None and end_ts > self._interval_end:
|
||||||
|
# User gave us bad endpoints; send it anyway, and let
|
||||||
|
# the server complain so that the error is the same
|
||||||
|
# as if we hadn't done this chunking.
|
||||||
|
end_ts = self._interval_end
|
||||||
|
self._block_arrays = [ array[-1:] ]
|
||||||
|
self._block_rows = 1
|
||||||
|
array = array[:-1]
|
||||||
|
|
||||||
|
# Next block continues where this one ended
|
||||||
|
self._interval_start = end_ts
|
||||||
|
|
||||||
|
# If we have no endpoints, it's because we had no data to send.
|
||||||
|
if start_ts is None or end_ts is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Send it
|
||||||
|
data = array.tostring()
|
||||||
|
self.last_response = self._client.stream_insert_block(
|
||||||
|
self._path, data, start_ts, end_ts, binary = True)
|
||||||
|
|
||||||
|
return
|
||||||
|
@@ -10,6 +10,7 @@ import sys
|
|||||||
import os
|
import os
|
||||||
import argparse
|
import argparse
|
||||||
from argparse import ArgumentDefaultsHelpFormatter as def_form
|
from argparse import ArgumentDefaultsHelpFormatter as def_form
|
||||||
|
import signal
|
||||||
|
|
||||||
try: # pragma: no cover
|
try: # pragma: no cover
|
||||||
import argcomplete
|
import argcomplete
|
||||||
@@ -126,6 +127,13 @@ class Cmdline(object):
|
|||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
# Set SIGPIPE to its default handler -- we don't need Python
|
||||||
|
# to catch it for us.
|
||||||
|
try:
|
||||||
|
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||||
|
except ValueError: # pragma: no cover
|
||||||
|
pass
|
||||||
|
|
||||||
# Clear cached timezone, so that we can pick up timezone changes
|
# Clear cached timezone, so that we can pick up timezone changes
|
||||||
# while running this from the test suite.
|
# while running this from the test suite.
|
||||||
datetime_tz._localtz = None
|
datetime_tz._localtz = None
|
||||||
|
@@ -413,12 +413,16 @@ class Table(object):
|
|||||||
return rocket.Rocket(self.layout,
|
return rocket.Rocket(self.layout,
|
||||||
os.path.join(self.root, subdir, filename))
|
os.path.join(self.root, subdir, filename))
|
||||||
|
|
||||||
def append_string(self, data, start, end):
|
def append_data(self, data, start, end, binary = False):
|
||||||
"""Parse the formatted string in 'data', according to the
|
"""Parse the formatted string in 'data', according to the
|
||||||
current layout, and append it to the table. If any timestamps
|
current layout, and append it to the table. If any timestamps
|
||||||
are non-monotonic, or don't fall between 'start' and 'end',
|
are non-monotonic, or don't fall between 'start' and 'end',
|
||||||
a ValueError is raised.
|
a ValueError is raised.
|
||||||
|
|
||||||
|
If 'binary' is True, the data should be in raw binary format
|
||||||
|
instead: little-endian, matching the current table's layout,
|
||||||
|
including the int64 timestamp.
|
||||||
|
|
||||||
If this function succeeds, it returns normally. Otherwise,
|
If this function succeeds, it returns normally. Otherwise,
|
||||||
the table is reverted back to its original state by truncating
|
the table is reverted back to its original state by truncating
|
||||||
or deleting files as necessary."""
|
or deleting files as necessary."""
|
||||||
@@ -437,17 +441,26 @@ class Table(object):
|
|||||||
# Ask the rocket object to parse and append up to "count"
|
# Ask the rocket object to parse and append up to "count"
|
||||||
# rows of data, verifying things along the way.
|
# rows of data, verifying things along the way.
|
||||||
try:
|
try:
|
||||||
|
if binary:
|
||||||
|
appender = f.append_binary
|
||||||
|
else:
|
||||||
|
appender = f.append_string
|
||||||
(added_rows, data_offset, last_timestamp, linenum
|
(added_rows, data_offset, last_timestamp, linenum
|
||||||
) = f.append_string(count, data, data_offset, linenum,
|
) = appender(count, data, data_offset, linenum,
|
||||||
start, end, last_timestamp)
|
start, end, last_timestamp)
|
||||||
except rocket.ParseError as e:
|
except rocket.ParseError as e:
|
||||||
(linenum, colnum, errtype, obj) = e.args
|
(linenum, colnum, errtype, obj) = e.args
|
||||||
|
if binary:
|
||||||
|
where = "byte %d: " % (linenum)
|
||||||
|
else:
|
||||||
where = "line %d, column %d: " % (linenum, colnum)
|
where = "line %d, column %d: " % (linenum, colnum)
|
||||||
# Extract out the error line, add column marker
|
# Extract out the error line, add column marker
|
||||||
try:
|
try:
|
||||||
|
if binary:
|
||||||
|
raise IndexError
|
||||||
bad = data.splitlines()[linenum-1]
|
bad = data.splitlines()[linenum-1]
|
||||||
badptr = ' ' * (colnum - 1) + '^'
|
bad += '\n' + ' ' * (colnum - 1) + '^'
|
||||||
except IndexError: # pragma: no cover
|
except IndexError:
|
||||||
bad = ""
|
bad = ""
|
||||||
if errtype == rocket.ERR_NON_MONOTONIC:
|
if errtype == rocket.ERR_NON_MONOTONIC:
|
||||||
err = "timestamp is not monotonically increasing"
|
err = "timestamp is not monotonically increasing"
|
||||||
@@ -463,7 +476,7 @@ class Table(object):
|
|||||||
else:
|
else:
|
||||||
err = str(obj)
|
err = str(obj)
|
||||||
raise ValueError("error parsing input data: " +
|
raise ValueError("error parsing input data: " +
|
||||||
where + err + "\n" + bad + "\n" + badptr)
|
where + err + "\n" + bad)
|
||||||
tot_rows += added_rows
|
tot_rows += added_rows
|
||||||
except Exception:
|
except Exception:
|
||||||
# Some failure, so try to roll things back by truncating or
|
# Some failure, so try to roll things back by truncating or
|
||||||
|
@@ -475,12 +475,16 @@ class NilmDB(object):
|
|||||||
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
|
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
|
||||||
con.execute("DELETE FROM streams WHERE id=?", (stream_id,))
|
con.execute("DELETE FROM streams WHERE id=?", (stream_id,))
|
||||||
|
|
||||||
def stream_insert(self, path, start, end, data):
|
def stream_insert(self, path, start, end, data, binary = False):
|
||||||
"""Insert new data into the database.
|
"""Insert new data into the database.
|
||||||
path: Path at which to add the data
|
path: Path at which to add the data
|
||||||
start: Starting timestamp
|
start: Starting timestamp
|
||||||
end: Ending timestamp
|
end: Ending timestamp
|
||||||
data: Textual data, formatted according to the layout of path
|
data: Textual data, formatted according to the layout of path
|
||||||
|
|
||||||
|
'binary', if True, means that 'data' is raw binary:
|
||||||
|
little-endian, matching the current table's layout,
|
||||||
|
including the int64 timestamp.
|
||||||
"""
|
"""
|
||||||
# First check for basic overlap using timestamp info given.
|
# First check for basic overlap using timestamp info given.
|
||||||
stream_id = self._stream_id(path)
|
stream_id = self._stream_id(path)
|
||||||
@@ -494,7 +498,7 @@ class NilmDB(object):
|
|||||||
# there are any parse errors.
|
# there are any parse errors.
|
||||||
table = self.data.getnode(path)
|
table = self.data.getnode(path)
|
||||||
row_start = table.nrows
|
row_start = table.nrows
|
||||||
table.append_string(data, start, end)
|
table.append_data(data, start, end, binary)
|
||||||
row_end = table.nrows
|
row_end = table.nrows
|
||||||
|
|
||||||
# Insert the record into the sql database.
|
# Insert the record into the sql database.
|
||||||
|
@@ -419,6 +419,68 @@ extra_data_on_line:
|
|||||||
ERR_OTHER, "extra data on line");
|
ERR_OTHER, "extra data on line");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/****
|
||||||
|
* Append from binary data
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* .append_binary(count, data, offset, linenum, start, end, last_timestamp) */
|
||||||
|
static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
|
||||||
|
{
|
||||||
|
int count;
|
||||||
|
const uint8_t *data;
|
||||||
|
int data_len;
|
||||||
|
int linenum;
|
||||||
|
int offset;
|
||||||
|
timestamp_t start;
|
||||||
|
timestamp_t end;
|
||||||
|
timestamp_t last_timestamp;
|
||||||
|
|
||||||
|
if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
|
||||||
|
&count, &data, &data_len, &offset,
|
||||||
|
&linenum, &start, &end, &last_timestamp))
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
/* Advance to offset */
|
||||||
|
if (offset > data_len)
|
||||||
|
return raise_str(0, 0, ERR_OTHER, "bad offset");
|
||||||
|
data += offset;
|
||||||
|
data_len -= offset;
|
||||||
|
|
||||||
|
/* Figure out max number of rows to insert */
|
||||||
|
int rows = data_len / self->binary_size;
|
||||||
|
if (rows > count)
|
||||||
|
rows = count;
|
||||||
|
|
||||||
|
/* Check timestamps */
|
||||||
|
timestamp_t ts;
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < rows; i++) {
|
||||||
|
/* Read raw timestamp, byteswap if needed */
|
||||||
|
memcpy(&ts, &data[i * self->binary_size], 8);
|
||||||
|
ts = le64toh(ts);
|
||||||
|
|
||||||
|
/* Check limits */
|
||||||
|
if (ts <= last_timestamp)
|
||||||
|
return raise_int(i, 0, ERR_NON_MONOTONIC, ts);
|
||||||
|
last_timestamp = ts;
|
||||||
|
if (ts < start || ts >= end)
|
||||||
|
return raise_int(i, 0, ERR_OUT_OF_INTERVAL, ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Write binary data */
|
||||||
|
if (fwrite(data, data_len, 1, self->file) != 1) {
|
||||||
|
PyErr_SetFromErrno(PyExc_OSError);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
fflush(self->file);
|
||||||
|
|
||||||
|
/* Build return value and return */
|
||||||
|
PyObject *o;
|
||||||
|
o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
|
||||||
|
last_timestamp, linenum);
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
/****
|
/****
|
||||||
* Extract to string
|
* Extract to string
|
||||||
*/
|
*/
|
||||||
@@ -484,7 +546,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
|
|||||||
/* read and format in a loop */ \
|
/* read and format in a loop */ \
|
||||||
for (i = 0; i < self->layout_count; i++) { \
|
for (i = 0; i < self->layout_count; i++) { \
|
||||||
if (fread(&disktype, bytes, \
|
if (fread(&disktype, bytes, \
|
||||||
1, self->file) < 0) \
|
1, self->file) != 1) \
|
||||||
goto err; \
|
goto err; \
|
||||||
disktype = letoh(disktype); \
|
disktype = letoh(disktype); \
|
||||||
ret = sprintf(&str[len], " " fmt, \
|
ret = sprintf(&str[len], " " fmt, \
|
||||||
@@ -611,11 +673,13 @@ static PyMemberDef Rocket_members[] = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
static PyMethodDef Rocket_methods[] = {
|
static PyMethodDef Rocket_methods[] = {
|
||||||
{ "close", (PyCFunction)Rocket_close, METH_NOARGS,
|
{ "close",
|
||||||
|
(PyCFunction)Rocket_close, METH_NOARGS,
|
||||||
"close(self)\n\n"
|
"close(self)\n\n"
|
||||||
"Close file handle" },
|
"Close file handle" },
|
||||||
|
|
||||||
{ "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS,
|
{ "append_string",
|
||||||
|
(PyCFunction)Rocket_append_string, METH_VARARGS,
|
||||||
"append_string(self, count, data, offset, line, start, end, ts)\n\n"
|
"append_string(self, count, data, offset, line, start, end, ts)\n\n"
|
||||||
"Parse string and append data.\n"
|
"Parse string and append data.\n"
|
||||||
"\n"
|
"\n"
|
||||||
@@ -630,12 +694,36 @@ static PyMethodDef Rocket_methods[] = {
|
|||||||
"Raises ParseError if timestamps are non-monotonic, outside\n"
|
"Raises ParseError if timestamps are non-monotonic, outside\n"
|
||||||
"the start/end interval etc.\n"
|
"the start/end interval etc.\n"
|
||||||
"\n"
|
"\n"
|
||||||
"On success, return a tuple with three values:\n"
|
"On success, return a tuple:\n"
|
||||||
" added_rows: how many rows were added from the file\n"
|
" added_rows: how many rows were added from the file\n"
|
||||||
" data_offset: current offset into the data string\n"
|
" data_offset: current offset into the data string\n"
|
||||||
" last_timestamp: last timestamp we parsed" },
|
" last_timestamp: last timestamp we parsed\n"
|
||||||
|
" linenum: current line number" },
|
||||||
|
|
||||||
{ "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS,
|
{ "append_binary",
|
||||||
|
(PyCFunction)Rocket_append_binary, METH_VARARGS,
|
||||||
|
"append_binary(self, count, data, offset, line, start, end, ts)\n\n"
|
||||||
|
"Append binary data, which must match the data layout.\n"
|
||||||
|
"\n"
|
||||||
|
" count: maximum number of rows to add\n"
|
||||||
|
" data: binary data\n"
|
||||||
|
" offset: byte offset into data to start adding\n"
|
||||||
|
" line: current line number (unused)\n"
|
||||||
|
" start: starting timestamp for interval\n"
|
||||||
|
" end: end timestamp for interval\n"
|
||||||
|
" ts: last timestamp that was previously parsed\n"
|
||||||
|
"\n"
|
||||||
|
"Raises ParseError if timestamps are non-monotonic, outside\n"
|
||||||
|
"the start/end interval etc.\n"
|
||||||
|
"\n"
|
||||||
|
"On success, return a tuple:\n"
|
||||||
|
" added_rows: how many rows were added from the file\n"
|
||||||
|
" data_offset: current offset into the data string\n"
|
||||||
|
" last_timestamp: last timestamp we parsed\n"
|
||||||
|
" linenum: current line number (copied from argument)" },
|
||||||
|
|
||||||
|
{ "extract_string",
|
||||||
|
(PyCFunction)Rocket_extract_string, METH_VARARGS,
|
||||||
"extract_string(self, offset, count)\n\n"
|
"extract_string(self, offset, count)\n\n"
|
||||||
"Extract count rows of data from the file at offset offset.\n"
|
"Extract count rows of data from the file at offset offset.\n"
|
||||||
"Return an ascii formatted string according to the layout" },
|
"Return an ascii formatted string according to the layout" },
|
||||||
|
@@ -305,10 +305,15 @@ class Stream(NilmApp):
|
|||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@exception_to_httperror(NilmDBError, ValueError)
|
@exception_to_httperror(NilmDBError, ValueError)
|
||||||
@cherrypy.tools.CORS_allow(methods = ["PUT"])
|
@cherrypy.tools.CORS_allow(methods = ["PUT"])
|
||||||
def insert(self, path, start, end):
|
def insert(self, path, start, end, binary = False):
|
||||||
"""
|
"""
|
||||||
Insert new data into the database. Provide textual data
|
Insert new data into the database. Provide textual data
|
||||||
(matching the path's layout) as a HTTP PUT.
|
(matching the path's layout) as a HTTP PUT.
|
||||||
|
|
||||||
|
If 'binary' is True, expect raw binary data, rather than lines
|
||||||
|
of ASCII-formatted data. Raw binary data is always
|
||||||
|
little-endian and matches the database types (including an
|
||||||
|
int64 timestamp).
|
||||||
"""
|
"""
|
||||||
# Important that we always read the input before throwing any
|
# Important that we always read the input before throwing any
|
||||||
# errors, to keep lengths happy for persistent connections.
|
# errors, to keep lengths happy for persistent connections.
|
||||||
@@ -316,6 +321,14 @@ class Stream(NilmApp):
|
|||||||
# requests, if we ever want to handle those (issue #1134)
|
# requests, if we ever want to handle those (issue #1134)
|
||||||
body = cherrypy.request.body.read()
|
body = cherrypy.request.body.read()
|
||||||
|
|
||||||
|
# Verify content type for binary data
|
||||||
|
content_type = cherrypy.request.headers.get('content-type')
|
||||||
|
if binary and content_type:
|
||||||
|
if content_type != "application/octet-stream":
|
||||||
|
raise cherrypy.HTTPError("400", "Content type must be "
|
||||||
|
"application/octet-stream for "
|
||||||
|
"binary data, not " + content_type)
|
||||||
|
|
||||||
# Check path and get layout
|
# Check path and get layout
|
||||||
if len(self.db.stream_list(path = path)) != 1:
|
if len(self.db.stream_list(path = path)) != 1:
|
||||||
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||||
@@ -325,7 +338,7 @@ class Stream(NilmApp):
|
|||||||
|
|
||||||
# Pass the data directly to nilmdb, which will parse it and
|
# Pass the data directly to nilmdb, which will parse it and
|
||||||
# raise a ValueError if there are any problems.
|
# raise a ValueError if there are any problems.
|
||||||
self.db.stream_insert(path, start, end, body)
|
self.db.stream_insert(path, start, end, body, binary)
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
return
|
return
|
||||||
@@ -398,7 +411,6 @@ class Stream(NilmApp):
|
|||||||
# /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
# /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@chunked_response
|
@chunked_response
|
||||||
@response_type("text/plain")
|
|
||||||
def extract(self, path, start = None, end = None,
|
def extract(self, path, start = None, end = None,
|
||||||
count = False, markup = False, binary = False):
|
count = False, markup = False, binary = False):
|
||||||
"""
|
"""
|
||||||
@@ -414,8 +426,8 @@ class Stream(NilmApp):
|
|||||||
|
|
||||||
If 'binary' is True, return raw binary data, rather than lines
|
If 'binary' is True, return raw binary data, rather than lines
|
||||||
of ASCII-formatted data. Raw binary data is always
|
of ASCII-formatted data. Raw binary data is always
|
||||||
little-endian and matches the database types (including a
|
little-endian and matches the database types (including an
|
||||||
uint64 timestamp).
|
int64 timestamp).
|
||||||
"""
|
"""
|
||||||
(start, end) = self._get_times(start, end)
|
(start, end) = self._get_times(start, end)
|
||||||
|
|
||||||
@@ -424,11 +436,13 @@ class Stream(NilmApp):
|
|||||||
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||||
|
|
||||||
if binary:
|
if binary:
|
||||||
cherrypy.response.headers['Content-Type'] = (
|
content_type = "application/octet-stream"
|
||||||
"application/octet-stream")
|
|
||||||
if markup or count:
|
if markup or count:
|
||||||
raise cherrypy.HTTPError("400", "can't mix binary and "
|
raise cherrypy.HTTPError("400", "can't mix binary and "
|
||||||
"markup or count modes")
|
"markup or count modes")
|
||||||
|
else:
|
||||||
|
content_type = "text/plain"
|
||||||
|
cherrypy.response.headers['Content-Type'] = content_type
|
||||||
|
|
||||||
@workaround_cp_bug_1200
|
@workaround_cp_bug_1200
|
||||||
def content(start, end):
|
def content(start, end):
|
||||||
|
@@ -6,7 +6,7 @@ import time
|
|||||||
|
|
||||||
# Range
|
# Range
|
||||||
min_timestamp = (-2**63)
|
min_timestamp = (-2**63)
|
||||||
max_timestamp = (2**62 - 1)
|
max_timestamp = (2**63 - 1)
|
||||||
|
|
||||||
# Smallest representable step
|
# Smallest representable step
|
||||||
epsilon = 1
|
epsilon = 1
|
||||||
@@ -32,6 +32,10 @@ def timestamp_to_human(timestamp):
|
|||||||
"""Convert a timestamp (integer microseconds since epoch) to a
|
"""Convert a timestamp (integer microseconds since epoch) to a
|
||||||
human-readable string, using the local timezone for display
|
human-readable string, using the local timezone for display
|
||||||
(e.g. from the TZ env var)."""
|
(e.g. from the TZ env var)."""
|
||||||
|
if timestamp == min_timestamp:
|
||||||
|
return "(minimum)"
|
||||||
|
if timestamp == max_timestamp:
|
||||||
|
return "(maximum)"
|
||||||
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp))
|
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp))
|
||||||
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
|
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
|
||||||
|
|
||||||
|
1
setup.py
1
setup.py
@@ -114,7 +114,6 @@ setup(name='nilmdb',
|
|||||||
install_requires = [ 'decorator',
|
install_requires = [ 'decorator',
|
||||||
'cherrypy >= 3.2',
|
'cherrypy >= 3.2',
|
||||||
'simplejson',
|
'simplejson',
|
||||||
'pycurl',
|
|
||||||
'python-dateutil',
|
'python-dateutil',
|
||||||
'pytz',
|
'pytz',
|
||||||
'psutil >= 0.3.0',
|
'psutil >= 0.3.0',
|
||||||
|
@@ -69,9 +69,9 @@ class TestBulkData(object):
|
|||||||
raw = []
|
raw = []
|
||||||
for i in range(1000):
|
for i in range(1000):
|
||||||
raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
|
raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
|
||||||
node.append_string("".join(raw[0:1]), 0, 50000)
|
node.append_data("".join(raw[0:1]), 0, 50000)
|
||||||
node.append_string("".join(raw[1:100]), 0, 50000)
|
node.append_data("".join(raw[1:100]), 0, 50000)
|
||||||
node.append_string("".join(raw[100:]), 0, 50000)
|
node.append_data("".join(raw[100:]), 0, 50000)
|
||||||
|
|
||||||
misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
|
misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
|
||||||
slice(5,10), slice(3,None), slice(3,-3),
|
slice(5,10), slice(3,None), slice(3,-3),
|
||||||
@@ -85,7 +85,7 @@ class TestBulkData(object):
|
|||||||
# Extract misc slices while appending, to make sure the
|
# Extract misc slices while appending, to make sure the
|
||||||
# data isn't being added in the middle of the file
|
# data isn't being added in the middle of the file
|
||||||
for s in [2, slice(1,5), 2, slice(1,5)]:
|
for s in [2, slice(1,5), 2, slice(1,5)]:
|
||||||
node.append_string("0 0 0 0 0 0 0 0 0\n", 0, 50000)
|
node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000)
|
||||||
raw.append("0 0 0 0 0 0 0 0 0\n")
|
raw.append("0 0 0 0 0 0 0 0 0\n")
|
||||||
eq_(get_node_slice(s), raw[s])
|
eq_(get_node_slice(s), raw[s])
|
||||||
|
|
||||||
|
@@ -239,6 +239,22 @@ class TestClient(object):
|
|||||||
in_("400 Bad Request", str(e.exception))
|
in_("400 Bad Request", str(e.exception))
|
||||||
in_("start must precede end", str(e.exception))
|
in_("start must precede end", str(e.exception))
|
||||||
|
|
||||||
|
# Good content type
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.http.put("stream/insert", "",
|
||||||
|
{ "path": "xxxx", "start": 0, "end": 1,
|
||||||
|
"binary": 1 },
|
||||||
|
binary = True)
|
||||||
|
in_("No such stream", str(e.exception))
|
||||||
|
|
||||||
|
# Bad content type
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.http.put("stream/insert", "",
|
||||||
|
{ "path": "xxxx", "start": 0, "end": 1,
|
||||||
|
"binary": 1 },
|
||||||
|
binary = False)
|
||||||
|
in_("Content type must be application/octet-stream", str(e.exception))
|
||||||
|
|
||||||
# Specify start/end (starts too late)
|
# Specify start/end (starts too late)
|
||||||
data = timestamper.TimestamperRate(testfile, start, 120)
|
data = timestamper.TimestamperRate(testfile, start, 120)
|
||||||
with assert_raises(ClientError) as e:
|
with assert_raises(ClientError) as e:
|
||||||
@@ -383,6 +399,17 @@ class TestClient(object):
|
|||||||
raise AssertionError("/stream/extract is not text/plain:\n" +
|
raise AssertionError("/stream/extract is not text/plain:\n" +
|
||||||
headers())
|
headers())
|
||||||
|
|
||||||
|
x = http.get("stream/extract",
|
||||||
|
{ "path": "/newton/prep",
|
||||||
|
"start": "123",
|
||||||
|
"end": "124",
|
||||||
|
"binary": "1" })
|
||||||
|
if "transfer-encoding: chunked" not in headers():
|
||||||
|
warnings.warn("Non-chunked HTTP response for /stream/extract")
|
||||||
|
if "content-type: application/octet-stream" not in headers():
|
||||||
|
raise AssertionError("/stream/extract is not binary:\n" +
|
||||||
|
headers())
|
||||||
|
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
def test_client_08_unicode(self):
|
def test_client_08_unicode(self):
|
||||||
@@ -459,72 +486,75 @@ class TestClient(object):
|
|||||||
# override _max_data to trigger frequent server updates
|
# override _max_data to trigger frequent server updates
|
||||||
ctx._max_data = 15
|
ctx._max_data = 15
|
||||||
|
|
||||||
ctx.insert("100 1\n")
|
ctx.insert("1000 1\n")
|
||||||
|
|
||||||
ctx.insert("101 ")
|
ctx.insert("1010 ")
|
||||||
ctx.insert("1\n102 1")
|
ctx.insert("1\n1020 1")
|
||||||
ctx.insert("")
|
ctx.insert("")
|
||||||
ctx.insert("\n103 1\n")
|
ctx.insert("\n1030 1\n")
|
||||||
|
|
||||||
ctx.insert("104 1\n")
|
ctx.insert("1040 1\n")
|
||||||
ctx.insert("# hello\n")
|
ctx.insert("# hello\n")
|
||||||
ctx.insert(" # hello\n")
|
ctx.insert(" # hello\n")
|
||||||
ctx.insert(" 105 1\n")
|
ctx.insert(" 1050 1\n")
|
||||||
ctx.finalize()
|
ctx.finalize()
|
||||||
|
|
||||||
ctx.insert("107 1\n")
|
ctx.insert("1070 1\n")
|
||||||
ctx.update_end(108)
|
ctx.update_end(1080)
|
||||||
ctx.finalize()
|
ctx.finalize()
|
||||||
ctx.update_start(109)
|
ctx.update_start(1090)
|
||||||
ctx.insert("110 1\n")
|
ctx.insert("1100 1\n")
|
||||||
ctx.insert("111 1\n")
|
ctx.insert("1110 1\n")
|
||||||
ctx.send()
|
ctx.send()
|
||||||
ctx.insert("112 1\n")
|
ctx.insert("1120 1\n")
|
||||||
ctx.insert("113 1\n")
|
ctx.insert("1130 1\n")
|
||||||
ctx.insert("114 1\n")
|
ctx.insert("1140 1\n")
|
||||||
ctx.update_end(116)
|
ctx.update_end(1160)
|
||||||
ctx.insert("115 1\n")
|
ctx.insert("1150 1\n")
|
||||||
ctx.update_end(117)
|
ctx.update_end(1170)
|
||||||
ctx.insert("116 1\n")
|
ctx.insert("1160 1\n")
|
||||||
ctx.update_end(118)
|
ctx.update_end(1180)
|
||||||
ctx.insert("117 1" +
|
ctx.insert("1170 1" +
|
||||||
" # this is super long" * 100 +
|
" # this is super long" * 100 +
|
||||||
"\n")
|
"\n")
|
||||||
ctx.finalize()
|
ctx.finalize()
|
||||||
ctx.insert("# this is super long" * 100)
|
ctx.insert("# this is super long" * 100)
|
||||||
|
|
||||||
with assert_raises(ClientError):
|
with assert_raises(ClientError):
|
||||||
with client.stream_insert_context("/context/test", 100, 200) as ctx:
|
with client.stream_insert_context("/context/test",
|
||||||
ctx.insert("118 1\n")
|
1000, 2000) as ctx:
|
||||||
|
ctx.insert("1180 1\n")
|
||||||
|
|
||||||
with assert_raises(ClientError):
|
with assert_raises(ClientError):
|
||||||
with client.stream_insert_context("/context/test", 200, 300) as ctx:
|
with client.stream_insert_context("/context/test",
|
||||||
ctx.insert("118 1\n")
|
2000, 3000) as ctx:
|
||||||
|
ctx.insert("1180 1\n")
|
||||||
|
|
||||||
with assert_raises(ClientError):
|
with assert_raises(ClientError):
|
||||||
with client.stream_insert_context("/context/test") as ctx:
|
with client.stream_insert_context("/context/test") as ctx:
|
||||||
ctx.insert("bogus data\n")
|
ctx.insert("bogus data\n")
|
||||||
|
|
||||||
with client.stream_insert_context("/context/test", 200, 300) as ctx:
|
with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
|
||||||
# make sure our override wasn't permanent
|
# make sure our override wasn't permanent
|
||||||
ne_(ctx._max_data, 15)
|
ne_(ctx._max_data, 15)
|
||||||
ctx.insert("225 1\n")
|
ctx.insert("2250 1\n")
|
||||||
ctx.finalize()
|
ctx.finalize()
|
||||||
|
|
||||||
with assert_raises(ClientError):
|
with assert_raises(ClientError):
|
||||||
with client.stream_insert_context("/context/test", 300, 400) as ctx:
|
with client.stream_insert_context("/context/test",
|
||||||
ctx.insert("301 1\n")
|
3000, 4000) as ctx:
|
||||||
ctx.insert("302 2\n")
|
ctx.insert("3010 1\n")
|
||||||
ctx.insert("303 3\n")
|
ctx.insert("3020 2\n")
|
||||||
ctx.insert("304 4\n")
|
ctx.insert("3030 3\n")
|
||||||
ctx.insert("304 4\n") # non-monotonic after a few lines
|
ctx.insert("3040 4\n")
|
||||||
|
ctx.insert("3040 4\n") # non-monotonic after a few lines
|
||||||
ctx.finalize()
|
ctx.finalize()
|
||||||
|
|
||||||
eq_(list(client.stream_intervals("/context/test")),
|
eq_(list(client.stream_intervals("/context/test")),
|
||||||
[ [ 100, 106 ],
|
[ [ 1000, 1051 ],
|
||||||
[ 107, 108 ],
|
[ 1070, 1080 ],
|
||||||
[ 109, 118 ],
|
[ 1090, 1180 ],
|
||||||
[ 200, 300 ] ])
|
[ 2000, 3000 ] ])
|
||||||
|
|
||||||
# destroy stream (try without removing data first)
|
# destroy stream (try without removing data first)
|
||||||
with assert_raises(ClientError):
|
with assert_raises(ClientError):
|
||||||
|
@@ -615,7 +615,7 @@ class TestCmdline(object):
|
|||||||
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
|
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
|
||||||
|
|
||||||
# all data put in by tests
|
# all data put in by tests
|
||||||
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
|
self.ok("extract -a /newton/prep --start min --end max")
|
||||||
lines_(self.captured, 43204)
|
lines_(self.captured, 43204)
|
||||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||||
self.match("43200\n")
|
self.match("43200\n")
|
||||||
|
@@ -106,3 +106,228 @@ class TestNumpyClient(object):
|
|||||||
assert(np.allclose(array, actual))
|
assert(np.allclose(array, actual))
|
||||||
|
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
|
def test_numpyclient_03_insert(self):
|
||||||
|
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
|
||||||
|
|
||||||
|
# Limit _max_data just to get better coverage
|
||||||
|
old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
|
||||||
|
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000
|
||||||
|
|
||||||
|
client.stream_create("/test/1", "uint16_1")
|
||||||
|
client.stream_insert_numpy("/test/1",
|
||||||
|
np.array([[0, 1],
|
||||||
|
[1, 2],
|
||||||
|
[2, 3],
|
||||||
|
[3, 4]]))
|
||||||
|
|
||||||
|
# Wrong number of dimensions
|
||||||
|
with assert_raises(ValueError) as e:
|
||||||
|
client.stream_insert_numpy("/test/1",
|
||||||
|
np.array([[[0, 1],
|
||||||
|
[1, 2]],
|
||||||
|
[[3, 4],
|
||||||
|
[4, 5]]]))
|
||||||
|
in_("wrong number of dimensions", str(e.exception))
|
||||||
|
|
||||||
|
# Unstructured
|
||||||
|
client.stream_create("/test/2", "float32_8")
|
||||||
|
client.stream_insert_numpy(
|
||||||
|
"/test/2",
|
||||||
|
client.stream_extract_numpy(
|
||||||
|
"/newton/prep", structured = False, maxrows = 1000))
|
||||||
|
|
||||||
|
# Structured, and specifying layout
|
||||||
|
client.stream_create("/test/3", "float32_8")
|
||||||
|
client.stream_insert_numpy(
|
||||||
|
path = "/test/3", layout = "float32_8",
|
||||||
|
data = client.stream_extract_numpy(
|
||||||
|
"/newton/prep", structured = True, maxrows = 1000))
|
||||||
|
|
||||||
|
# Structured, specifying wrong layout
|
||||||
|
client.stream_create("/test/4", "float32_8")
|
||||||
|
with assert_raises(ValueError) as e:
|
||||||
|
client.stream_insert_numpy(
|
||||||
|
"/test/4", layout = "uint16_1",
|
||||||
|
data = client.stream_extract_numpy(
|
||||||
|
"/newton/prep", structured = True, maxrows = 1000))
|
||||||
|
in_("wrong dtype", str(e.exception))
|
||||||
|
|
||||||
|
# Unstructured, and specifying wrong layout
|
||||||
|
client.stream_create("/test/5", "float32_8")
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.stream_insert_numpy(
|
||||||
|
"/test/5", layout = "uint16_8",
|
||||||
|
data = client.stream_extract_numpy(
|
||||||
|
"/newton/prep", structured = False, maxrows = 1000))
|
||||||
|
# timestamps will be screwy here, because data will be parsed wrong
|
||||||
|
in_("error parsing input data", str(e.exception))
|
||||||
|
|
||||||
|
# Make sure the /newton/prep copies are identical
|
||||||
|
a = np.vstack(client.stream_extract_numpy("/newton/prep"))
|
||||||
|
b = np.vstack(client.stream_extract_numpy("/test/2"))
|
||||||
|
c = np.vstack(client.stream_extract_numpy("/test/3"))
|
||||||
|
assert(np.array_equal(a,b))
|
||||||
|
assert(np.array_equal(a,c))
|
||||||
|
|
||||||
|
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
def test_numpyclient_04_context(self):
|
||||||
|
# Like test_client_context, but with Numpy data
|
||||||
|
client = nilmdb.client.numpyclient.NumpyClient(testurl)
|
||||||
|
|
||||||
|
client.stream_create("/context/test", "uint16_1")
|
||||||
|
with client.stream_insert_numpy_context("/context/test") as ctx:
|
||||||
|
# override _max_rows to trigger frequent server updates
|
||||||
|
ctx._max_rows = 2
|
||||||
|
ctx.insert([[1000, 1]])
|
||||||
|
ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
|
||||||
|
ctx.insert([[1040, 1], [1050, 1]])
|
||||||
|
ctx.finalize()
|
||||||
|
ctx.insert([[1070, 1]])
|
||||||
|
ctx.update_end(1080)
|
||||||
|
ctx.finalize()
|
||||||
|
ctx.update_start(1090)
|
||||||
|
ctx.insert([[1100, 1]])
|
||||||
|
ctx.insert([[1110, 1]])
|
||||||
|
ctx.send()
|
||||||
|
ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
|
||||||
|
ctx.update_end(1160)
|
||||||
|
ctx.insert([[1150, 1]])
|
||||||
|
ctx.update_end(1170)
|
||||||
|
ctx.insert([[1160, 1]])
|
||||||
|
ctx.update_end(1180)
|
||||||
|
ctx.insert([[1170, 123456789.0]])
|
||||||
|
ctx.finalize()
|
||||||
|
ctx.insert(np.zeros((0,2)))
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
with client.stream_insert_numpy_context("/context/test",
|
||||||
|
1000, 2000) as ctx:
|
||||||
|
ctx.insert([[1180, 1]])
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
with client.stream_insert_numpy_context("/context/test",
|
||||||
|
2000, 3000) as ctx:
|
||||||
|
ctx._max_rows = 2
|
||||||
|
ctx.insert([[3180, 1]])
|
||||||
|
ctx.insert([[3181, 1]])
|
||||||
|
|
||||||
|
with client.stream_insert_numpy_context("/context/test",
|
||||||
|
2000, 3000) as ctx:
|
||||||
|
# make sure our override wasn't permanent
|
||||||
|
ne_(ctx._max_rows, 2)
|
||||||
|
ctx.insert([[2250, 1]])
|
||||||
|
ctx.finalize()
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
with client.stream_insert_numpy_context("/context/test",
|
||||||
|
3000, 4000) as ctx:
|
||||||
|
ctx.insert([[3010, 1]])
|
||||||
|
ctx.insert([[3020, 2]])
|
||||||
|
ctx.insert([[3030, 3]])
|
||||||
|
ctx.insert([[3040, 4]])
|
||||||
|
ctx.insert([[3040, 4]]) # non-monotonic after a few lines
|
||||||
|
ctx.finalize()
|
||||||
|
|
||||||
|
eq_(list(client.stream_intervals("/context/test")),
|
||||||
|
[ [ 1000, 1051 ],
|
||||||
|
[ 1070, 1080 ],
|
||||||
|
[ 1090, 1180 ],
|
||||||
|
[ 2000, 3000 ] ])
|
||||||
|
|
||||||
|
client.stream_remove("/context/test")
|
||||||
|
client.stream_destroy("/context/test")
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
def test_numpyclient_05_emptyintervals(self):
|
||||||
|
# Like test_client_emptyintervals, with insert_numpy_context
|
||||||
|
client = nilmdb.client.numpyclient.NumpyClient(testurl)
|
||||||
|
client.stream_create("/empty/test", "uint16_1")
|
||||||
|
def info():
|
||||||
|
result = []
|
||||||
|
for interval in list(client.stream_intervals("/empty/test")):
|
||||||
|
result.append((client.stream_count("/empty/test", *interval),
|
||||||
|
interval))
|
||||||
|
return result
|
||||||
|
eq_(info(), [])
|
||||||
|
|
||||||
|
# Insert a region with just a few points
|
||||||
|
with client.stream_insert_numpy_context("/empty/test") as ctx:
|
||||||
|
ctx.update_start(100)
|
||||||
|
ctx.insert([[140, 1]])
|
||||||
|
ctx.insert([[150, 1]])
|
||||||
|
ctx.insert([[160, 1]])
|
||||||
|
ctx.update_end(200)
|
||||||
|
ctx.finalize()
|
||||||
|
eq_(info(), [(3, [100, 200])])
|
||||||
|
|
||||||
|
# Delete chunk, which will leave one data point and two intervals
|
||||||
|
client.stream_remove("/empty/test", 145, 175)
|
||||||
|
eq_(info(), [(1, [100, 145]),
|
||||||
|
(0, [175, 200])])
|
||||||
|
|
||||||
|
# Try also creating a completely empty interval from scratch,
|
||||||
|
# in a few different ways.
|
||||||
|
client.stream_insert("/empty/test", "", 300, 350)
|
||||||
|
client.stream_insert("/empty/test", [], 400, 450)
|
||||||
|
with client.stream_insert_numpy_context("/empty/test", 500, 550):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# If enough timestamps aren't provided, empty streams won't be created.
|
||||||
|
client.stream_insert("/empty/test", [])
|
||||||
|
with client.stream_insert_numpy_context("/empty/test"):
|
||||||
|
pass
|
||||||
|
client.stream_insert("/empty/test", [], start = 600)
|
||||||
|
with client.stream_insert_numpy_context("/empty/test", start = 700):
|
||||||
|
pass
|
||||||
|
client.stream_insert("/empty/test", [], end = 850)
|
||||||
|
with client.stream_insert_numpy_context("/empty/test", end = 950):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Try various things that might cause problems
|
||||||
|
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
|
||||||
|
ctx.finalize() # inserts [1000, 1050]
|
||||||
|
ctx.finalize() # nothing
|
||||||
|
ctx.finalize() # nothing
|
||||||
|
ctx.insert([[1100, 1]])
|
||||||
|
ctx.finalize() # inserts [1100, 1101]
|
||||||
|
ctx.update_start(1199)
|
||||||
|
ctx.insert([[1200, 1]])
|
||||||
|
ctx.update_end(1250)
|
||||||
|
ctx.finalize() # inserts [1199, 1250]
|
||||||
|
ctx.update_start(1299)
|
||||||
|
ctx.finalize() # nothing
|
||||||
|
ctx.update_end(1350)
|
||||||
|
ctx.finalize() # nothing
|
||||||
|
ctx.update_start(1400)
|
||||||
|
ctx.insert(np.zeros((0,2)))
|
||||||
|
ctx.update_end(1450)
|
||||||
|
ctx.finalize()
|
||||||
|
ctx.update_start(1500)
|
||||||
|
ctx.insert(np.zeros((0,2)))
|
||||||
|
ctx.update_end(1550)
|
||||||
|
ctx.finalize()
|
||||||
|
ctx.insert(np.zeros((0,2)))
|
||||||
|
ctx.insert(np.zeros((0,2)))
|
||||||
|
ctx.insert(np.zeros((0,2)))
|
||||||
|
ctx.finalize()
|
||||||
|
|
||||||
|
# Check everything
|
||||||
|
eq_(info(), [(1, [100, 145]),
|
||||||
|
(0, [175, 200]),
|
||||||
|
(0, [300, 350]),
|
||||||
|
(0, [400, 450]),
|
||||||
|
(0, [500, 550]),
|
||||||
|
(0, [1000, 1050]),
|
||||||
|
(1, [1100, 1101]),
|
||||||
|
(1, [1199, 1250]),
|
||||||
|
(0, [1400, 1450]),
|
||||||
|
(0, [1500, 1550]),
|
||||||
|
])
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
client.stream_remove("/empty/test")
|
||||||
|
client.stream_destroy("/empty/test")
|
||||||
|
client.close()
|
||||||
|
Reference in New Issue
Block a user