Compare commits: nilmdb-1.4...nilmdb-1.5 (43 commits)
| SHA1 |
|---|
| bfb09a189f |
| 416a499866 |
| 637d193807 |
| b7fa5745ce |
| 0104c8edd9 |
| cf3b8e787d |
| 83d022016c |
| 43b740ecaa |
| 4ce059b920 |
| 99a4228285 |
| 230ec72609 |
| d36ece3767 |
| 231963538e |
| b4d6aad6de |
| e95142eabf |
| d21c3470bc |
| 7576883f49 |
| cc211542f8 |
| 8292dcf70b |
| b362fd37f6 |
| 41ec13ee17 |
| efa9aa9097 |
| d9afb48f45 |
| d1140e0f16 |
| 6091e44561 |
| e233ba790f |
| f0304b4c00 |
| 60594ca58e |
| c7f2df4abc |
| 5b7409f802 |
| 06038062a2 |
| ae9fe89759 |
| 04def60021 |
| 9ce0f69dff |
| 90c3be91c4 |
| ebccfb3531 |
| e006f1d02e |
| 5292319802 |
| 173121ca87 |
| 26bab031bd |
| b5fefffa09 |
| dccb3e370a |
| 95ca55aa7e |
@@ -10,6 +10,9 @@ Prerequisites:

    sudo apt-get install python-cherrypy3 python-decorator python-simplejson
    sudo apt-get install python-requests python-dateutil python-tz python-psutil

    # Other dependencies (required by some modules)
    sudo apt-get install python-numpy

    # Tools for running tests
    sudo apt-get install python-nose python-coverage
@@ -389,3 +389,35 @@ Possible solutions:

are always printed as int64 values, and a new format
"@1234567890123456" is added to the parser for specifying them
exactly.

Binary interface
----------------

The ASCII interface is too slow for high-bandwidth processing, like
sinefits, prep, etc.  A binary interface was added so that you can
extract the raw binary out of the bulkdata storage.  The binary data
is in a little-endian format, e.g. in C a uint16_6 stream would be:

    #include <endian.h>
    #include <stdint.h>
    struct {
        int64_t timestamp_le;
        uint16_t data_le[6];
    } __attribute__((packed));

Remember to byteswap (with e.g. `le64toh` in C)!

This interface is used by the new `nilmdb.client.numpyclient.NumpyClient`
class, which is a subclass of the normal `nilmdb.client.client.Client`
and has all of the same functions.  It adds three new functions:

- `stream_extract_numpy` to extract data as a Numpy array

- `stream_insert_numpy` to insert data as a Numpy array

- `stream_insert_numpy_context` is the context manager for
  incrementally inserting data

It is significantly faster!  It is about 20 times faster to decimate a
stream with `nilm-decimate` when the filter code is using the new
binary/numpy interface.
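As a rough illustration (not part of this changeset), the same layout can
be decoded in Python by mirroring the C struct above with a Numpy dtype;
the helper name here is hypothetical:

    import numpy

    # Structured dtype mirroring the packed C struct: a little-endian
    # int64 timestamp followed by six little-endian uint16 data columns.
    dtype = numpy.dtype([('timestamp', '<i8'), ('data', '<u2', 6)])

    def decode_chunk(raw):
        # 'raw' is one chunk of bytes from a binary-mode extract.
        a = numpy.fromstring(raw, dtype)
        return a['timestamp'], a['data']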
@@ -19,8 +19,9 @@ Then, set up Apache with a configuration like:

    <VirtualHost>
        WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
        WSGIProcessGroup nilmdb-server
        WSGIDaemonProcess nilmdb-server threads=32 user=nilm group=nilm
        WSGIApplicationGroup nilmdb-appgroup
        WSGIProcessGroup nilmdb-procgroup
        WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm

        # Access control example:
        <Location /nilmdb>
@@ -6,6 +6,7 @@ import nilmdb.utils

import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError

import re
import time
import simplejson as json
import contextlib
@@ -65,7 +66,12 @@ class Client(object):

            params["layout"] = layout
        if extended:
            params["extended"] = 1
        return self.http.get("stream/list", params)
        def sort_streams_nicely(x):
            """Human-friendly sort (/stream/2 before /stream/10)"""
            num = lambda t: int(t) if t.isdigit() else t
            key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
            return sorted(x, key = key)
        return sort_streams_nicely(self.http.get("stream/list", params))

    def stream_get_metadata(self, path, keys = None):
        params = { "path": path }
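As a quick illustration of what this natural sort does (the stream list
below is made up):

    import re

    def sort_streams_nicely(x):
        """Human-friendly sort (/stream/2 before /stream/10)"""
        num = lambda t: int(t) if t.isdigit() else t
        key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
        return sorted(x, key = key)

    streams = [ ("/stream/10", "uint16_6"), ("/stream/2", "uint16_6") ]
    # Numeric path components compare as integers, so /stream/2 sorts
    # before /stream/10 instead of after it.
    print(sort_streams_nicely(streams))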
@@ -121,10 +127,11 @@ class Client(object):

    @contextlib.contextmanager
    def stream_insert_context(self, path, start = None, end = None):
        """Return a context manager that allows data to be efficiently
        inserted into a stream in a piecewise manner.  Data is be provided
        as single lines, and is aggregated and sent to the server in larger
        chunks as necessary.  Data lines must match the database layout for
        the given path, and end with a newline.
        inserted into a stream in a piecewise manner.  Data is
        provided as ASCII lines, and is aggregated and sent to the
        server in larger or smaller chunks as necessary.  Data lines
        must match the database layout for the given path, and end
        with a newline.

        Example:
            with client.stream_insert_context('/path', start, end) as ctx:
@@ -136,15 +143,16 @@ class Client(object):

        This may make multiple requests to the server, if the data is
        large enough or enough time has passed between insertions.
        """
        ctx = StreamInserter(self.http, path, start, end)
        ctx = StreamInserter(self, path, start, end)
        yield ctx
        ctx.finalize()

    def stream_insert(self, path, data, start = None, end = None):
        """Insert rows of data into a stream.  data should be a string
        or iterable that provides ASCII data that matches the database
        layout for path.  See stream_insert_context for details on the
        'start' and 'end' parameters."""
        layout for path.  Data is passed through stream_insert_context,
        so it will be broken into reasonably-sized chunks and
        start/end will be deduced if missing."""
        with self.stream_insert_context(path, start, end) as ctx:
            if isinstance(data, basestring):
                ctx.insert(data)
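A minimal usage sketch for this context manager (the path, layout, and
timestamps here are hypothetical):

    import nilmdb.client

    client = nilmdb.client.Client("http://localhost/nilmdb/")
    # Insert two rows of a hypothetical uint16_6 stream; each line is a
    # timestamp followed by six values, terminated by a newline.
    with client.stream_insert_context('/test/raw', None, None) as ctx:
        ctx.insert("1000000 1 2 3 4 5 6\n")
        ctx.insert("1001000 7 8 9 10 11 12\n")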
@@ -153,11 +161,28 @@ class Client(object):

                    ctx.insert(chunk)
            return ctx.last_response

    def stream_insert_block(self, path, data, start, end, binary = False):
        """Insert a single fixed block of data into the stream.  It is
        sent directly to the server in one block with no further
        processing.

        If 'binary' is True, provide raw binary data in little-endian
        format matching the path layout, including an int64 timestamp.
        Otherwise, provide ASCII data matching the layout."""
        params = {
            "path": path,
            "start": timestamp_to_string(start),
            "end": timestamp_to_string(end),
        }
        if binary:
            params["binary"] = 1
        return self.http.put("stream/insert", data, params, binary = binary)

    def stream_intervals(self, path, start = None, end = None, diffpath = None):
        """
        Return a generator that yields each stream interval.

        If diffpath is not None, yields only interval ranges that are
        If 'diffpath' is not None, yields only interval ranges that are
        present in 'path' but not in 'diffpath'.
        """
        params = {
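To show what a binary block looks like on the wire, here is a sketch that
packs one row for a hypothetical uint16_6 stream and sends it with
stream_insert_block (reusing the client object from the earlier sketch;
timestamps are made up):

    import struct

    # One row: little-endian int64 timestamp plus six little-endian
    # uint16 values, exactly matching the packed struct in the design
    # notes above.
    ts = 1000000
    row = struct.pack('<q6H', ts, 1, 2, 3, 4, 5, 6)
    client.stream_insert_block('/test/raw', row, ts, ts + 1, binary = True)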
@@ -172,17 +197,22 @@ class Client(object):

        return self.http.get_gen("stream/intervals", params)

    def stream_extract(self, path, start = None, end = None,
                       count = False, markup = False):
                       count = False, markup = False, binary = False):
        """
        Extract data from a stream.  Returns a generator that yields
        lines of ASCII-formatted data that matches the database
        layout for the given path.

        Specify count = True to return a count of matching data points
        If 'count' is True, return a count of matching data points
        rather than the actual data.  The output format is unchanged.

        Specify markup = True to include comments in the returned data
        If 'markup' is True, include comments in the returned data
        that indicate interval starts and ends.

        If 'binary' is True, return chunks of raw binary data, rather
        than lines of ASCII-formatted data.  Raw binary data is
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        params = {
            "path": path,
@@ -195,7 +225,9 @@ class Client(object):

            params["count"] = 1
        if markup:
            params["markup"] = 1
        return self.http.get_gen("stream/extract", params)
        if binary:
            params["binary"] = 1
        return self.http.get_gen("stream/extract", params, binary = binary)

    def stream_count(self, path, start = None, end = None):
        """
@@ -244,13 +276,13 @@ class StreamInserter(object):

    _max_data = 2 * 1024 * 1024
    _max_data_after_send = 64 * 1024

    def __init__(self, http, path, start = None, end = None):
        """'http' is the httpclient object.  'path' is the database
    def __init__(self, client, path, start, end):
        """'client' is the client object.  'path' is the database
        path to insert to.  'start' and 'end' are used for the first
        contiguous interval."""
        contiguous interval and may be None."""
        self.last_response = None

        self._http = http
        self._client = client
        self._path = path

        # Start and end for the overall contiguous interval we're
@@ -313,6 +345,11 @@ class StreamInserter(object):

        part of a new interval and there may be a gap left in-between."""
        self._send_block(final = True)

    def send(self):
        """Send any data that we might have buffered up.  Does not affect
        any other treatment of timestamps or endpoints."""
        self._send_block(final = False)

    def _get_first_noncomment(self, block):
        """Return the (start, end) indices of the first full line in
        block that isn't a comment, or raise IndexError if
@@ -413,9 +450,7 @@ class StreamInserter(object):

            raise ClientError("have data to send, but no start/end times")

        # Send it
        params = { "path": self._path,
                   "start": timestamp_to_string(start_ts),
                   "end": timestamp_to_string(end_ts) }
        self.last_response = self._http.put("stream/insert", block, params)
        self.last_response = self._client.stream_insert_block(
            self._path, block, start_ts, end_ts, binary = False)

        return
@@ -105,12 +105,17 @@ class HTTPClient(object):

        else:
            return self._req("POST", url, None, params)

    def put(self, url, data, params = None):
    def put(self, url, data, params = None, binary = False):
        """Simple PUT (parameters in URL, data in body)"""
        return self._req("PUT", url, params, data)
        if binary:
            h = { 'Content-type': 'application/octet-stream' }
        else:
            h = { 'Content-type': 'text/plain; charset=utf-8' }
        return self._req("PUT", url, query = params, body = data, headers = h)

    # Generator versions that return data one line at a time.
    def _req_gen(self, method, url, query = None, body = None, headers = None):
    def _req_gen(self, method, url, query = None, body = None,
                 headers = None, binary = False):
        """
        Make a request and return a generator that gives back strings
        or JSON decoded lines of the body data, or raise an error if
@@ -118,16 +123,19 @@ class HTTPClient(object):

        """
        (response, isjson) = self._do_req(method, url, query, body,
                                          stream = True, headers = headers)
        if isjson:
        if binary:
            for chunk in response.iter_content(chunk_size = 65536):
                yield chunk
        elif isjson:
            for line in response.iter_lines():
                yield json.loads(line)
        else:
            for line in response.iter_lines():
                yield line

    def get_gen(self, url, params = None):
    def get_gen(self, url, params = None, binary = False):
        """Simple GET (parameters in URL) returning a generator"""
        return self._req_gen("GET", url, params)
        return self._req_gen("GET", url, params, binary = binary)

    # Not much use for a POST or PUT generator, since they don't
    # return much data.
nilmdb/client/numpyclient.py — new file, 262 lines
@@ -0,0 +1,262 @@

# -*- coding: utf-8 -*-

"""Provide a NumpyClient class that is based on normal Client, but has
additional methods for extracting and inserting data via Numpy arrays."""

import nilmdb.utils
import nilmdb.client.client
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError

import contextlib
from nilmdb.utils.time import timestamp_to_string, string_to_timestamp

import numpy
import cStringIO

def layout_to_dtype(layout):
    ltype = layout.split('_')[0]
    lcount = int(layout.split('_')[1])
    if ltype.startswith('int'):
        atype = '<i' + str(int(ltype[3:]) / 8)
    elif ltype.startswith('uint'):
        atype = '<u' + str(int(ltype[4:]) / 8)
    elif ltype.startswith('float'):
        atype = '<f' + str(int(ltype[5:]) / 8)
    else:
        raise ValueError("bad layout")
    return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)])
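For example, layout names should map to dtypes like this (a sketch of
expected behavior, not part of the diff):

    # "uint16_6": int64 timestamp + six uint16 columns, 20 bytes per row.
    print(layout_to_dtype("uint16_6").itemsize)    # 20  (8 + 6*2)

    # "float32_8": int64 timestamp + eight float32 columns, 40 bytes per row.
    print(layout_to_dtype("float32_8").itemsize)   # 40  (8 + 8*4)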
class NumpyClient(nilmdb.client.client.Client):
    """Subclass of nilmdb.client.Client that adds additional methods for
    extracting and inserting data via Numpy arrays."""

    def _get_dtype(self, path, layout):
        if layout is None:
            streams = self.stream_list(path)
            if len(streams) != 1:
                raise ClientError("can't get layout for path: " + path)
            layout = streams[0][1]
        return layout_to_dtype(layout)

    def stream_extract_numpy(self, path, start = None, end = None,
                             layout = None, maxrows = 100000,
                             structured = False):
        """
        Extract data from a stream.  Returns a generator that yields
        Numpy arrays of up to 'maxrows' of data each.

        If 'layout' is None, it is read using stream_info.

        If 'structured' is False, all data is converted to float64
        and returned in a flat 2D array.  Otherwise, data is returned
        as a structured dtype in a 1D array.
        """
        dtype = self._get_dtype(path, layout)

        def to_numpy(data):
            a = numpy.fromstring(data, dtype)
            if structured:
                return a
            return numpy.c_[a['timestamp'], a['data']]

        chunks = []
        total_len = 0
        maxsize = dtype.itemsize * maxrows
        for data in self.stream_extract(path, start, end, binary = True):
            # Add this block of binary data
            chunks.append(data)
            total_len += len(data)

            # See if we have enough to make the requested Numpy array
            while total_len >= maxsize:
                assembled = "".join(chunks)
                total_len -= maxsize
                chunks = [ assembled[maxsize:] ]
                block = assembled[:maxsize]
                yield to_numpy(block)

        if total_len:
            yield to_numpy("".join(chunks))
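A short usage sketch (hypothetical path; assumes the server holds data
for it):

    client = nilmdb.client.numpyclient.NumpyClient("http://localhost/nilmdb/")
    total = 0
    # Each 'arr' is a float64 2D array: column 0 is the timestamp, the
    # remaining columns are the data values.
    for arr in client.stream_extract_numpy('/test/raw', maxrows = 50000):
        total += arr.shape[0]
    print(total)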
    @contextlib.contextmanager
    def stream_insert_numpy_context(self, path, start = None, end = None,
                                    layout = None):
        """Return a context manager that allows data to be efficiently
        inserted into a stream in a piecewise manner.  Data is
        provided as Numpy arrays, and is aggregated and sent to the
        server in larger or smaller chunks as necessary.  Data format
        must match the database layout for the given path.

        For more details, see help for
        nilmdb.client.numpyclient.StreamInserterNumpy

        If 'layout' is not None, use it as the layout rather than
        querying the database.
        """
        dtype = self._get_dtype(path, layout)
        ctx = StreamInserterNumpy(self, path, start, end, dtype)
        yield ctx
        ctx.finalize()

    def stream_insert_numpy(self, path, data, start = None, end = None,
                            layout = None):
        """Insert data into a stream.  data should be a Numpy array
        which will be passed through stream_insert_numpy_context to
        break it into chunks etc.  See the help for that function
        for details."""
        with self.stream_insert_numpy_context(path, start, end, layout) as ctx:
            if isinstance(data, numpy.ndarray):
                ctx.insert(data)
            else:
                for chunk in data:
                    ctx.insert(chunk)
            return ctx.last_response
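And the matching insert-side sketch, building a flat 2D array for a
hypothetical float32_8 stream (reusing the client object above):

    import numpy

    # 100 rows: column 0 holds microsecond timestamps, columns 1-8 the data.
    rows = numpy.zeros((100, 9))
    rows[:, 0] = numpy.arange(100) * 1000 + 1000000
    client.stream_insert_numpy('/test/prep', rows, layout = "float32_8")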
class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
    """Object returned by stream_insert_numpy_context() that manages
    the insertion of rows of data into a particular path.

    See help for nilmdb.client.client.StreamInserter for details.
    The only difference is that, instead of ASCII formatted data,
    this context manager can take Numpy arrays, which are either
    structured (1D with complex dtype) or flat (2D with simple dtype).
    """

    # Soft limit of how many bytes to send per HTTP request.
    _max_data = 2 * 1024 * 1024

    def __init__(self, client, path, start, end, dtype):
        """
        'client' is the client object.  'path' is the database path
        to insert to.  'start' and 'end' are used for the first
        contiguous interval and may be None.  'dtype' is the Numpy
        dtype for this stream.
        """
        self.last_response = None

        self._dtype = dtype
        self._client = client
        self._path = path

        # Start and end for the overall contiguous interval we're
        # filling
        self._interval_start = start
        self._interval_end = end

        # Max rows to send at once
        self._max_rows = self._max_data // self._dtype.itemsize

        # List of the current arrays we're building up to send
        self._block_arrays = []
        self._block_rows = 0

    def insert(self, array):
        """Insert Numpy data, which must match the layout type."""
        if type(array) != numpy.ndarray:
            array = numpy.array(array)
        if array.ndim == 1:
            # Already a structured array; just verify the type
            if array.dtype != self._dtype:
                raise ValueError("wrong dtype for 1D (structured) array")
        elif array.ndim == 2:
            # Convert to structured array
            sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
            try:
                sarray['timestamp'] = array[:,0]
                # Need the squeeze in case sarray['data'] is 1 dimensional
                sarray['data'] = numpy.squeeze(array[:,1:])
            except (IndexError, ValueError):
                raise ValueError("wrong number of fields for this data type")
            array = sarray
        else:
            raise ValueError("wrong number of dimensions in array")

        length = len(array)
        maxrows = self._max_rows

        if length == 0:
            return
        if length > maxrows:
            # This is more than twice what we wanted to send, so split
            # it up.  This is a bit inefficient, but the user really
            # shouldn't be providing this much data at once.
            for cut in range(0, length, maxrows):
                self.insert(array[cut:(cut + maxrows)])
            return

        # Add this array to our list
        self._block_arrays.append(array)
        self._block_rows += length

        # Send if it's too long
        if self._block_rows >= maxrows:
            self._send_block(final = False)

    def _send_block(self, final = False):
        """Send the data currently stored up.  One row might be left
        over if we need its timestamp saved."""

        # Build the full array to send
        if self._block_rows == 0:
            array = numpy.zeros(0, dtype = self._dtype)
        else:
            array = numpy.hstack(self._block_arrays)

        # Get starting timestamp
        start_ts = self._interval_start
        if start_ts is None:
            # Pull start from the first row
            try:
                start_ts = array['timestamp'][0]
            except IndexError:
                pass # no timestamp is OK, if we have no data

        # Get ending timestamp
        if final:
            # For a final block, the timestamp is either the
            # user-provided end, or the timestamp of the last line
            # plus epsilon.
            end_ts = self._interval_end
            if end_ts is None:
                try:
                    end_ts = array['timestamp'][-1]
                    end_ts += nilmdb.utils.time.epsilon
                except IndexError:
                    pass # no timestamp is OK, if we have no data
            self._block_arrays = []
            self._block_rows = 0

            # Next block is completely fresh
            self._interval_start = None
            self._interval_end = None
        else:
            # An intermediate block.  We need to save the last row
            # for the next block, and use its timestamp as the ending
            # timestamp for this one.
            if len(array) < 2:
                # Not enough data to send an intermediate block
                return
            end_ts = array['timestamp'][-1]
            if self._interval_end is not None and end_ts > self._interval_end:
                # User gave us bad endpoints; send it anyway, and let
                # the server complain so that the error is the same
                # as if we hadn't done this chunking.
                end_ts = self._interval_end
            self._block_arrays = [ array[-1:] ]
            self._block_rows = 1
            array = array[:-1]

            # Next block continues where this one ended
            self._interval_start = end_ts

        # If we have no endpoints, it's because we had no data to send.
        if start_ts is None or end_ts is None:
            return

        # Send it
        data = array.tostring()
        self.last_response = self._client.stream_insert_block(
            self._path, data, start_ts, end_ts, binary = True)

        return
@@ -10,6 +10,7 @@ import sys

import os
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form
import signal

try: # pragma: no cover
    import argcomplete
@@ -80,8 +81,14 @@ class Cmdline(object):

    def __init__(self, argv = None):
        self.argv = argv or sys.argv[1:]
        try:
            # Assume command line arguments are encoded with stdin's encoding,
            # and reverse it.  Won't be needed in Python 3, but for now..
            self.argv = [ x.decode(sys.stdin.encoding) for x in self.argv ]
        except Exception: # pragma: no cover
            pass
        self.client = None
        self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380")
        self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
        self.subcmd = {}
        self.complete = Complete()
@@ -126,6 +133,13 @@ class Cmdline(object):

            sys.exit(-1)

    def run(self):
        # Set SIGPIPE to its default handler -- we don't need Python
        # to catch it for us.
        try:
            signal.signal(signal.SIGPIPE, signal.SIG_DFL)
        except ValueError: # pragma: no cover
            pass

        # Clear cached timezone, so that we can pick up timezone changes
        # while running this from the test suite.
        datetime_tz._localtz = None
@@ -1,5 +1,6 @@

from nilmdb.utils.printf import *
import nilmdb.client
import fnmatch

from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -10,25 +11,39 @@ def setup(self, sub):

                         Destroy the stream at the specified path.
                         The stream must be empty.  All metadata
                         related to the stream is permanently deleted.

                         Wildcards and multiple paths are supported.
                         """)
    cmd.set_defaults(handler = cmd_destroy)
    group = cmd.add_argument_group("Options")
    group.add_argument("-R", "--remove", action="store_true",
                       help="Remove all data before destroying stream")
    group.add_argument("-q", "--quiet", action="store_true",
                       help="Don't display names when destroying "
                       "multiple paths")
    group = cmd.add_argument_group("Required arguments")
    group.add_argument("path",
                       help="Path of the stream to delete, e.g. /foo/bar",
    group.add_argument("path", nargs='+',
                       help="Path of the stream to delete, e.g. /foo/bar/*",
                       ).completer = self.complete.path
    return cmd

def cmd_destroy(self):
    """Destroy stream"""
    if self.args.remove:
    streams = [ s[0] for s in self.client.stream_list() ]
    paths = []
    for path in self.args.path:
        new = fnmatch.filter(streams, path)
        if not new:
            self.die("error: no stream matched path: %s", path)
        paths.extend(new)

    for path in paths:
        if not self.args.quiet and len(paths) > 1:
            printf("Destroying %s\n", path)

        try:
            count = self.client.stream_remove(self.args.path)
            if self.args.remove:
                count = self.client.stream_remove(path)
            self.client.stream_destroy(path)
        except nilmdb.client.ClientError as e:
            self.die("error removing data: %s", str(e))
    try:
        self.client.stream_destroy(self.args.path)
    except nilmdb.client.ClientError as e:
        self.die("error destroying stream: %s", str(e))
            self.die("error destroying stream: %s", str(e))
@@ -10,22 +10,16 @@ def setup(self, sub):

                         formatter_class = def_form,
                         description="""
                         List streams available in the database,
                         optionally filtering by layout or path.  Wildcards
                         are accepted.
                         optionally filtering by path.  Wildcards
                         are accepted; non-matching paths or wildcards
                         are ignored.
                         """)
    cmd.set_defaults(verify = cmd_list_verify,
                     handler = cmd_list)

    group = cmd.add_argument_group("Stream filtering")
    group.add_argument("-p", "--path", metavar="PATH", default="*",
                       help="Match only this path (-p can be omitted)",
    group.add_argument("path", metavar="PATH", default=["*"], nargs='*',
                       ).completer = self.complete.path
    group.add_argument("path_positional", default="*",
                       nargs="?", help=argparse.SUPPRESS,
                       ).completer = self.complete.path
    group.add_argument("-l", "--layout", default="*",
                       help="Match only this stream layout",
                       ).completer = self.complete.layout

    group = cmd.add_argument_group("Interval info")
    group.add_argument("-E", "--ext", action="store_true",
@@ -49,20 +43,12 @@ def setup(self, sub):

    group = cmd.add_argument_group("Misc options")
    group.add_argument("-T", "--timestamp-raw", action="store_true",
                       help="Show raw timestamps when printing times")
    group.add_argument("-l", "--layout", action="store_true",
                       help="Show layout type next to path name")

    return cmd

def cmd_list_verify(self):
    # A hidden "path_positional" argument lets the user leave off the
    # "-p" when specifying the path.  Handle it here.
    got_opt = self.args.path != "*"
    got_pos = self.args.path_positional != "*"
    if got_pos:
        if got_opt:
            self.parser.error("too many paths specified")
        else:
            self.args.path = self.args.path_positional

    if self.args.start is not None and self.args.end is not None:
        if self.args.start >= self.args.end:
            self.parser.error("start must precede end")
@@ -80,29 +66,33 @@ def cmd_list(self):

    else:
        time_string = nilmdb.utils.time.timestamp_to_human

    for stream in streams:
        (path, layout, int_min, int_max, rows, time) = stream[:6]
        if not (fnmatch.fnmatch(path, self.args.path) and
                fnmatch.fnmatch(layout, self.args.layout)):
            continue
    for argpath in self.args.path:
        for stream in streams:
            (path, layout, int_min, int_max, rows, time) = stream[:6]
            if not fnmatch.fnmatch(path, argpath):
                continue

        printf("%s %s\n", path, layout)

        if self.args.ext:
            if int_min is None or int_max is None:
                printf("  interval extents: (no data)\n")
            if self.args.layout:
                printf("%s %s\n", path, layout)
            else:
                printf("  interval extents: %s -> %s\n",
                       time_string(int_min), time_string(int_max))
            printf("  total data: %d rows, %.6f seconds\n",
                   rows or 0,
                   nilmdb.utils.time.timestamp_to_seconds(time or 0))
                printf("%s\n", path)

        if self.args.detail:
            printed = False
            for (start, end) in self.client.stream_intervals(
                    path, self.args.start, self.args.end):
                printf("   [ %s -> %s ]\n", time_string(start), time_string(end))
                printed = True
            if not printed:
                printf("   (no intervals)\n")
            if self.args.ext:
                if int_min is None or int_max is None:
                    printf("  interval extents: (no data)\n")
                else:
                    printf("  interval extents: %s -> %s\n",
                           time_string(int_min), time_string(int_max))
                printf("  total data: %d rows, %.6f seconds\n",
                       rows or 0,
                       nilmdb.utils.time.timestamp_to_seconds(time or 0))

            if self.args.detail:
                printed = False
                for (start, end) in self.client.stream_intervals(
                        path, self.args.start, self.args.end):
                    printf("   [ %s -> %s ]\n",
                           time_string(start), time_string(end))
                    printed = True
                if not printed:
                    printf("   (no intervals)\n")
@@ -9,7 +9,8 @@ def setup(self, sub):

                         a stream.
                         """,
                         usage="%(prog)s path [-g [key ...] | "
                         "-s key=value [...] | -u key=value [...]]")
                         "-s key=value [...] | -u key=value [...]] | "
                         "-d [key ...]")
    cmd.set_defaults(handler = cmd_metadata)

    group = cmd.add_argument_group("Required arguments")
@@ -30,6 +31,9 @@ def setup(self, sub):

                     help="Update metadata using provided "
                     "key=value pairs",
                     ).completer = self.complete.meta_keyval
    exc.add_argument("-d", "--delete", nargs="*", metavar="key",
                     help="Delete metadata for specified keys (default all)",
                     ).completer = self.complete.meta_key
    return cmd

def cmd_metadata(self):
@@ -56,6 +60,16 @@ def cmd_metadata(self):

            handler(self.args.path, data)
        except nilmdb.client.ClientError as e:
            self.die("error setting/updating metadata: %s", str(e))
    elif self.args.delete is not None:
        # Delete (by setting values to empty strings)
        keys = self.args.delete or None
        try:
            data = self.client.stream_get_metadata(self.args.path, keys)
            for key in data:
                data[key] = ""
            self.client.stream_update_metadata(self.args.path, data)
        except nilmdb.client.ClientError as e:
            self.die("error deleting metadata: %s", str(e))
    else:
        # Get (or unspecified)
        keys = self.args.get or None

@@ -64,7 +78,7 @@ def cmd_metadata(self):

        except nilmdb.client.ClientError as e:
            self.die("error getting metadata: %s", str(e))
        for key, value in sorted(data.items()):
            # Omit nonexistant keys
            # Print nonexistant keys as having empty value
            if value is None:
                value = ""
            printf("%s=%s\n", key, value)
@@ -1,17 +1,19 @@

from nilmdb.utils.printf import *
import nilmdb.client
import fnmatch

def setup(self, sub):
    cmd = sub.add_parser("remove", help="Remove data",
                         description="""
                         Remove all data from a specified time range within a
                         stream.
                         stream.  If multiple streams or wildcards are provided,
                         the same time range is removed from all streams.
                         """)
    cmd.set_defaults(handler = cmd_remove)

    group = cmd.add_argument_group("Data selection")
    group.add_argument("path",
                       help="Path of stream, e.g. /foo/bar",
    group.add_argument("path", nargs='+',
                       help="Path of stream, e.g. /foo/bar/*",
                       ).completer = self.complete.path
    group.add_argument("-s", "--start", required=True,
                       metavar="TIME", type=self.arg_time,

@@ -23,18 +25,31 @@ def setup(self, sub):

                       ).completer = self.complete.time

    group = cmd.add_argument_group("Output format")
    group.add_argument("-q", "--quiet", action="store_true",
                       help="Don't display names when removing "
                       "from multiple paths")
    group.add_argument("-c", "--count", action="store_true",
                       help="Output number of data points removed")
    return cmd

def cmd_remove(self):
    streams = [ s[0] for s in self.client.stream_list() ]
    paths = []
    for path in self.args.path:
        new = fnmatch.filter(streams, path)
        if not new:
            self.die("error: no stream matched path: %s", path)
        paths.extend(new)

    try:
        count = self.client.stream_remove(self.args.path,
                                          self.args.start, self.args.end)
        for path in paths:
            if not self.args.quiet and len(paths) > 1:
                printf("Removing from %s\n", path)
            count = self.client.stream_remove(path,
                                              self.args.start, self.args.end)
        if self.args.count:
            printf("%d\n", count);
    except nilmdb.client.ClientError as e:
        self.die("error removing data: %s", str(e))

    if self.args.count:
        printf("%d\n", count)

    return 0
@@ -79,7 +79,12 @@ class BulkData(object):

        if Table.exists(ospath):
            raise ValueError("stream already exists at this path")
        if os.path.isdir(ospath):
            raise ValueError("subdirs of this path already exist")
            # Look for any files in subdirectories.  Fully empty subdirectories
            # are OK; they might be there during a rename
            for (root, dirs, files) in os.walk(ospath):
                if len(files):
                    raise ValueError(
                        "non-empty subdirs of this path already exist")

    def _create_parents(self, unicodepath):
        """Verify the path name, and create parent directories if they

@@ -188,7 +193,6 @@ class BulkData(object):

        # Basic checks
        if oldospath == newospath:
            raise ValueError("old and new paths are the same")
        self._create_check_ospath(newospath)

        # Move the table to a temporary location
        tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)

@@ -196,6 +200,9 @@ class BulkData(object):

        os.rename(oldospath, tmppath)

        try:
            # Check destination path
            self._create_check_ospath(newospath)

            # Create parent dirs for new location
            self._create_parents(newunicodepath)
@@ -406,12 +413,16 @@ class Table(object):

        return rocket.Rocket(self.layout,
                             os.path.join(self.root, subdir, filename))

    def append_string(self, data, start, end):
    def append_data(self, data, start, end, binary = False):
        """Parse the formatted string in 'data', according to the
        current layout, and append it to the table.  If any timestamps
        are non-monotonic, or don't fall between 'start' and 'end',
        a ValueError is raised.

        If 'binary' is True, the data should be in raw binary format
        instead: little-endian, matching the current table's layout,
        including the int64 timestamp.

        If this function succeeds, it returns normally.  Otherwise,
        the table is reverted back to its original state by truncating
        or deleting files as necessary."""

@@ -430,17 +441,26 @@ class Table(object):

                # Ask the rocket object to parse and append up to "count"
                # rows of data, verifying things along the way.
                try:
                    if binary:
                        appender = f.append_binary
                    else:
                        appender = f.append_string
                    (added_rows, data_offset, last_timestamp, linenum
                     ) = f.append_string(count, data, data_offset, linenum,
                                         start, end, last_timestamp)
                     ) = appender(count, data, data_offset, linenum,
                                  start, end, last_timestamp)
                except rocket.ParseError as e:
                    (linenum, colnum, errtype, obj) = e.args
                    where = "line %d, column %d: " % (linenum, colnum)
                    if binary:
                        where = "byte %d: " % (linenum)
                    else:
                        where = "line %d, column %d: " % (linenum, colnum)
                    # Extract out the error line, add column marker
                    try:
                        if binary:
                            raise IndexError
                        bad = data.splitlines()[linenum-1]
                        badptr = ' ' * (colnum - 1) + '^'
                    except IndexError: # pragma: no cover
                        bad += '\n' + ' ' * (colnum - 1) + '^'
                    except IndexError:
                        bad = ""
                    if errtype == rocket.ERR_NON_MONOTONIC:
                        err = "timestamp is not monotonically increasing"

@@ -456,7 +476,7 @@ class Table(object):

                    else:
                        err = str(obj)
                    raise ValueError("error parsing input data: " +
                                     where + err + "\n" + bad + "\n" + badptr)
                                     where + err + "\n" + bad)
                tot_rows += added_rows
        except Exception:
            # Some failure, so try to roll things back by truncating or

@@ -472,7 +492,7 @@ class Table(object):

            # Success, so update self.nrows accordingly
            self.nrows = tot_rows

    def get_data(self, start, stop):
    def get_data(self, start, stop, binary = False):
        """Extract data corresponding to Python range [n:m],
        and returns a formatted string"""
        if (start is None or

@@ -490,10 +510,13 @@ class Table(object):

            if count > remaining:
                count = remaining
            f = self.file_open(subdir, filename)
            ret.append(f.extract_string(offset, count))
            if binary:
                ret.append(f.extract_binary(offset, count))
            else:
                ret.append(f.extract_string(offset, count))
            remaining -= count
            row += count
        return "".join(ret)
        return b"".join(ret)

    def __getitem__(self, row):
        """Extract timestamps from a row, with table[n] notation."""
@@ -475,12 +475,16 @@ class NilmDB(object):

        con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
        con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

    def stream_insert(self, path, start, end, data):
    def stream_insert(self, path, start, end, data, binary = False):
        """Insert new data into the database.
        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Textual data, formatted according to the layout of path

        'binary', if True, means that 'data' is raw binary:
        little-endian, matching the current table's layout,
        including the int64 timestamp.
        """
        # First check for basic overlap using timestamp info given.
        stream_id = self._stream_id(path)

@@ -494,7 +498,7 @@ class NilmDB(object):

        # there are any parse errors.
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append_string(data, start, end)
        table.append_data(data, start, end, binary)
        row_end = table.nrows

        # Insert the record into the sql database.

@@ -538,7 +542,7 @@ class NilmDB(object):

                              dbinterval.db_endpos)

    def stream_extract(self, path, start = None, end = None,
                       count = False, markup = False):
                       count = False, markup = False, binary = False):
        """
        Returns (data, restart) tuple.

@@ -559,6 +563,9 @@ class NilmDB(object):

        'markup', if true, indicates that returned data should be
        marked with a comment denoting when a particular interval
        starts, and another comment when an interval ends.

        'binary', if true, means to return raw binary rather than
        ASCII-formatted data.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)

@@ -569,6 +576,8 @@ class NilmDB(object):

        matched = 0
        remaining = self.max_results
        restart = None
        if binary and (markup or count):
            raise NilmDBError("binary mode can't be used with markup or count")
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and

@@ -593,7 +602,7 @@ class NilmDB(object):

                               timestamp_to_string(interval.start) + "\n")

            # Gather these results up
            result.append(table.get_data(row_start, row_end))
            result.append(table.get_data(row_start, row_end, binary))

            # Count them
            remaining -= row_end - row_start
@@ -419,6 +419,68 @@ extra_data_on_line:

                   ERR_OTHER, "extra data on line");
}

/****
 * Append from binary data
 */

/* .append_binary(count, data, offset, linenum, start, end, last_timestamp) */
static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
{
    int count;
    const uint8_t *data;
    int data_len;
    int linenum;
    int offset;
    timestamp_t start;
    timestamp_t end;
    timestamp_t last_timestamp;

    if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
                          &count, &data, &data_len, &offset,
                          &linenum, &start, &end, &last_timestamp))
        return NULL;

    /* Advance to offset */
    if (offset > data_len)
        return raise_str(0, 0, ERR_OTHER, "bad offset");
    data += offset;
    data_len -= offset;

    /* Figure out max number of rows to insert */
    int rows = data_len / self->binary_size;
    if (rows > count)
        rows = count;

    /* Check timestamps */
    timestamp_t ts;
    int i;
    for (i = 0; i < rows; i++) {
        /* Read raw timestamp, byteswap if needed */
        memcpy(&ts, &data[i * self->binary_size], 8);
        ts = le64toh(ts);

        /* Check limits */
        if (ts <= last_timestamp)
            return raise_int(i, 0, ERR_NON_MONOTONIC, ts);
        last_timestamp = ts;
        if (ts < start || ts >= end)
            return raise_int(i, 0, ERR_OUT_OF_INTERVAL, ts);
    }

    /* Write binary data */
    if (fwrite(data, data_len, 1, self->file) != 1) {
        PyErr_SetFromErrno(PyExc_OSError);
        return NULL;
    }
    fflush(self->file);

    /* Build return value and return */
    PyObject *o;
    o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
                      last_timestamp, linenum);
    return o;
}

/****
 * Extract to string
 */

@@ -484,7 +546,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)

        /* read and format in a loop */                         \
        for (i = 0; i < self->layout_count; i++) {              \
            if (fread(&disktype, bytes,                         \
                      1, self->file) < 0)                       \
                      1, self->file) != 1)                      \
                goto err;                                       \
            disktype = letoh(disktype);                         \
            ret = sprintf(&str[len], " " fmt,                   \

@@ -527,6 +589,46 @@ err:

    return NULL;
}

/****
 * Extract to binary string containing raw little-endian binary data
 */
static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args)
{
    long count;
    long offset;

    if (!PyArg_ParseTuple(args, "ll", &offset, &count))
        return NULL;
    if (!self->file) {
        PyErr_SetString(PyExc_Exception, "no file");
        return NULL;
    }
    /* Seek to target location */
    if (fseek(self->file, offset, SEEK_SET) < 0) {
        PyErr_SetFromErrno(PyExc_OSError);
        return NULL;
    }

    uint8_t *str;
    int len = count * self->binary_size;
    str = malloc(len);
    if (str == NULL) {
        PyErr_SetFromErrno(PyExc_OSError);
        return NULL;
    }

    /* Data in the file is already in the desired little-endian
       binary format, so just read it directly. */
    if (fread(str, self->binary_size, count, self->file) != count) {
        free(str);
        PyErr_SetFromErrno(PyExc_OSError);
        return NULL;
    }

    PyObject *pystr = PyBytes_FromStringAndSize((char *)str, len);
    free(str);
    return pystr;
}

/****
 * Extract timestamp

@@ -571,11 +673,13 @@ static PyMemberDef Rocket_members[] = {

};

static PyMethodDef Rocket_methods[] = {
    { "close", (PyCFunction)Rocket_close, METH_NOARGS,
    { "close",
      (PyCFunction)Rocket_close, METH_NOARGS,
      "close(self)\n\n"
      "Close file handle" },

    { "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS,
    { "append_string",
      (PyCFunction)Rocket_append_string, METH_VARARGS,
      "append_string(self, count, data, offset, line, start, end, ts)\n\n"
      "Parse string and append data.\n"
      "\n"

@@ -590,16 +694,46 @@ static PyMethodDef Rocket_methods[] = {

      "Raises ParseError if timestamps are non-monotonic, outside\n"
      "the start/end interval etc.\n"
      "\n"
      "On success, return a tuple with three values:\n"
      "On success, return a tuple:\n"
      "  added_rows: how many rows were added from the file\n"
      "  data_offset: current offset into the data string\n"
      "  last_timestamp: last timestamp we parsed" },
      "  last_timestamp: last timestamp we parsed\n"
      "  linenum: current line number" },

    { "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS,
    { "append_binary",
      (PyCFunction)Rocket_append_binary, METH_VARARGS,
      "append_binary(self, count, data, offset, line, start, end, ts)\n\n"
      "Append binary data, which must match the data layout.\n"
      "\n"
      "  count: maximum number of rows to add\n"
      "  data: binary data\n"
      "  offset: byte offset into data to start adding\n"
      "  line: current line number (unused)\n"
      "  start: starting timestamp for interval\n"
      "  end: end timestamp for interval\n"
      "  ts: last timestamp that was previously parsed\n"
      "\n"
      "Raises ParseError if timestamps are non-monotonic, outside\n"
      "the start/end interval etc.\n"
      "\n"
      "On success, return a tuple:\n"
      "  added_rows: how many rows were added from the file\n"
      "  data_offset: current offset into the data string\n"
      "  last_timestamp: last timestamp we parsed\n"
      "  linenum: current line number (copied from argument)" },

    { "extract_string",
      (PyCFunction)Rocket_extract_string, METH_VARARGS,
      "extract_string(self, offset, count)\n\n"
      "Extract count rows of data from the file at offset offset.\n"
      "Return an ascii formatted string according to the layout" },

    { "extract_binary",
      (PyCFunction)Rocket_extract_binary, METH_VARARGS,
      "extract_binary(self, offset, count)\n\n"
      "Extract count rows of data from the file at offset offset.\n"
      "Return a raw binary string of data matching the data layout." },

    { "extract_timestamp",
      (PyCFunction)Rocket_extract_timestamp, METH_VARARGS,
      "extract_timestamp(self, offset)\n\n"
@@ -11,6 +11,7 @@ from nilmdb.utils.time import string_to_timestamp

import cherrypy
import sys
import os
import socket
import simplejson as json
import decorator
import psutil

@@ -173,6 +174,21 @@ class Root(NilmApp):

class Stream(NilmApp):
    """Stream-specific operations"""

    # Helpers
    def _get_times(self, start_param, end_param):
        (start, end) = (None, None)
        if start_param is not None:
            start = string_to_timestamp(start_param)
        if end_param is not None:
            end = string_to_timestamp(end_param)
        if start is not None and end is not None:
            if start >= end:
                raise cherrypy.HTTPError(
                    "400 Bad Request",
                    sprintf("start must precede end (%s >= %s)",
                            start_param, end_param))
        return (start, end)

    # /stream/list
    # /stream/list?layout=float32_8
    # /stream/list?path=/newton/prep&extended=1

@@ -289,10 +305,15 @@ class Stream(NilmApp):

    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError, ValueError)
    @cherrypy.tools.CORS_allow(methods = ["PUT"])
    def insert(self, path, start, end):
    def insert(self, path, start, end, binary = False):
        """
        Insert new data into the database.  Provide textual data
        (matching the path's layout) as a HTTP PUT.

        If 'binary' is True, expect raw binary data, rather than lines
        of ASCII-formatted data.  Raw binary data is always
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        # Important that we always read the input before throwing any
        # errors, to keep lengths happy for persistent connections.
@@ -300,21 +321,24 @@ class Stream(NilmApp):

        # requests, if we ever want to handle those (issue #1134)
        body = cherrypy.request.body.read()

        # Verify content type for binary data
        content_type = cherrypy.request.headers.get('content-type')
        if binary and content_type:
            if content_type != "application/octet-stream":
                raise cherrypy.HTTPError("400", "Content type must be "
                                         "application/octet-stream for "
                                         "binary data, not " + content_type)

        # Check path and get layout
        streams = self.db.stream_list(path = path)
        if len(streams) != 1:
            raise cherrypy.HTTPError("404 Not Found", "No such stream")
        if len(self.db.stream_list(path = path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)

        # Check limits
        start = string_to_timestamp(start)
        end = string_to_timestamp(end)
        if start >= end:
            raise cherrypy.HTTPError("400 Bad Request",
                                     "start must precede end")
        (start, end) = self._get_times(start, end)

        # Pass the data directly to nilmdb, which will parse it and
        # raise a ValueError if there are any problems.
        self.db.stream_insert(path, start, end, body)
        self.db.stream_insert(path, start, end, body, binary)

        # Done
        return

@@ -332,14 +356,7 @@ class Stream(NilmApp):

        the interval [start, end).  Returns the number of data points
        removed.
        """
        if start is not None:
            start = string_to_timestamp(start)
        if end is not None:
            end = string_to_timestamp(end)
        if start is not None and end is not None:
            if start >= end:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "start must precede end")
        (start, end) = self._get_times(start, end)
        total_removed = 0
        while True:
            (removed, restart) = self.db.stream_remove(path, start, end)

@@ -370,15 +387,7 @@ class Stream(NilmApp):

        Note that the response type is the non-standard
        'application/x-json-stream' for lack of a better option.
        """
        if start is not None:
            start = string_to_timestamp(start)
        if end is not None:
            end = string_to_timestamp(end)

        if start is not None and end is not None:
            if start >= end:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "start must precede end")
        (start, end) = self._get_times(start, end)

        if len(self.db.stream_list(path = path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -402,9 +411,8 @@ class Stream(NilmApp):

    # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
    @cherrypy.expose
    @chunked_response
    @response_type("text/plain")
    def extract(self, path, start = None, end = None,
                count = False, markup = False):
                count = False, markup = False, binary = False):
        """
        Extract data from backend database.  Streams the resulting
        entries as ASCII text lines separated by newlines.  This may

@@ -415,22 +423,26 @@ class Stream(NilmApp):

        If 'markup' is True, adds comments to the stream denoting each
        interval's start and end timestamp.
        """
        if start is not None:
            start = string_to_timestamp(start)
        if end is not None:
            end = string_to_timestamp(end)

        # Check parameters
        if start is not None and end is not None:
            if start >= end:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "start must precede end")
        If 'binary' is True, return raw binary data, rather than lines
        of ASCII-formatted data.  Raw binary data is always
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        (start, end) = self._get_times(start, end)

        # Check path and get layout
        streams = self.db.stream_list(path = path)
        if len(streams) != 1:
            raise cherrypy.HTTPError("404 Not Found", "No such stream")
        if len(self.db.stream_list(path = path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)

        if binary:
            content_type = "application/octet-stream"
            if markup or count:
                raise cherrypy.HTTPError("400", "can't mix binary and "
                                         "markup or count modes")
        else:
            content_type = "text/plain"
        cherrypy.response.headers['Content-Type'] = content_type

        @workaround_cp_bug_1200
        def content(start, end):

@@ -443,7 +455,8 @@ class Stream(NilmApp):

            while True:
                (data, restart) = self.db.stream_extract(
                    path, start, end, count = False, markup = markup)
                    path, start, end, count = False,
                    markup = markup, binary = binary)
                yield data

                if restart is None:
@@ -608,6 +621,11 @@ class Server(object):

    def stop(self):
        cherrypy.engine.exit()

# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once.  For this to
# work, the web server must use only a single process and single
# Python interpreter.  Multiple threads are OK.
_wsgi_server = None
def wsgi_application(dbpath, basepath): # pragma: no cover
    """Return a WSGI application object with a database at the
    specified path.

@@ -618,29 +636,33 @@ def wsgi_application(dbpath, basepath): # pragma: no cover

    is the same as the first argument to Apache's WSGIScriptAlias
    directive.
    """
    server = [None]
    def application(environ, start_response):
        if server[0] is None:
        global _wsgi_server
        if _wsgi_server is None:
            # Try to start the server
            try:
                db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
                server[0] = nilmdb.server.Server(
                _wsgi_server = nilmdb.server.Server(
                    db, embedded = True,
                    basepath = basepath.rstrip('/'))
            except Exception:
                # Build an error message on failure
                import pprint
                err = sprintf("Initializing database at path '%s' failed:\n\n",
                              dbpath)
                err += traceback.format_exc()
                try:
                    import pwd
                    import grp
                    err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s)\n",
                    err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
                                   "on host %s, pid %d\n",
                                   os.getuid(), pwd.getpwuid(os.getuid())[0],
                                   os.getgid(), grp.getgrgid(os.getgid())[0])
                                   os.getgid(), grp.getgrgid(os.getgid())[0],
                                   socket.gethostname(), os.getpid())
                except ImportError:
                    pass
        if server[0] is None:
                err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
        if _wsgi_server is None:
            # Serve up the error with our own mini WSGI app.
            headers = [ ('Content-type', 'text/plain'),
                        ('Content-length', str(len(err))) ]

@@ -648,5 +670,5 @@ def wsgi_application(dbpath, basepath): # pragma: no cover

            return [err]

        # Call the normal application
        return server[0].wsgi_application(environ, start_response)
        return _wsgi_server.wsgi_application(environ, start_response)
    return application
@@ -6,7 +6,7 @@ import time

# Range
min_timestamp = (-2**63)
max_timestamp = (2**62 - 1)
max_timestamp = (2**63 - 1)

# Smallest representable step
epsilon = 1

@@ -32,6 +32,10 @@ def timestamp_to_human(timestamp):

    """Convert a timestamp (integer microseconds since epoch) to a
    human-readable string, using the local timezone for display
    (e.g. from the TZ env var)."""
    if timestamp == min_timestamp:
        return "(minimum)"
    if timestamp == max_timestamp:
        return "(maximum)"
    dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp))
    return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
@@ -65,6 +69,14 @@ def parse_time(toparse):
|
||||
if toparse == "max":
|
||||
return max_timestamp
|
||||
|
||||
# If it starts with @, treat it as a NILM timestamp
|
||||
# (integer microseconds since epoch)
|
||||
try:
|
||||
if toparse[0] == '@':
|
||||
return int(toparse[1:])
|
||||
except (ValueError, KeyError, IndexError):
|
||||
pass
|
||||
|
||||
# If string isn't "now" and doesn't contain at least 4 digits,
|
||||
# consider it invalid. smartparse might otherwise accept
|
||||
# empty strings and strings with just separators.
|
||||
@@ -78,14 +90,6 @@ def parse_time(toparse):
|
||||
except (ValueError, OverflowError):
|
||||
pass
|
||||
|
||||
# If it starts with @, treat it as a NILM timestamp
|
||||
# (integer microseconds since epoch)
|
||||
try:
|
||||
if toparse[0] == '@':
|
||||
return int(toparse[1:])
|
||||
except (ValueError, KeyError):
|
||||
pass
|
||||
|
||||
# If it's parseable as a float, treat it as a Unix or NILM
|
||||
# timestamp based on its range.
|
||||
try:
|
||||
|
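To make the new parsing order concrete: the "@" form is now tried before
the general date parser, so an exact integer timestamp can no longer be
misread as a date. A small sketch of accepted inputs (timestamp values
here are examples):

    # Sketch: forms accepted by nilmdb.utils.time.parse_time after this change.
    from nilmdb.utils.time import parse_time, min_timestamp, max_timestamp

    assert parse_time("min") == min_timestamp    # -2**63
    assert parse_time("max") == max_timestamp    # 2**63 - 1
    assert parse_time("@1332497115500000") == 1332497115500000  # exact us
    parse_time("23 Mar 2012 10:05:15.50")        # general date string, uses TZ
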
setup.py
@@ -107,13 +107,13 @@ setup(name='nilmdb',
       author_email = 'jim@jtan.com',
       tests_require = [ 'nose',
                         'coverage',
+                        'numpy',
                       ],
       setup_requires = [ 'distribute',
                        ],
       install_requires = [ 'decorator',
                            'cherrypy >= 3.2',
                            'simplejson',
                            'pycurl',
                            'python-dateutil',
                            'pytz',
                            'psutil >= 0.3.0',
@@ -12,6 +12,7 @@ test_interval.py
 test_bulkdata.py
 test_nilmdb.py
 test_client.py
+test_numpyclient.py
 test_cmdline.py

 test_*.py
@@ -69,9 +69,9 @@ class TestBulkData(object):
         raw = []
         for i in range(1000):
             raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
-        node.append_string("".join(raw[0:1]), 0, 50000)
-        node.append_string("".join(raw[1:100]), 0, 50000)
-        node.append_string("".join(raw[100:]), 0, 50000)
+        node.append_data("".join(raw[0:1]), 0, 50000)
+        node.append_data("".join(raw[1:100]), 0, 50000)
+        node.append_data("".join(raw[100:]), 0, 50000)

         misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
                         slice(5,10), slice(3,None), slice(3,-3),
@@ -85,7 +85,7 @@ class TestBulkData(object):
         # Extract misc slices while appending, to make sure the
         # data isn't being added in the middle of the file
         for s in [2, slice(1,5), 2, slice(1,5)]:
-            node.append_string("0 0 0 0 0 0 0 0 0\n", 0, 50000)
+            node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000)
             raw.append("0 0 0 0 0 0 0 0 0\n")
             eq_(get_node_slice(s), raw[s])
@@ -23,6 +23,7 @@ import warnings
 import resource
 import time
 import re
+import struct

 from testutil.helpers import *

@@ -238,6 +239,22 @@ class TestClient(object):
         in_("400 Bad Request", str(e.exception))
         in_("start must precede end", str(e.exception))

+        # Good content type
+        with assert_raises(ClientError) as e:
+            client.http.put("stream/insert", "",
+                            { "path": "xxxx", "start": 0, "end": 1,
+                              "binary": 1 },
+                            binary = True)
+        in_("No such stream", str(e.exception))
+
+        # Bad content type
+        with assert_raises(ClientError) as e:
+            client.http.put("stream/insert", "",
+                            { "path": "xxxx", "start": 0, "end": 1,
+                              "binary": 1 },
+                            binary = False)
+        in_("Content type must be application/octet-stream", str(e.exception))
+
         # Specify start/end (starts too late)
         data = timestamper.TimestamperRate(testfile, start, 120)
         with assert_raises(ClientError) as e:
@@ -293,6 +310,23 @@ class TestClient(object):
         # Test count
         eq_(client.stream_count("/newton/prep"), 14400)

+        # Test binary output
+        with assert_raises(ClientError) as e:
+            list(client.stream_extract("/newton/prep",
+                                        markup = True, binary = True))
+        with assert_raises(ClientError) as e:
+            list(client.stream_extract("/newton/prep",
+                                        count = True, binary = True))
+        data = "".join(client.stream_extract("/newton/prep", binary = True))
+        # Quick check using struct
+        unpacker = struct.Struct("<qffffffff")
+        out = []
+        for i in range(14400):
+            out.append(unpacker.unpack_from(data, i * unpacker.size))
+        eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
+                     2525.169921875, 8350.83984375, 3724.699951171875,
+                     1355.3399658203125, 2039.0))
+
         client.close()

     def test_client_06_generators(self):
@@ -311,11 +345,11 @@ class TestClient(object):

         # Trigger a curl error in generator
         with assert_raises(ServerError) as e:
-            client.http.get_gen("http://nosuchurl/").next()
+            client.http.get_gen("http://nosuchurl.example.com./").next()

         # Trigger a curl error in generator
         with assert_raises(ServerError) as e:
-            client.http.get_gen("http://nosuchurl/").next()
+            client.http.get_gen("http://nosuchurl.example.com./").next()

         # Check 404 for missing streams
         for function in [ client.stream_intervals, client.stream_extract ]:
@@ -365,6 +399,17 @@ class TestClient(object):
             raise AssertionError("/stream/extract is not text/plain:\n" +
                                  headers())

+        x = http.get("stream/extract",
+                     { "path": "/newton/prep",
+                       "start": "123",
+                       "end": "124",
+                       "binary": "1" })
+        if "transfer-encoding: chunked" not in headers():
+            warnings.warn("Non-chunked HTTP response for /stream/extract")
+        if "content-type: application/octet-stream" not in headers():
+            raise AssertionError("/stream/extract is not binary:\n" +
+                                 headers())
+
         client.close()

     def test_client_08_unicode(self):
@@ -441,71 +486,75 @@ class TestClient(object):
             # override _max_data to trigger frequent server updates
             ctx._max_data = 15

-            ctx.insert("100 1\n")
+            ctx.insert("1000 1\n")

-            ctx.insert("101 ")
-            ctx.insert("1\n102 1")
+            ctx.insert("1010 ")
+            ctx.insert("1\n1020 1")
             ctx.insert("")
-            ctx.insert("\n103 1\n")
+            ctx.insert("\n1030 1\n")

-            ctx.insert("104 1\n")
+            ctx.insert("1040 1\n")
-            ctx.insert("# hello\n")
+            ctx.insert(" # hello\n")
-            ctx.insert(" 105 1\n")
+            ctx.insert(" 1050 1\n")
             ctx.finalize()

-            ctx.insert("107 1\n")
-            ctx.update_end(108)
+            ctx.insert("1070 1\n")
+            ctx.update_end(1080)
             ctx.finalize()
-            ctx.update_start(109)
-            ctx.insert("110 1\n")
-            ctx.insert("111 1\n")
-            ctx.insert("112 1\n")
-            ctx.insert("113 1\n")
-            ctx.insert("114 1\n")
-            ctx.update_end(116)
-            ctx.insert("115 1\n")
-            ctx.update_end(117)
-            ctx.insert("116 1\n")
-            ctx.update_end(118)
-            ctx.insert("117 1" +
+            ctx.update_start(1090)
+            ctx.insert("1100 1\n")
+            ctx.insert("1110 1\n")
+            ctx.send()
+            ctx.insert("1120 1\n")
+            ctx.insert("1130 1\n")
+            ctx.insert("1140 1\n")
+            ctx.update_end(1160)
+            ctx.insert("1150 1\n")
+            ctx.update_end(1170)
+            ctx.insert("1160 1\n")
+            ctx.update_end(1180)
+            ctx.insert("1170 1" +
                        " # this is super long" * 100 +
                        "\n")
             ctx.finalize()
+            ctx.insert("# this is super long" * 100)

         with assert_raises(ClientError):
-            with client.stream_insert_context("/context/test", 100, 200) as ctx:
-                ctx.insert("118 1\n")
+            with client.stream_insert_context("/context/test",
+                                              1000, 2000) as ctx:
+                ctx.insert("1180 1\n")

         with assert_raises(ClientError):
-            with client.stream_insert_context("/context/test", 200, 300) as ctx:
-                ctx.insert("118 1\n")
+            with client.stream_insert_context("/context/test",
+                                              2000, 3000) as ctx:
+                ctx.insert("1180 1\n")

         with assert_raises(ClientError):
             with client.stream_insert_context("/context/test") as ctx:
                 ctx.insert("bogus data\n")

-        with client.stream_insert_context("/context/test", 200, 300) as ctx:
+        with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
             # make sure our override wasn't permanent
             ne_(ctx._max_data, 15)
-            ctx.insert("225 1\n")
+            ctx.insert("2250 1\n")
             ctx.finalize()

         with assert_raises(ClientError):
-            with client.stream_insert_context("/context/test", 300, 400) as ctx:
-                ctx.insert("301 1\n")
-                ctx.insert("302 2\n")
-                ctx.insert("303 3\n")
-                ctx.insert("304 4\n")
-                ctx.insert("304 4\n") # non-monotonic after a few lines
+            with client.stream_insert_context("/context/test",
+                                              3000, 4000) as ctx:
+                ctx.insert("3010 1\n")
+                ctx.insert("3020 2\n")
+                ctx.insert("3030 3\n")
+                ctx.insert("3040 4\n")
+                ctx.insert("3040 4\n") # non-monotonic after a few lines
                 ctx.finalize()

         eq_(list(client.stream_intervals("/context/test")),
-            [ [ 100, 106 ],
-              [ 107, 108 ],
-              [ 109, 118 ],
-              [ 200, 300 ] ])
+            [ [ 1000, 1051 ],
+              [ 1070, 1080 ],
+              [ 1090, 1180 ],
+              [ 2000, 3000 ] ])

         # destroy stream (try without removing data first)
         with assert_raises(ClientError):
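The struct-based check in the test above maps directly onto a numpy dtype;
a sketch of decoding the same binary extract format for a float32_8 stream
(the one-row buffer is fabricated just to keep the example self-contained):

    # Sketch: decode binary extract output for a float32_8 stream.
    # Row layout is little-endian, matching struct format "<qffffffff":
    # one int64 timestamp followed by eight float32 data columns.
    import numpy as np

    dtype = np.dtype([('timestamp', '<i8'), ('data', '<f4', 8)])
    raw = np.zeros(1, dtype = dtype).tobytes()   # stand-in for extract bytes
    rows = np.frombuffer(raw, dtype = dtype)
    print(rows['timestamp'][0], rows['data'][0])
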
@@ -300,38 +300,19 @@ class TestCmdline(object):

         # Verify we got those 3 streams and they're returned in
         # alphabetical order.
-        self.ok("list")
+        self.ok("list -l")
         self.match("/newton/prep float32_8\n"
                    "/newton/raw uint16_6\n"
                    "/newton/zzz/rawnotch uint16_9\n")

-        # Match just one type or one path. Also check
-        # that --path is optional
-        self.ok("list --path /newton/raw")
-        self.match("/newton/raw uint16_6\n")
-
         self.ok("list /newton/raw")
         self.match("/newton/raw uint16_6\n")

         self.fail("list -p /newton/raw /newton/raw")
         self.contain("too many paths")

-        self.ok("list --layout uint16_6")
+        self.ok("list --layout /newton/raw")
         self.match("/newton/raw uint16_6\n")

-        # Wildcard matches
-        self.ok("list --layout uint16*")
-        self.match("/newton/raw uint16_6\n"
-                   "/newton/zzz/rawnotch uint16_9\n")
-
-        self.ok("list --path *zzz* --layout uint16*")
-        self.match("/newton/zzz/rawnotch uint16_9\n")
-
-        self.ok("list *zzz* --layout uint16*")
-        self.match("/newton/zzz/rawnotch uint16_9\n")
-
-        self.ok("list --path *zzz* --layout float32*")
-        self.match("")
+        self.ok("list *zzz*")
+        self.match("/newton/zzz/rawnotch\n")

         # reversed range
         self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
@@ -369,6 +350,8 @@ class TestCmdline(object):
         self.contain("No stream at path")
         self.fail("metadata /newton/nosuchstream --set foo=bar")
         self.contain("No stream at path")
+        self.fail("metadata /newton/nosuchstream --delete")
+        self.contain("No stream at path")

         self.ok("metadata /newton/prep")
         self.match("description=The Data\nv_scale=1.234\n")
@@ -394,6 +377,19 @@ class TestCmdline(object):
         self.fail("metadata /newton/nosuchpath")
         self.contain("No stream at path /newton/nosuchpath")

+        self.ok("metadata /newton/prep --delete")
+        self.ok("metadata /newton/prep --get")
+        self.match("")
+        self.ok("metadata /newton/prep --set "
+                 "'description=The Data' "
+                 "v_scale=1.234")
+        self.ok("metadata /newton/prep --delete v_scale")
+        self.ok("metadata /newton/prep --get")
+        self.match("description=The Data\n")
+        self.ok("metadata /newton/prep --set description=")
+        self.ok("metadata /newton/prep --get")
+        self.match("")
+
     def test_06_insert(self):
         self.ok("insert --help")

@@ -482,28 +478,28 @@ class TestCmdline(object):
         self.ok("list --detail")
         lines_(self.captured, 8)

-        self.ok("list --detail --path *prep")
+        self.ok("list --detail *prep")
         lines_(self.captured, 4)

-        self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'")
+        self.ok("list --detail *prep --start='23 Mar 2012 10:02'")
         lines_(self.captured, 3)

-        self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'")
+        self.ok("list --detail *prep --start='23 Mar 2012 10:05'")
         lines_(self.captured, 2)

-        self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'")
+        self.ok("list --detail *prep --start='23 Mar 2012 10:05:15'")
         lines_(self.captured, 2)
         self.contain("10:05:15.000")

-        self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'")
+        self.ok("list --detail *prep --start='23 Mar 2012 10:05:15.50'")
         lines_(self.captured, 2)
         self.contain("10:05:15.500")

-        self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'")
+        self.ok("list --detail *prep --start='23 Mar 2012 19:05:15.50'")
         lines_(self.captured, 2)
         self.contain("no intervals")

-        self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
+        self.ok("list --detail *prep --start='23 Mar 2012 10:05:15.50'"
                 + " --end='23 Mar 2012 10:05:15.51'")
         lines_(self.captured, 2)
         self.contain("10:05:15.500")
@@ -512,15 +508,15 @@ class TestCmdline(object):
         lines_(self.captured, 8)

         # Verify the "raw timestamp" output
-        self.ok("list --detail --path *prep --timestamp-raw "
+        self.ok("list --detail *prep --timestamp-raw "
                 "--start='23 Mar 2012 10:05:15.50'")
         lines_(self.captured, 2)
         self.contain("[ 1332497115500000 -> 1332497160000000 ]")

         # bad time
-        self.fail("list --detail --path *prep -T --start='9332497115.612'")
+        self.fail("list --detail *prep -T --start='9332497115.612'")
         # good time
-        self.ok("list --detail --path *prep -T --start='1332497115.612'")
+        self.ok("list --detail *prep -T --start='1332497115.612'")
         lines_(self.captured, 2)
         self.contain("[ 1332497115612000 -> 1332497160000000 ]")

@@ -600,7 +596,7 @@ class TestCmdline(object):
         test(8, "10:01:59.9", "10:02:00.1", extra="-m")

         # all data put in by tests
-        self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
+        self.ok("extract -a /newton/prep --start min --end max")
         lines_(self.captured, 43204)
         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
         self.match("43200\n")
@@ -624,7 +620,7 @@ class TestCmdline(object):

         # Try nonexistent stream
         self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
-        self.contain("No stream at path")
+        self.contain("no stream matched path")

         # empty or backward ranges return errors
         self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
@@ -652,9 +648,14 @@ class TestCmdline(object):
                 "--start '23 Mar 2022 20:00:30' " +
                 "--end '23 Mar 2022 20:00:31'")
         self.match("0\n")
+        self.ok("remove -c /newton/prep /newton/pre* " +
+                "--start '23 Mar 2022 20:00:30' " +
+                "--end '23 Mar 2022 20:00:31'")
+        self.match("Removing from /newton/prep\n0\n" +
+                   "Removing from /newton/prep\n0\n")

         # Make sure we have the data we expect
-        self.ok("list --detail /newton/prep")
+        self.ok("list -l --detail /newton/prep")
         self.match("/newton/prep float32_8\n" +
                    "  [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
                    " -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
@@ -689,7 +690,7 @@ class TestCmdline(object):
         self.match("24000\n")

         # See the missing chunks in list output
-        self.ok("list --detail /newton/prep")
+        self.ok("list --layout --detail /newton/prep")
         self.match("/newton/prep float32_8\n" +
                    "  [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
                    " -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
@@ -703,7 +704,7 @@ class TestCmdline(object):
         # Remove all data, verify it's missing
         self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01")
         self.match("") # no count requested this time
-        self.ok("list --detail /newton/prep")
+        self.ok("list -l --detail /newton/prep")
         self.match("/newton/prep float32_8\n" +
                    "  (no intervals)\n")

@@ -721,16 +722,16 @@ class TestCmdline(object):
         self.contain("too few arguments")

         self.fail("destroy /no/such/stream")
-        self.contain("No stream at path")
+        self.contain("no stream matched path")

         self.fail("destroy -R /no/such/stream")
-        self.contain("No stream at path")
+        self.contain("no stream matched path")

         self.fail("destroy asdfasdf")
-        self.contain("No stream at path")
+        self.contain("no stream matched path")

         # From previous tests, we have:
-        self.ok("list")
+        self.ok("list -l")
         self.match("/newton/prep float32_8\n"
                    "/newton/raw uint16_6\n"
                    "/newton/zzz/rawnotch uint16_9\n")
@@ -746,13 +747,13 @@ class TestCmdline(object):
         lines_(self.captured, 7)

         # Destroy for real
-        self.ok("destroy -R /newton/prep")
-        self.ok("list")
+        self.ok("destroy -R /n*/prep")
+        self.ok("list -l")
         self.match("/newton/raw uint16_6\n"
                    "/newton/zzz/rawnotch uint16_9\n")

         self.ok("destroy /newton/zzz/rawnotch")
-        self.ok("list")
+        self.ok("list -l")
         self.match("/newton/raw uint16_6\n")

         self.ok("destroy /newton/raw")
@@ -771,18 +772,17 @@ class TestCmdline(object):
         self.ok("list")
         self.contain(path)
         # Make sure it was created empty
-        self.ok("list --detail --path " + path)
+        self.ok("list --detail " + path)
         self.contain("(no intervals)")

     def test_12_unicode(self):
         # Unicode paths.
         self.ok("destroy /newton/asdf/qwer")
-        self.ok("destroy /newton/prep")
-        self.ok("destroy /newton/raw")
+        self.ok("destroy /newton/prep /newton/raw")
         self.ok("destroy /newton/zzz")

         self.ok(u"create /düsseldorf/raw uint16_6")
-        self.ok("list --detail")
+        self.ok("list -l --detail")
         self.contain(u"/düsseldorf/raw uint16_6")
         self.contain("(no intervals)")

@@ -868,7 +868,7 @@ class TestCmdline(object):
         du_before = nilmdb.utils.diskusage.du(testdb)

         # Make sure we have the data we expect
-        self.ok("list --detail")
+        self.ok("list -l --detail")
         self.match("/newton/prep float32_8\n" +
                    "  [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
                    " -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
@@ -904,7 +904,7 @@ class TestCmdline(object):
         self.match("3600\n")

         # See the missing chunks in list output
-        self.ok("list --detail")
+        self.ok("list -l --detail")
         self.match("/newton/prep float32_8\n" +
                    "  [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
                    " -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
@@ -1028,7 +1028,7 @@ class TestCmdline(object):
         else:
             raise AssertionError("data not found at " + seek)
         # Verify "list" output
-        self.ok("list")
+        self.ok("list -l")
         self.match("/" + "/".join(components) + " float32_8\n")

         # Lots of renames
@@ -1038,10 +1038,12 @@ class TestCmdline(object):
         self.contain("old and new paths are the same")
         check_path("newton", "prep")
         self.fail("rename /newton/prep /newton")
-        self.contain("subdirs of this path already exist")
+        self.contain("path must contain at least one folder")
         self.fail("rename /newton/prep /newton/prep/")
         self.contain("invalid path")
-        self.ok("rename /newton/prep /newton/foo")
+        self.ok("rename /newton/prep /newton/foo/1")
+        check_path("newton", "foo", "1")
+        self.ok("rename /newton/foo/1 /newton/foo")
         check_path("newton", "foo")
         self.ok("rename /newton/foo /totally/different/thing")
         check_path("totally", "different", "thing")
@@ -90,13 +90,16 @@ class Test00Nilmdb(object): # named 00 so it runs first
         eq_(db.stream_get_metadata("/newton/prep"), meta1)
         eq_(db.stream_get_metadata("/newton/raw"), meta1)

-        # fill in some test coverage for start >= end
+        # fill in some misc. test coverage
         with assert_raises(nilmdb.server.NilmDBError):
             db.stream_remove("/newton/prep", 0, 0)
         with assert_raises(nilmdb.server.NilmDBError):
             db.stream_remove("/newton/prep", 1, 0)
         db.stream_remove("/newton/prep", 0, 1)

+        with assert_raises(nilmdb.server.NilmDBError):
+            db.stream_extract("/newton/prep", count = True, binary = True)
+
         db.close()

 class TestBlockingServer(object):
tests/test_numpyclient.py (new file, 342 lines)
@@ -0,0 +1,342 @@
# -*- coding: utf-8 -*-

import nilmdb.server
import nilmdb.client
import nilmdb.client.numpyclient

from nilmdb.utils.printf import *
from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError
from nilmdb.utils import datetime_tz

from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version

from testutil.helpers import *

import numpy as np

testdb = "tests/numpyclient-testdb"
testurl = "http://localhost:32180/"

def setup_module():
    global test_server, test_db
    # Clear out DB
    recursive_unlink(testdb)

    # Start web app on a custom port
    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
    test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                       port = 32180, stoppable = False,
                                       fast_shutdown = True,
                                       force_traceback = True)
    test_server.start(blocking = False)

def teardown_module():
    global test_server, test_db
    # Close web app
    test_server.stop()
    test_db.close()

class TestNumpyClient(object):

    def test_numpyclient_01_basic(self):
        # Test basic connection
        client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
        version = client.version()
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(test_server.version))

        # Verify subclassing
        assert(isinstance(client, nilmdb.client.Client))

        # Layouts
        for layout in "int8_t", "something_8", "integer_1":
            with assert_raises(ValueError):
                for x in client.stream_extract_numpy("/foo", layout=layout):
                    pass
        for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
            with assert_raises(ClientError) as e:
                for x in client.stream_extract_numpy("/foo", layout=layout):
                    pass
            in_("No such stream", str(e.exception))

        with assert_raises(ClientError) as e:
            for x in client.stream_extract_numpy("/foo"):
                pass
        in_("can't get layout for path", str(e.exception))

        client.close()

    def test_numpyclient_02_extract(self):
        client = nilmdb.client.numpyclient.NumpyClient(url = testurl)

        # Insert some data as text
        client.stream_create("/newton/prep", "float32_8")
        testfile = "tests/data/prep-20120323T1000"
        start = nilmdb.utils.time.parse_time("20120323T1000")
        rate = 120
        data = timestamper.TimestamperRate(testfile, start, rate)
        result = client.stream_insert("/newton/prep", data,
                                      start, start + 119999777)

        # Extract Numpy arrays
        array = None
        pieces = 0
        for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
            pieces += 1
            if array is not None:
                array = np.vstack((array, chunk))
            else:
                array = chunk
        eq_(array.shape, (14400, 9))
        eq_(pieces, 15)

        # Try structured
        s = list(client.stream_extract_numpy("/newton/prep", structured = True))
        assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))

        # Compare. Will be close but not exact because the conversion
        # to and from ASCII was lossy.
        data = timestamper.TimestamperRate(testfile, start, rate)
        actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
        assert(np.allclose(array, actual))

        client.close()

    def test_numpyclient_03_insert(self):
        client = nilmdb.client.numpyclient.NumpyClient(url = testurl)

        # Limit _max_data just to get better coverage
        old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
        nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000

        client.stream_create("/test/1", "uint16_1")
        client.stream_insert_numpy("/test/1",
                                   np.array([[0, 1],
                                             [1, 2],
                                             [2, 3],
                                             [3, 4]]))

        # Wrong number of dimensions
        with assert_raises(ValueError) as e:
            client.stream_insert_numpy("/test/1",
                                       np.array([[[0, 1],
                                                  [1, 2]],
                                                 [[3, 4],
                                                  [4, 5]]]))
        in_("wrong number of dimensions", str(e.exception))

        # Wrong number of fields
        with assert_raises(ValueError) as e:
            client.stream_insert_numpy("/test/1",
                                       np.array([[0, 1, 2],
                                                 [1, 2, 3],
                                                 [3, 4, 5],
                                                 [4, 5, 6]]))
        in_("wrong number of fields", str(e.exception))

        # Unstructured
        client.stream_create("/test/2", "float32_8")
        client.stream_insert_numpy(
            "/test/2",
            client.stream_extract_numpy(
                "/newton/prep", structured = False, maxrows = 1000))

        # Structured, and specifying layout
        client.stream_create("/test/3", "float32_8")
        client.stream_insert_numpy(
            path = "/test/3", layout = "float32_8",
            data = client.stream_extract_numpy(
                "/newton/prep", structured = True, maxrows = 1000))

        # Structured, specifying wrong layout
        client.stream_create("/test/4", "float32_8")
        with assert_raises(ValueError) as e:
            client.stream_insert_numpy(
                "/test/4", layout = "uint16_1",
                data = client.stream_extract_numpy(
                    "/newton/prep", structured = True, maxrows = 1000))
        in_("wrong dtype", str(e.exception))

        # Unstructured, and specifying wrong layout
        client.stream_create("/test/5", "float32_8")
        with assert_raises(ClientError) as e:
            client.stream_insert_numpy(
                "/test/5", layout = "uint16_8",
                data = client.stream_extract_numpy(
                    "/newton/prep", structured = False, maxrows = 1000))
        # timestamps will be screwy here, because data will be parsed wrong
        in_("error parsing input data", str(e.exception))

        # Make sure the /newton/prep copies are identical
        a = np.vstack(client.stream_extract_numpy("/newton/prep"))
        b = np.vstack(client.stream_extract_numpy("/test/2"))
        c = np.vstack(client.stream_extract_numpy("/test/3"))
        assert(np.array_equal(a,b))
        assert(np.array_equal(a,c))

        nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
        client.close()

    def test_numpyclient_04_context(self):
        # Like test_client_context, but with Numpy data
        client = nilmdb.client.numpyclient.NumpyClient(testurl)

        client.stream_create("/context/test", "uint16_1")
        with client.stream_insert_numpy_context("/context/test") as ctx:
            # override _max_rows to trigger frequent server updates
            ctx._max_rows = 2
            ctx.insert([[1000, 1]])
            ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
            ctx.insert([[1040, 1], [1050, 1]])
            ctx.finalize()
            ctx.insert([[1070, 1]])
            ctx.update_end(1080)
            ctx.finalize()
            ctx.update_start(1090)
            ctx.insert([[1100, 1]])
            ctx.insert([[1110, 1]])
            ctx.send()
            ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
            ctx.update_end(1160)
            ctx.insert([[1150, 1]])
            ctx.update_end(1170)
            ctx.insert([[1160, 1]])
            ctx.update_end(1180)
            ctx.insert([[1170, 123456789.0]])
            ctx.finalize()
            ctx.insert(np.zeros((0,2)))

        with assert_raises(ClientError):
            with client.stream_insert_numpy_context("/context/test",
                                                    1000, 2000) as ctx:
                ctx.insert([[1180, 1]])

        with assert_raises(ClientError):
            with client.stream_insert_numpy_context("/context/test",
                                                    2000, 3000) as ctx:
                ctx._max_rows = 2
                ctx.insert([[3180, 1]])
                ctx.insert([[3181, 1]])

        with client.stream_insert_numpy_context("/context/test",
                                                2000, 3000) as ctx:
            # make sure our override wasn't permanent
            ne_(ctx._max_rows, 2)
            ctx.insert([[2250, 1]])
            ctx.finalize()

        with assert_raises(ClientError):
            with client.stream_insert_numpy_context("/context/test",
                                                    3000, 4000) as ctx:
                ctx.insert([[3010, 1]])
                ctx.insert([[3020, 2]])
                ctx.insert([[3030, 3]])
                ctx.insert([[3040, 4]])
                ctx.insert([[3040, 4]]) # non-monotonic after a few lines
                ctx.finalize()

        eq_(list(client.stream_intervals("/context/test")),
            [ [ 1000, 1051 ],
              [ 1070, 1080 ],
              [ 1090, 1180 ],
              [ 2000, 3000 ] ])

        client.stream_remove("/context/test")
        client.stream_destroy("/context/test")
        client.close()

    def test_numpyclient_05_emptyintervals(self):
        # Like test_client_emptyintervals, with insert_numpy_context
        client = nilmdb.client.numpyclient.NumpyClient(testurl)
        client.stream_create("/empty/test", "uint16_1")
        def info():
            result = []
            for interval in list(client.stream_intervals("/empty/test")):
                result.append((client.stream_count("/empty/test", *interval),
                               interval))
            return result
        eq_(info(), [])

        # Insert a region with just a few points
        with client.stream_insert_numpy_context("/empty/test") as ctx:
            ctx.update_start(100)
            ctx.insert([[140, 1]])
            ctx.insert([[150, 1]])
            ctx.insert([[160, 1]])
            ctx.update_end(200)
            ctx.finalize()
        eq_(info(), [(3, [100, 200])])

        # Delete chunk, which will leave one data point and two intervals
        client.stream_remove("/empty/test", 145, 175)
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200])])

        # Try also creating a completely empty interval from scratch,
        # in a few different ways.
        client.stream_insert("/empty/test", "", 300, 350)
        client.stream_insert("/empty/test", [], 400, 450)
        with client.stream_insert_numpy_context("/empty/test", 500, 550):
            pass

        # If enough timestamps aren't provided, empty streams won't be created.
        client.stream_insert("/empty/test", [])
        with client.stream_insert_numpy_context("/empty/test"):
            pass
        client.stream_insert("/empty/test", [], start = 600)
        with client.stream_insert_numpy_context("/empty/test", start = 700):
            pass
        client.stream_insert("/empty/test", [], end = 850)
        with client.stream_insert_numpy_context("/empty/test", end = 950):
            pass

        # Try various things that might cause problems
        with client.stream_insert_numpy_context("/empty/test",
                                                1000, 1050) as ctx:
            ctx.finalize() # inserts [1000, 1050]
            ctx.finalize() # nothing
            ctx.finalize() # nothing
            ctx.insert([[1100, 1]])
            ctx.finalize() # inserts [1100, 1101]
            ctx.update_start(1199)
            ctx.insert([[1200, 1]])
            ctx.update_end(1250)
            ctx.finalize() # inserts [1199, 1250]
            ctx.update_start(1299)
            ctx.finalize() # nothing
            ctx.update_end(1350)
            ctx.finalize() # nothing
            ctx.update_start(1400)
            ctx.insert(np.zeros((0,2)))
            ctx.update_end(1450)
            ctx.finalize()
            ctx.update_start(1500)
            ctx.insert(np.zeros((0,2)))
            ctx.update_end(1550)
            ctx.finalize()
            ctx.insert(np.zeros((0,2)))
            ctx.insert(np.zeros((0,2)))
            ctx.insert(np.zeros((0,2)))
            ctx.finalize()

        # Check everything
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200]),
                     (0, [300, 350]),
                     (0, [400, 450]),
                     (0, [500, 550]),
                     (0, [1000, 1050]),
                     (1, [1100, 1101]),
                     (1, [1199, 1250]),
                     (0, [1400, 1450]),
                     (0, [1500, 1550]),
                     ])

        # Clean up
        client.stream_remove("/empty/test")
        client.stream_destroy("/empty/test")
        client.close()
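As a summary of the interface these tests exercise, a minimal NumpyClient
round trip might look like the following sketch; the stream path and data
are invented, and the URL matches the test server above:

    # Sketch: NumpyClient round trip (path and values are illustrative).
    import numpy as np
    import nilmdb.client.numpyclient

    client = nilmdb.client.numpyclient.NumpyClient("http://localhost:32180/")
    client.stream_create("/demo/raw", "uint16_6")

    # Unstructured rows: integer-microsecond timestamp, then 6 data columns.
    rows = np.c_[np.arange(5) * 1000000, np.random.randint(0, 65535, (5, 6))]
    client.stream_insert_numpy("/demo/raw", rows)

    for chunk in client.stream_extract_numpy("/demo/raw"):
        print(chunk.shape)   # (rows, 7): timestamp column plus 6 fields
    client.close()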