Compare commits

..

28 Commits

Author SHA1 Message Date
99a4228285 Set up default SIGPIPE handler
This lets you do something like "nilmtool extract | head" without
triggering backtraces.
2013-04-09 18:25:09 -04:00
230ec72609 Fix timestamp display issues with --annotate 2013-04-09 18:19:32 -04:00
d36ece3767 Fix up dependencies 2013-04-08 18:53:13 -04:00
231963538e Add some info about binary interface to design docs 2013-04-08 18:53:13 -04:00
b4d6aad6de Merge branch 'binary' 2013-04-08 18:52:52 -04:00
e95142eabf Huge update to support inserting in client.numpyclient, with tests
This includes both client.stream_insert_numpy and
client.stream_insert_numpy_context().  The test code is based on
similar test code for client.stream_insert_context, so it should be
fairly complete.
2013-04-08 18:51:45 -04:00
d21c3470bc Client cleanups; fix tests to account for time epsilon = 1 2013-04-08 18:51:45 -04:00
7576883f49 Add basic binary support to client, and restructure a bit 2013-04-08 18:51:45 -04:00
cc211542f8 Add binary support to nilmdb.server; enforce content-type 2013-04-08 18:51:45 -04:00
8292dcf70b Clean up stream/extract content-type and add a test for it 2013-04-08 18:51:45 -04:00
b362fd37f6 Add binary option to nilmdb.stream_insert 2013-04-08 18:51:45 -04:00
41ec13ee17 Rename bulkdata.append_string to bulkdata.append_data 2013-04-08 18:51:45 -04:00
efa9aa9097 Add binary option to bulkdata.append_string 2013-04-08 18:51:45 -04:00
d9afb48f45 Make append_binary signature look like append_string 2013-04-08 18:51:44 -04:00
d1140e0f16 Timestamps are int64, not uint64 2013-04-08 18:51:44 -04:00
6091e44561 Fix fread return value check 2013-04-08 18:51:44 -04:00
e233ba790f Add append_binary to rocket 2013-04-08 18:51:44 -04:00
f0304b4c00 Merge branch 'binary' into HEAD 2013-04-07 18:08:10 -04:00
60594ca58e Numpy is required for tests now, due to nilmdb.client.numpyclient
Still allow installation without it, though.
2013-04-07 18:05:43 -04:00
c7f2df4abc Add nilmdb.client.numpyclient.NumpyClient with stream_extract_numpy
This is a subclass of nilmdb.client.client.Client that adds numpy
specific routines, which should be a lot faster.
2013-04-07 17:43:52 -04:00
5b7409f802 Add binary extract to client, server, nilmdb, bulkdata, and rocket. 2013-04-07 16:06:52 -04:00
06038062a2 Fix error in time parsing 2013-04-06 19:12:17 -04:00
ae9fe89759 Parse timestamps with '@' before any other checks 2013-04-04 14:43:18 -04:00
04def60021 Include stream path in "no such stream" errors 2013-04-02 21:06:49 -04:00
9ce0f69dff Add "--delete" option to "nilmtool metadata" tool
This is the same as "--update" with an empty string as the value.
2013-04-02 16:07:28 -04:00
90c3be91c4 Natural sort for streams in client.stream_list 2013-04-02 14:37:32 -04:00
ebccfb3531 Fix stream renaming when the new path is a parent of the old 2013-04-01 19:25:17 -04:00
e006f1d02e Change default URL to http://localhost/nilmdb/ 2013-04-01 18:04:31 -04:00
19 changed files with 1068 additions and 117 deletions

View File

@@ -10,6 +10,9 @@ Prerequisites:
sudo apt-get install python-cherrypy3 python-decorator python-simplejson sudo apt-get install python-cherrypy3 python-decorator python-simplejson
sudo apt-get install python-requests python-dateutil python-tz python-psutil sudo apt-get install python-requests python-dateutil python-tz python-psutil
# Other dependencies (required by some modules)
sudo apt-get install python-numpy
# Tools for running tests # Tools for running tests
sudo apt-get install python-nose python-coverage sudo apt-get install python-nose python-coverage

View File

@@ -389,3 +389,35 @@ Possible solutions:
are always printed as int64 values, and a new format are always printed as int64 values, and a new format
"@1234567890123456" is added to the parser for specifying them "@1234567890123456" is added to the parser for specifying them
exactly. exactly.
Binary interface
----------------
The ASCII interface is too slow for high-bandwidth processing, like
sinefits, prep, etc. A binary interface was added so that you can
extract the raw binary out of the bulkdata storage. This binary is
a little-endian format, e.g. in C a uint16_6 stream would be:
#include <endian.h>
#include <stdint.h>
struct {
int64_t timestamp_le;
uint16_t data_le[6];
} __attribute__((packed));
Remember to byteswap (with e.g. `letoh` in C)!
This interface is used by the new `nilmdb.client.numpyclient.NumpyClient`
class, which is a subclass of the normal `nilmcb.client.client.Client`
and has all of the same functions. It adds three new functions:
- `stream_extract_numpy` to extract data as a Numpy array
- `stream_insert_numpy` to insert data as a Numpy array
- `stream_insert_numpy_context` is the context manager for
incrementally inserting data
It is significantly faster! It is about 20 times faster to decimate a
stream with `nilm-decimate` when the filter code is using the new
binary/numpy interface.

View File

@@ -6,6 +6,7 @@ import nilmdb.utils
import nilmdb.client.httpclient import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError from nilmdb.client.errors import ClientError
import re
import time import time
import simplejson as json import simplejson as json
import contextlib import contextlib
@@ -65,7 +66,12 @@ class Client(object):
params["layout"] = layout params["layout"] = layout
if extended: if extended:
params["extended"] = 1 params["extended"] = 1
return self.http.get("stream/list", params) def sort_streams_nicely(x):
"""Human-friendly sort (/stream/2 before /stream/10)"""
num = lambda t: int(t) if t.isdigit() else t
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
return sorted(x, key = key)
return sort_streams_nicely(self.http.get("stream/list", params))
def stream_get_metadata(self, path, keys = None): def stream_get_metadata(self, path, keys = None):
params = { "path": path } params = { "path": path }
@@ -121,10 +127,11 @@ class Client(object):
@contextlib.contextmanager @contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None): def stream_insert_context(self, path, start = None, end = None):
"""Return a context manager that allows data to be efficiently """Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is be provided inserted into a stream in a piecewise manner. Data is
as single lines, and is aggregated and sent to the server in larger provided as ASCII lines, and is aggregated and sent to the
chunks as necessary. Data lines must match the database layout for server in larger or smaller chunks as necessary. Data lines
the given path, and end with a newline. must match the database layout for the given path, and end
with a newline.
Example: Example:
with client.stream_insert_context('/path', start, end) as ctx: with client.stream_insert_context('/path', start, end) as ctx:
@@ -136,15 +143,16 @@ class Client(object):
This may make multiple requests to the server, if the data is This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions. large enough or enough time has passed between insertions.
""" """
ctx = StreamInserter(self.http, path, start, end) ctx = StreamInserter(self, path, start, end)
yield ctx yield ctx
ctx.finalize() ctx.finalize()
def stream_insert(self, path, data, start = None, end = None): def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be a string """Insert rows of data into a stream. data should be a string
or iterable that provides ASCII data that matches the database or iterable that provides ASCII data that matches the database
layout for path. See stream_insert_context for details on the layout for path. Data is passed through stream_insert_context,
'start' and 'end' parameters.""" so it will be broken into reasonably-sized chunks and
start/end will be deduced if missing."""
with self.stream_insert_context(path, start, end) as ctx: with self.stream_insert_context(path, start, end) as ctx:
if isinstance(data, basestring): if isinstance(data, basestring):
ctx.insert(data) ctx.insert(data)
@@ -153,11 +161,28 @@ class Client(object):
ctx.insert(chunk) ctx.insert(chunk)
return ctx.last_response return ctx.last_response
def stream_insert_block(self, path, data, start, end, binary = False):
"""Insert a single fixed block of data into the stream. It is
sent directly to the server in one block with no further
processing.
If 'binary' is True, provide raw binary data in little-endian
format matching the path layout, including an int64 timestamp.
Otherwise, provide ASCII data matching the layout."""
params = {
"path": path,
"start": timestamp_to_string(start),
"end": timestamp_to_string(end),
}
if binary:
params["binary"] = 1
return self.http.put("stream/insert", data, params, binary = binary)
def stream_intervals(self, path, start = None, end = None, diffpath = None): def stream_intervals(self, path, start = None, end = None, diffpath = None):
""" """
Return a generator that yields each stream interval. Return a generator that yields each stream interval.
If diffpath is not None, yields only interval ranges that are If 'diffpath' is not None, yields only interval ranges that are
present in 'path' but not in 'diffpath'. present in 'path' but not in 'diffpath'.
""" """
params = { params = {
@@ -172,17 +197,22 @@ class Client(object):
return self.http.get_gen("stream/intervals", params) return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, def stream_extract(self, path, start = None, end = None,
count = False, markup = False): count = False, markup = False, binary = False):
""" """
Extract data from a stream. Returns a generator that yields Extract data from a stream. Returns a generator that yields
lines of ASCII-formatted data that matches the database lines of ASCII-formatted data that matches the database
layout for the given path. layout for the given path.
Specify count = True to return a count of matching data points If 'count' is True, return a count of matching data points
rather than the actual data. The output format is unchanged. rather than the actual data. The output format is unchanged.
Specify markup = True to include comments in the returned data If 'markup' is True, include comments in the returned data
that indicate interval starts and ends. that indicate interval starts and ends.
If 'binary' is True, return chunks of raw binary data, rather
than lines of ASCII-formatted data. Raw binary data is
little-endian and matches the database types (including an
int64 timestamp).
""" """
params = { params = {
"path": path, "path": path,
@@ -195,7 +225,9 @@ class Client(object):
params["count"] = 1 params["count"] = 1
if markup: if markup:
params["markup"] = 1 params["markup"] = 1
return self.http.get_gen("stream/extract", params) if binary:
params["binary"] = 1
return self.http.get_gen("stream/extract", params, binary = binary)
def stream_count(self, path, start = None, end = None): def stream_count(self, path, start = None, end = None):
""" """
@@ -244,13 +276,13 @@ class StreamInserter(object):
_max_data = 2 * 1024 * 1024 _max_data = 2 * 1024 * 1024
_max_data_after_send = 64 * 1024 _max_data_after_send = 64 * 1024
def __init__(self, http, path, start = None, end = None): def __init__(self, client, path, start, end):
"""'http' is the httpclient object. 'path' is the database """'client' is the client object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first path to insert to. 'start' and 'end' are used for the first
contiguous interval.""" contiguous interval and may be None."""
self.last_response = None self.last_response = None
self._http = http self._client = client
self._path = path self._path = path
# Start and end for the overall contiguous interval we're # Start and end for the overall contiguous interval we're
@@ -418,9 +450,7 @@ class StreamInserter(object):
raise ClientError("have data to send, but no start/end times") raise ClientError("have data to send, but no start/end times")
# Send it # Send it
params = { "path": self._path, self.last_response = self._client.stream_insert_block(
"start": timestamp_to_string(start_ts), self._path, block, start_ts, end_ts, binary = False)
"end": timestamp_to_string(end_ts) }
self.last_response = self._http.put("stream/insert", block, params)
return return

View File

@@ -105,12 +105,17 @@ class HTTPClient(object):
else: else:
return self._req("POST", url, None, params) return self._req("POST", url, None, params)
def put(self, url, data, params = None): def put(self, url, data, params = None, binary = False):
"""Simple PUT (parameters in URL, data in body)""" """Simple PUT (parameters in URL, data in body)"""
return self._req("PUT", url, params, data) if binary:
h = { 'Content-type': 'application/octet-stream' }
else:
h = { 'Content-type': 'text/plain; charset=utf-8' }
return self._req("PUT", url, query = params, body = data, headers = h)
# Generator versions that return data one line at a time. # Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None, headers = None): def _req_gen(self, method, url, query = None, body = None,
headers = None, binary = False):
""" """
Make a request and return a generator that gives back strings Make a request and return a generator that gives back strings
or JSON decoded lines of the body data, or raise an error if or JSON decoded lines of the body data, or raise an error if
@@ -118,16 +123,19 @@ class HTTPClient(object):
""" """
(response, isjson) = self._do_req(method, url, query, body, (response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers) stream = True, headers = headers)
if isjson: if binary:
for chunk in response.iter_content(chunk_size = 65536):
yield chunk
elif isjson:
for line in response.iter_lines(): for line in response.iter_lines():
yield json.loads(line) yield json.loads(line)
else: else:
for line in response.iter_lines(): for line in response.iter_lines():
yield line yield line
def get_gen(self, url, params = None): def get_gen(self, url, params = None, binary = False):
"""Simple GET (parameters in URL) returning a generator""" """Simple GET (parameters in URL) returning a generator"""
return self._req_gen("GET", url, params) return self._req_gen("GET", url, params, binary = binary)
# Not much use for a POST or PUT generator, since they don't # Not much use for a POST or PUT generator, since they don't
# return much data. # return much data.

View File

@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
"""Provide a NumpyClient class that is based on normal Client, but has
additional methods for extracting and inserting data via Numpy arrays."""
import nilmdb.utils
import nilmdb.client.client
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError
import contextlib
from nilmdb.utils.time import timestamp_to_string, string_to_timestamp
import numpy
import cStringIO
def layout_to_dtype(layout):
ltype = layout.split('_')[0]
lcount = int(layout.split('_')[1])
if ltype.startswith('int'):
atype = '<i' + str(int(ltype[3:]) / 8)
elif ltype.startswith('uint'):
atype = '<u' + str(int(ltype[4:]) / 8)
elif ltype.startswith('float'):
atype = '<f' + str(int(ltype[5:]) / 8)
else:
raise ValueError("bad layout")
return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)])
class NumpyClient(nilmdb.client.client.Client):
"""Subclass of nilmdb.client.Client that adds additional methods for
extracting and inserting data via Numpy arrays."""
def _get_dtype(self, path, layout):
if layout is None:
streams = self.stream_list(path)
if len(streams) != 1:
raise ClientError("can't get layout for path: " + path)
layout = streams[0][1]
return layout_to_dtype(layout)
def stream_extract_numpy(self, path, start = None, end = None,
layout = None, maxrows = 100000,
structured = False):
"""
Extract data from a stream. Returns a generator that yields
Numpy arrays of up to 'maxrows' of data each.
If 'layout' is None, it is read using stream_info.
If 'structured' is False, all data is converted to float64
and returned in a flat 2D array. Otherwise, data is returned
as a structured dtype in a 1D array.
"""
dtype = self._get_dtype(path, layout)
def to_numpy(data):
a = numpy.fromstring(data, dtype)
if structured:
return a
return numpy.c_[a['timestamp'], a['data']]
chunks = []
total_len = 0
maxsize = dtype.itemsize * maxrows
for data in self.stream_extract(path, start, end, binary = True):
# Add this block of binary data
chunks.append(data)
total_len += len(data)
# See if we have enough to make the requested Numpy array
while total_len >= maxsize:
assembled = "".join(chunks)
total_len -= maxsize
chunks = [ assembled[maxsize:] ]
block = assembled[:maxsize]
yield to_numpy(block)
if total_len:
yield to_numpy("".join(chunks))
@contextlib.contextmanager
def stream_insert_numpy_context(self, path, start = None, end = None,
layout = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is
provided as Numpy arrays, and is aggregated and sent to the
server in larger or smaller chunks as necessary. Data format
must match the database layout for the given path.
For more details, see help for
nilmdb.client.numpyclient.StreamInserterNumpy
If 'layout' is not None, use it as the layout rather than
querying the database.
"""
dtype = self._get_dtype(path, layout)
ctx = StreamInserterNumpy(self, path, start, end, dtype)
yield ctx
ctx.finalize()
def stream_insert_numpy(self, path, data, start = None, end = None,
layout = None):
"""Insert data into a stream. data should be a Numpy array
which will be passed through stream_insert_numpy_context to
break it into chunks etc. See the help for that function
for details."""
with self.stream_insert_numpy_context(path, start, end, layout) as ctx:
if isinstance(data, numpy.ndarray):
ctx.insert(data)
else:
for chunk in data:
ctx.insert(chunk)
return ctx.last_response
class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
"""Object returned by stream_insert_numpy_context() that manages
the insertion of rows of data into a particular path.
See help for nilmdb.client.client.StreamInserter for details.
The only difference is that, instead of ASCII formatted data,
this context manager can take Numpy arrays, which are either
structured (1D with complex dtype) or flat (2D with simple dtype).
"""
# Soft limit of how many bytes to send per HTTP request.
_max_data = 2 * 1024 * 1024
def __init__(self, client, path, start, end, dtype):
"""
'client' is the client object. 'path' is the database path
to insert to. 'start' and 'end' are used for the first
contiguous interval and may be None. 'dtype' is the Numpy
dtype for this stream.
"""
self.last_response = None
self._dtype = dtype
self._client = client
self._path = path
# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end
# Max rows to send at once
self._max_rows = self._max_data // self._dtype.itemsize
# List of the current arrays we're building up to send
self._block_arrays = []
self._block_rows = 0
def insert(self, array):
"""Insert Numpy data, which must match the layout type."""
if type(array) != numpy.ndarray:
array = numpy.array(array)
if array.ndim == 1:
# Already a structured array; just verify the type
if array.dtype != self._dtype:
raise ValueError("wrong dtype for 1D (structured) array")
elif array.ndim == 2:
# Convert to structured array
sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
sarray['timestamp'] = array[:,0]
# Need the squeeze in case sarray['data'] is 1 dimensional
sarray['data'] = numpy.squeeze(array[:,1:])
array = sarray
else:
raise ValueError("wrong number of dimensions in array")
length = len(array)
maxrows = self._max_rows
if length == 0:
return
if length > maxrows:
# This is more than twice what we wanted to send, so split
# it up. This is a bit inefficient, but the user really
# shouldn't be providing this much data at once.
for cut in range(0, length, maxrows):
self.insert(array[cut:(cut + maxrows)])
return
# Add this array to our list
self._block_arrays.append(array)
self._block_rows += length
# Send if it's too long
if self._block_rows >= maxrows:
self._send_block(final = False)
def _send_block(self, final = False):
"""Send the data current stored up. One row might be left
over if we need its timestamp saved."""
# Build the full array to send
if self._block_rows == 0:
array = numpy.zeros(0, dtype = self._dtype)
else:
array = numpy.hstack(self._block_arrays)
# Get starting timestamp
start_ts = self._interval_start
if start_ts is None:
# Pull start from the first row
try:
start_ts = array['timestamp'][0]
except IndexError:
pass # no timestamp is OK, if we have no data
# Get ending timestamp
if final:
# For a final block, the timestamp is either the
# user-provided end, or the timestamp of the last line
# plus epsilon.
end_ts = self._interval_end
if end_ts is None:
try:
end_ts = array['timestamp'][-1]
end_ts += nilmdb.utils.time.epsilon
except IndexError:
pass # no timestamp is OK, if we have no data
self._block_arrays = []
self._block_rows = 0
# Next block is completely fresh
self._interval_start = None
self._interval_end = None
else:
# An intermediate block. We need to save the last row
# for the next block, and use its timestamp as the ending
# timestamp for this one.
if len(array) < 2:
# Not enough data to send an intermediate block
return
end_ts = array['timestamp'][-1]
if self._interval_end is not None and end_ts > self._interval_end:
# User gave us bad endpoints; send it anyway, and let
# the server complain so that the error is the same
# as if we hadn't done this chunking.
end_ts = self._interval_end
self._block_arrays = [ array[-1:] ]
self._block_rows = 1
array = array[:-1]
# Next block continues where this one ended
self._interval_start = end_ts
# If we have no endpoints, it's because we had no data to send.
if start_ts is None or end_ts is None:
return
# Send it
data = array.tostring()
self.last_response = self._client.stream_insert_block(
self._path, data, start_ts, end_ts, binary = True)
return

View File

@@ -10,6 +10,7 @@ import sys
import os import os
import argparse import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form from argparse import ArgumentDefaultsHelpFormatter as def_form
import signal
try: # pragma: no cover try: # pragma: no cover
import argcomplete import argcomplete
@@ -81,7 +82,7 @@ class Cmdline(object):
def __init__(self, argv = None): def __init__(self, argv = None):
self.argv = argv or sys.argv[1:] self.argv = argv or sys.argv[1:]
self.client = None self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380") self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
self.subcmd = {} self.subcmd = {}
self.complete = Complete() self.complete = Complete()
@@ -126,6 +127,13 @@ class Cmdline(object):
sys.exit(-1) sys.exit(-1)
def run(self): def run(self):
# Set SIGPIPE to its default handler -- we don't need Python
# to catch it for us.
try:
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except ValueError: # pragma: no cover
pass
# Clear cached timezone, so that we can pick up timezone changes # Clear cached timezone, so that we can pick up timezone changes
# while running this from the test suite. # while running this from the test suite.
datetime_tz._localtz = None datetime_tz._localtz = None

View File

@@ -9,7 +9,8 @@ def setup(self, sub):
a stream. a stream.
""", """,
usage="%(prog)s path [-g [key ...] | " usage="%(prog)s path [-g [key ...] | "
"-s key=value [...] | -u key=value [...]]") "-s key=value [...] | -u key=value [...]] | "
"-d [key ...]")
cmd.set_defaults(handler = cmd_metadata) cmd.set_defaults(handler = cmd_metadata)
group = cmd.add_argument_group("Required arguments") group = cmd.add_argument_group("Required arguments")
@@ -30,6 +31,9 @@ def setup(self, sub):
help="Update metadata using provided " help="Update metadata using provided "
"key=value pairs", "key=value pairs",
).completer = self.complete.meta_keyval ).completer = self.complete.meta_keyval
exc.add_argument("-d", "--delete", nargs="*", metavar="key",
help="Delete metadata for specified keys (default all)",
).completer = self.complete.meta_key
return cmd return cmd
def cmd_metadata(self): def cmd_metadata(self):
@@ -56,6 +60,16 @@ def cmd_metadata(self):
handler(self.args.path, data) handler(self.args.path, data)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error setting/updating metadata: %s", str(e)) self.die("error setting/updating metadata: %s", str(e))
elif self.args.delete is not None:
# Delete (by setting values to empty strings)
keys = self.args.delete or None
try:
data = self.client.stream_get_metadata(self.args.path, keys)
for key in data:
data[key] = ""
self.client.stream_update_metadata(self.args.path, data)
except nilmdb.client.ClientError as e:
self.die("error deleting metadata: %s", str(e))
else: else:
# Get (or unspecified) # Get (or unspecified)
keys = self.args.get or None keys = self.args.get or None
@@ -64,7 +78,7 @@ def cmd_metadata(self):
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error getting metadata: %s", str(e)) self.die("error getting metadata: %s", str(e))
for key, value in sorted(data.items()): for key, value in sorted(data.items()):
# Omit nonexistant keys # Print nonexistant keys as having empty value
if value is None: if value is None:
value = "" value = ""
printf("%s=%s\n", key, value) printf("%s=%s\n", key, value)

View File

@@ -79,7 +79,12 @@ class BulkData(object):
if Table.exists(ospath): if Table.exists(ospath):
raise ValueError("stream already exists at this path") raise ValueError("stream already exists at this path")
if os.path.isdir(ospath): if os.path.isdir(ospath):
raise ValueError("subdirs of this path already exist") # Look for any files in subdirectories. Fully empty subdirectories
# are OK; they might be there during a rename
for (root, dirs, files) in os.walk(ospath):
if len(files):
raise ValueError(
"non-empty subdirs of this path already exist")
def _create_parents(self, unicodepath): def _create_parents(self, unicodepath):
"""Verify the path name, and create parent directories if they """Verify the path name, and create parent directories if they
@@ -188,7 +193,6 @@ class BulkData(object):
# Basic checks # Basic checks
if oldospath == newospath: if oldospath == newospath:
raise ValueError("old and new paths are the same") raise ValueError("old and new paths are the same")
self._create_check_ospath(newospath)
# Move the table to a temporary location # Move the table to a temporary location
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
@@ -196,6 +200,9 @@ class BulkData(object):
os.rename(oldospath, tmppath) os.rename(oldospath, tmppath)
try: try:
# Check destination path
self._create_check_ospath(newospath)
# Create parent dirs for new location # Create parent dirs for new location
self._create_parents(newunicodepath) self._create_parents(newunicodepath)
@@ -406,12 +413,16 @@ class Table(object):
return rocket.Rocket(self.layout, return rocket.Rocket(self.layout,
os.path.join(self.root, subdir, filename)) os.path.join(self.root, subdir, filename))
def append_string(self, data, start, end): def append_data(self, data, start, end, binary = False):
"""Parse the formatted string in 'data', according to the """Parse the formatted string in 'data', according to the
current layout, and append it to the table. If any timestamps current layout, and append it to the table. If any timestamps
are non-monotonic, or don't fall between 'start' and 'end', are non-monotonic, or don't fall between 'start' and 'end',
a ValueError is raised. a ValueError is raised.
If 'binary' is True, the data should be in raw binary format
instead: little-endian, matching the current table's layout,
including the int64 timestamp.
If this function succeeds, it returns normally. Otherwise, If this function succeeds, it returns normally. Otherwise,
the table is reverted back to its original state by truncating the table is reverted back to its original state by truncating
or deleting files as necessary.""" or deleting files as necessary."""
@@ -430,17 +441,26 @@ class Table(object):
# Ask the rocket object to parse and append up to "count" # Ask the rocket object to parse and append up to "count"
# rows of data, verifying things along the way. # rows of data, verifying things along the way.
try: try:
if binary:
appender = f.append_binary
else:
appender = f.append_string
(added_rows, data_offset, last_timestamp, linenum (added_rows, data_offset, last_timestamp, linenum
) = f.append_string(count, data, data_offset, linenum, ) = appender(count, data, data_offset, linenum,
start, end, last_timestamp) start, end, last_timestamp)
except rocket.ParseError as e: except rocket.ParseError as e:
(linenum, colnum, errtype, obj) = e.args (linenum, colnum, errtype, obj) = e.args
if binary:
where = "byte %d: " % (linenum)
else:
where = "line %d, column %d: " % (linenum, colnum) where = "line %d, column %d: " % (linenum, colnum)
# Extract out the error line, add column marker # Extract out the error line, add column marker
try: try:
if binary:
raise IndexError
bad = data.splitlines()[linenum-1] bad = data.splitlines()[linenum-1]
badptr = ' ' * (colnum - 1) + '^' bad += '\n' + ' ' * (colnum - 1) + '^'
except IndexError: # pragma: no cover except IndexError:
bad = "" bad = ""
if errtype == rocket.ERR_NON_MONOTONIC: if errtype == rocket.ERR_NON_MONOTONIC:
err = "timestamp is not monotonically increasing" err = "timestamp is not monotonically increasing"
@@ -456,7 +476,7 @@ class Table(object):
else: else:
err = str(obj) err = str(obj)
raise ValueError("error parsing input data: " + raise ValueError("error parsing input data: " +
where + err + "\n" + bad + "\n" + badptr) where + err + "\n" + bad)
tot_rows += added_rows tot_rows += added_rows
except Exception: except Exception:
# Some failure, so try to roll things back by truncating or # Some failure, so try to roll things back by truncating or
@@ -472,7 +492,7 @@ class Table(object):
# Success, so update self.nrows accordingly # Success, so update self.nrows accordingly
self.nrows = tot_rows self.nrows = tot_rows
def get_data(self, start, stop): def get_data(self, start, stop, binary = False):
"""Extract data corresponding to Python range [n:m], """Extract data corresponding to Python range [n:m],
and returns a formatted string""" and returns a formatted string"""
if (start is None or if (start is None or
@@ -490,10 +510,13 @@ class Table(object):
if count > remaining: if count > remaining:
count = remaining count = remaining
f = self.file_open(subdir, filename) f = self.file_open(subdir, filename)
if binary:
ret.append(f.extract_binary(offset, count))
else:
ret.append(f.extract_string(offset, count)) ret.append(f.extract_string(offset, count))
remaining -= count remaining -= count
row += count row += count
return "".join(ret) return b"".join(ret)
def __getitem__(self, row): def __getitem__(self, row):
"""Extract timestamps from a row, with table[n] notation.""" """Extract timestamps from a row, with table[n] notation."""

View File

@@ -475,12 +475,16 @@ class NilmDB(object):
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM streams WHERE id=?", (stream_id,)) con.execute("DELETE FROM streams WHERE id=?", (stream_id,))
def stream_insert(self, path, start, end, data): def stream_insert(self, path, start, end, data, binary = False):
"""Insert new data into the database. """Insert new data into the database.
path: Path at which to add the data path: Path at which to add the data
start: Starting timestamp start: Starting timestamp
end: Ending timestamp end: Ending timestamp
data: Textual data, formatted according to the layout of path data: Textual data, formatted according to the layout of path
'binary', if True, means that 'data' is raw binary:
little-endian, matching the current table's layout,
including the int64 timestamp.
""" """
# First check for basic overlap using timestamp info given. # First check for basic overlap using timestamp info given.
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
@@ -494,7 +498,7 @@ class NilmDB(object):
# there are any parse errors. # there are any parse errors.
table = self.data.getnode(path) table = self.data.getnode(path)
row_start = table.nrows row_start = table.nrows
table.append_string(data, start, end) table.append_data(data, start, end, binary)
row_end = table.nrows row_end = table.nrows
# Insert the record into the sql database. # Insert the record into the sql database.
@@ -538,7 +542,7 @@ class NilmDB(object):
dbinterval.db_endpos) dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, def stream_extract(self, path, start = None, end = None,
count = False, markup = False): count = False, markup = False, binary = False):
""" """
Returns (data, restart) tuple. Returns (data, restart) tuple.
@@ -559,6 +563,9 @@ class NilmDB(object):
'markup', if true, indicates that returned data should be 'markup', if true, indicates that returned data should be
marked with a comment denoting when a particular interval marked with a comment denoting when a particular interval
starts, and another comment when an interval ends. starts, and another comment when an interval ends.
'binary', if true, means to return raw binary rather than
ASCII-formatted data.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
@@ -569,6 +576,8 @@ class NilmDB(object):
matched = 0 matched = 0
remaining = self.max_results remaining = self.max_results
restart = None restart = None
if binary and (markup or count):
raise NilmDBError("binary mode can't be used with markup or count")
for interval in intervals.intersection(requested): for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so # Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and # we use two bisections to find both the starting and
@@ -593,7 +602,7 @@ class NilmDB(object):
timestamp_to_string(interval.start) + "\n") timestamp_to_string(interval.start) + "\n")
# Gather these results up # Gather these results up
result.append(table.get_data(row_start, row_end)) result.append(table.get_data(row_start, row_end, binary))
# Count them # Count them
remaining -= row_end - row_start remaining -= row_end - row_start

View File

@@ -419,6 +419,68 @@ extra_data_on_line:
ERR_OTHER, "extra data on line"); ERR_OTHER, "extra data on line");
} }
/****
* Append from binary data
*/
/* .append_binary(count, data, offset, linenum, start, end, last_timestamp) */
static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
{
int count;
const uint8_t *data;
int data_len;
int linenum;
int offset;
timestamp_t start;
timestamp_t end;
timestamp_t last_timestamp;
if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
&count, &data, &data_len, &offset,
&linenum, &start, &end, &last_timestamp))
return NULL;
/* Advance to offset */
if (offset > data_len)
return raise_str(0, 0, ERR_OTHER, "bad offset");
data += offset;
data_len -= offset;
/* Figure out max number of rows to insert */
int rows = data_len / self->binary_size;
if (rows > count)
rows = count;
/* Check timestamps */
timestamp_t ts;
int i;
for (i = 0; i < rows; i++) {
/* Read raw timestamp, byteswap if needed */
memcpy(&ts, &data[i * self->binary_size], 8);
ts = le64toh(ts);
/* Check limits */
if (ts <= last_timestamp)
return raise_int(i, 0, ERR_NON_MONOTONIC, ts);
last_timestamp = ts;
if (ts < start || ts >= end)
return raise_int(i, 0, ERR_OUT_OF_INTERVAL, ts);
}
/* Write binary data */
if (fwrite(data, data_len, 1, self->file) != 1) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
fflush(self->file);
/* Build return value and return */
PyObject *o;
o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
last_timestamp, linenum);
return o;
}
/**** /****
* Extract to string * Extract to string
*/ */
@@ -484,7 +546,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
/* read and format in a loop */ \ /* read and format in a loop */ \
for (i = 0; i < self->layout_count; i++) { \ for (i = 0; i < self->layout_count; i++) { \
if (fread(&disktype, bytes, \ if (fread(&disktype, bytes, \
1, self->file) < 0) \ 1, self->file) != 1) \
goto err; \ goto err; \
disktype = letoh(disktype); \ disktype = letoh(disktype); \
ret = sprintf(&str[len], " " fmt, \ ret = sprintf(&str[len], " " fmt, \
@@ -527,6 +589,46 @@ err:
return NULL; return NULL;
} }
/****
* Extract to binary string containing raw little-endian binary data
*/
static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args)
{
long count;
long offset;
if (!PyArg_ParseTuple(args, "ll", &offset, &count))
return NULL;
if (!self->file) {
PyErr_SetString(PyExc_Exception, "no file");
return NULL;
}
/* Seek to target location */
if (fseek(self->file, offset, SEEK_SET) < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
uint8_t *str;
int len = count * self->binary_size;
str = malloc(len);
if (str == NULL) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
/* Data in the file is already in the desired little-endian
binary format, so just read it directly. */
if (fread(str, self->binary_size, count, self->file) != count) {
free(str);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
PyObject *pystr = PyBytes_FromStringAndSize((char *)str, len);
free(str);
return pystr;
}
/**** /****
* Extract timestamp * Extract timestamp
@@ -571,11 +673,13 @@ static PyMemberDef Rocket_members[] = {
}; };
static PyMethodDef Rocket_methods[] = { static PyMethodDef Rocket_methods[] = {
{ "close", (PyCFunction)Rocket_close, METH_NOARGS, { "close",
(PyCFunction)Rocket_close, METH_NOARGS,
"close(self)\n\n" "close(self)\n\n"
"Close file handle" }, "Close file handle" },
{ "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS, { "append_string",
(PyCFunction)Rocket_append_string, METH_VARARGS,
"append_string(self, count, data, offset, line, start, end, ts)\n\n" "append_string(self, count, data, offset, line, start, end, ts)\n\n"
"Parse string and append data.\n" "Parse string and append data.\n"
"\n" "\n"
@@ -590,16 +694,46 @@ static PyMethodDef Rocket_methods[] = {
"Raises ParseError if timestamps are non-monotonic, outside\n" "Raises ParseError if timestamps are non-monotonic, outside\n"
"the start/end interval etc.\n" "the start/end interval etc.\n"
"\n" "\n"
"On success, return a tuple with three values:\n" "On success, return a tuple:\n"
" added_rows: how many rows were added from the file\n" " added_rows: how many rows were added from the file\n"
" data_offset: current offset into the data string\n" " data_offset: current offset into the data string\n"
" last_timestamp: last timestamp we parsed" }, " last_timestamp: last timestamp we parsed\n"
" linenum: current line number" },
{ "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS, { "append_binary",
(PyCFunction)Rocket_append_binary, METH_VARARGS,
"append_binary(self, count, data, offset, line, start, end, ts)\n\n"
"Append binary data, which must match the data layout.\n"
"\n"
" count: maximum number of rows to add\n"
" data: binary data\n"
" offset: byte offset into data to start adding\n"
" line: current line number (unused)\n"
" start: starting timestamp for interval\n"
" end: end timestamp for interval\n"
" ts: last timestamp that was previously parsed\n"
"\n"
"Raises ParseError if timestamps are non-monotonic, outside\n"
"the start/end interval etc.\n"
"\n"
"On success, return a tuple:\n"
" added_rows: how many rows were added from the file\n"
" data_offset: current offset into the data string\n"
" last_timestamp: last timestamp we parsed\n"
" linenum: current line number (copied from argument)" },
{ "extract_string",
(PyCFunction)Rocket_extract_string, METH_VARARGS,
"extract_string(self, offset, count)\n\n" "extract_string(self, offset, count)\n\n"
"Extract count rows of data from the file at offset offset.\n" "Extract count rows of data from the file at offset offset.\n"
"Return an ascii formatted string according to the layout" }, "Return an ascii formatted string according to the layout" },
{ "extract_binary",
(PyCFunction)Rocket_extract_binary, METH_VARARGS,
"extract_binary(self, offset, count)\n\n"
"Extract count rows of data from the file at offset offset.\n"
"Return a raw binary string of data matching the data layout." },
{ "extract_timestamp", { "extract_timestamp",
(PyCFunction)Rocket_extract_timestamp, METH_VARARGS, (PyCFunction)Rocket_extract_timestamp, METH_VARARGS,
"extract_timestamp(self, offset)\n\n" "extract_timestamp(self, offset)\n\n"

View File

@@ -305,10 +305,15 @@ class Stream(NilmApp):
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, ValueError) @exception_to_httperror(NilmDBError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["PUT"]) @cherrypy.tools.CORS_allow(methods = ["PUT"])
def insert(self, path, start, end): def insert(self, path, start, end, binary = False):
""" """
Insert new data into the database. Provide textual data Insert new data into the database. Provide textual data
(matching the path's layout) as a HTTP PUT. (matching the path's layout) as a HTTP PUT.
If 'binary' is True, expect raw binary data, rather than lines
of ASCII-formatted data. Raw binary data is always
little-endian and matches the database types (including an
int64 timestamp).
""" """
# Important that we always read the input before throwing any # Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections. # errors, to keep lengths happy for persistent connections.
@@ -316,17 +321,24 @@ class Stream(NilmApp):
# requests, if we ever want to handle those (issue #1134) # requests, if we ever want to handle those (issue #1134)
body = cherrypy.request.body.read() body = cherrypy.request.body.read()
# Verify content type for binary data
content_type = cherrypy.request.headers.get('content-type')
if binary and content_type:
if content_type != "application/octet-stream":
raise cherrypy.HTTPError("400", "Content type must be "
"application/octet-stream for "
"binary data, not " + content_type)
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) if len(self.db.stream_list(path = path)) != 1:
if len(streams) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path)
raise cherrypy.HTTPError("404 Not Found", "No such stream")
# Check limits # Check limits
(start, end) = self._get_times(start, end) (start, end) = self._get_times(start, end)
# Pass the data directly to nilmdb, which will parse it and # Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems. # raise a ValueError if there are any problems.
self.db.stream_insert(path, start, end, body) self.db.stream_insert(path, start, end, body, binary)
# Done # Done
return return
@@ -399,9 +411,8 @@ class Stream(NilmApp):
# /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose @cherrypy.expose
@chunked_response @chunked_response
@response_type("text/plain")
def extract(self, path, start = None, end = None, def extract(self, path, start = None, end = None,
count = False, markup = False): count = False, markup = False, binary = False):
""" """
Extract data from backend database. Streams the resulting Extract data from backend database. Streams the resulting
entries as ASCII text lines separated by newlines. This may entries as ASCII text lines separated by newlines. This may
@@ -412,13 +423,26 @@ class Stream(NilmApp):
If 'markup' is True, adds comments to the stream denoting each If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp. interval's start and end timestamp.
If 'binary' is True, return raw binary data, rather than lines
of ASCII-formatted data. Raw binary data is always
little-endian and matches the database types (including an
int64 timestamp).
""" """
(start, end) = self._get_times(start, end) (start, end) = self._get_times(start, end)
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) if len(self.db.stream_list(path = path)) != 1:
if len(streams) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path)
raise cherrypy.HTTPError("404 Not Found", "No such stream")
if binary:
content_type = "application/octet-stream"
if markup or count:
raise cherrypy.HTTPError("400", "can't mix binary and "
"markup or count modes")
else:
content_type = "text/plain"
cherrypy.response.headers['Content-Type'] = content_type
@workaround_cp_bug_1200 @workaround_cp_bug_1200
def content(start, end): def content(start, end):
@@ -431,7 +455,8 @@ class Stream(NilmApp):
while True: while True:
(data, restart) = self.db.stream_extract( (data, restart) = self.db.stream_extract(
path, start, end, count = False, markup = markup) path, start, end, count = False,
markup = markup, binary = binary)
yield data yield data
if restart is None: if restart is None:

View File

@@ -6,7 +6,7 @@ import time
# Range # Range
min_timestamp = (-2**63) min_timestamp = (-2**63)
max_timestamp = (2**62 - 1) max_timestamp = (2**63 - 1)
# Smallest representable step # Smallest representable step
epsilon = 1 epsilon = 1
@@ -32,6 +32,10 @@ def timestamp_to_human(timestamp):
"""Convert a timestamp (integer microseconds since epoch) to a """Convert a timestamp (integer microseconds since epoch) to a
human-readable string, using the local timezone for display human-readable string, using the local timezone for display
(e.g. from the TZ env var).""" (e.g. from the TZ env var)."""
if timestamp == min_timestamp:
return "(minimum)"
if timestamp == max_timestamp:
return "(maximum)"
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp)) dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_unix(timestamp))
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z") return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
@@ -65,6 +69,14 @@ def parse_time(toparse):
if toparse == "max": if toparse == "max":
return max_timestamp return max_timestamp
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError, IndexError):
pass
# If string isn't "now" and doesn't contain at least 4 digits, # If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept # consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators. # empty strings and strings with just separators.
@@ -78,14 +90,6 @@ def parse_time(toparse):
except (ValueError, OverflowError): except (ValueError, OverflowError):
pass pass
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError):
pass
# If it's parseable as a float, treat it as a Unix or NILM # If it's parseable as a float, treat it as a Unix or NILM
# timestamp based on its range. # timestamp based on its range.
try: try:

View File

@@ -107,13 +107,13 @@ setup(name='nilmdb',
author_email = 'jim@jtan.com', author_email = 'jim@jtan.com',
tests_require = [ 'nose', tests_require = [ 'nose',
'coverage', 'coverage',
'numpy',
], ],
setup_requires = [ 'distribute', setup_requires = [ 'distribute',
], ],
install_requires = [ 'decorator', install_requires = [ 'decorator',
'cherrypy >= 3.2', 'cherrypy >= 3.2',
'simplejson', 'simplejson',
'pycurl',
'python-dateutil', 'python-dateutil',
'pytz', 'pytz',
'psutil >= 0.3.0', 'psutil >= 0.3.0',

View File

@@ -12,6 +12,7 @@ test_interval.py
test_bulkdata.py test_bulkdata.py
test_nilmdb.py test_nilmdb.py
test_client.py test_client.py
test_numpyclient.py
test_cmdline.py test_cmdline.py
test_*.py test_*.py

View File

@@ -69,9 +69,9 @@ class TestBulkData(object):
raw = [] raw = []
for i in range(1000): for i in range(1000):
raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i)) raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
node.append_string("".join(raw[0:1]), 0, 50000) node.append_data("".join(raw[0:1]), 0, 50000)
node.append_string("".join(raw[1:100]), 0, 50000) node.append_data("".join(raw[1:100]), 0, 50000)
node.append_string("".join(raw[100:]), 0, 50000) node.append_data("".join(raw[100:]), 0, 50000)
misc_slices = [ 0, 100, slice(None), slice(0), slice(10), misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
slice(5,10), slice(3,None), slice(3,-3), slice(5,10), slice(3,None), slice(3,-3),
@@ -85,7 +85,7 @@ class TestBulkData(object):
# Extract misc slices while appending, to make sure the # Extract misc slices while appending, to make sure the
# data isn't being added in the middle of the file # data isn't being added in the middle of the file
for s in [2, slice(1,5), 2, slice(1,5)]: for s in [2, slice(1,5), 2, slice(1,5)]:
node.append_string("0 0 0 0 0 0 0 0 0\n", 0, 50000) node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000)
raw.append("0 0 0 0 0 0 0 0 0\n") raw.append("0 0 0 0 0 0 0 0 0\n")
eq_(get_node_slice(s), raw[s]) eq_(get_node_slice(s), raw[s])

View File

@@ -23,6 +23,7 @@ import warnings
import resource import resource
import time import time
import re import re
import struct
from testutil.helpers import * from testutil.helpers import *
@@ -238,6 +239,22 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("start must precede end", str(e.exception)) in_("start must precede end", str(e.exception))
# Good content type
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "",
{ "path": "xxxx", "start": 0, "end": 1,
"binary": 1 },
binary = True)
in_("No such stream", str(e.exception))
# Bad content type
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "",
{ "path": "xxxx", "start": 0, "end": 1,
"binary": 1 },
binary = False)
in_("Content type must be application/octet-stream", str(e.exception))
# Specify start/end (starts too late) # Specify start/end (starts too late)
data = timestamper.TimestamperRate(testfile, start, 120) data = timestamper.TimestamperRate(testfile, start, 120)
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
@@ -293,6 +310,23 @@ class TestClient(object):
# Test count # Test count
eq_(client.stream_count("/newton/prep"), 14400) eq_(client.stream_count("/newton/prep"), 14400)
# Test binary output
with assert_raises(ClientError) as e:
list(client.stream_extract("/newton/prep",
markup = True, binary = True))
with assert_raises(ClientError) as e:
list(client.stream_extract("/newton/prep",
count = True, binary = True))
data = "".join(client.stream_extract("/newton/prep", binary = True))
# Quick check using struct
unpacker = struct.Struct("<qffffffff")
out = []
for i in range(14400):
out.append(unpacker.unpack_from(data, i * unpacker.size))
eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
2525.169921875, 8350.83984375, 3724.699951171875,
1355.3399658203125, 2039.0))
client.close() client.close()
def test_client_06_generators(self): def test_client_06_generators(self):
@@ -365,6 +399,17 @@ class TestClient(object):
raise AssertionError("/stream/extract is not text/plain:\n" + raise AssertionError("/stream/extract is not text/plain:\n" +
headers()) headers())
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124",
"binary": "1" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: application/octet-stream" not in headers():
raise AssertionError("/stream/extract is not binary:\n" +
headers())
client.close() client.close()
def test_client_08_unicode(self): def test_client_08_unicode(self):
@@ -441,72 +486,75 @@ class TestClient(object):
# override _max_data to trigger frequent server updates # override _max_data to trigger frequent server updates
ctx._max_data = 15 ctx._max_data = 15
ctx.insert("100 1\n") ctx.insert("1000 1\n")
ctx.insert("101 ") ctx.insert("1010 ")
ctx.insert("1\n102 1") ctx.insert("1\n1020 1")
ctx.insert("") ctx.insert("")
ctx.insert("\n103 1\n") ctx.insert("\n1030 1\n")
ctx.insert("104 1\n") ctx.insert("1040 1\n")
ctx.insert("# hello\n") ctx.insert("# hello\n")
ctx.insert(" # hello\n") ctx.insert(" # hello\n")
ctx.insert(" 105 1\n") ctx.insert(" 1050 1\n")
ctx.finalize() ctx.finalize()
ctx.insert("107 1\n") ctx.insert("1070 1\n")
ctx.update_end(108) ctx.update_end(1080)
ctx.finalize() ctx.finalize()
ctx.update_start(109) ctx.update_start(1090)
ctx.insert("110 1\n") ctx.insert("1100 1\n")
ctx.insert("111 1\n") ctx.insert("1110 1\n")
ctx.send() ctx.send()
ctx.insert("112 1\n") ctx.insert("1120 1\n")
ctx.insert("113 1\n") ctx.insert("1130 1\n")
ctx.insert("114 1\n") ctx.insert("1140 1\n")
ctx.update_end(116) ctx.update_end(1160)
ctx.insert("115 1\n") ctx.insert("1150 1\n")
ctx.update_end(117) ctx.update_end(1170)
ctx.insert("116 1\n") ctx.insert("1160 1\n")
ctx.update_end(118) ctx.update_end(1180)
ctx.insert("117 1" + ctx.insert("1170 1" +
" # this is super long" * 100 + " # this is super long" * 100 +
"\n") "\n")
ctx.finalize() ctx.finalize()
ctx.insert("# this is super long" * 100) ctx.insert("# this is super long" * 100)
with assert_raises(ClientError): with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 100, 200) as ctx: with client.stream_insert_context("/context/test",
ctx.insert("118 1\n") 1000, 2000) as ctx:
ctx.insert("1180 1\n")
with assert_raises(ClientError): with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 200, 300) as ctx: with client.stream_insert_context("/context/test",
ctx.insert("118 1\n") 2000, 3000) as ctx:
ctx.insert("1180 1\n")
with assert_raises(ClientError): with assert_raises(ClientError):
with client.stream_insert_context("/context/test") as ctx: with client.stream_insert_context("/context/test") as ctx:
ctx.insert("bogus data\n") ctx.insert("bogus data\n")
with client.stream_insert_context("/context/test", 200, 300) as ctx: with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
# make sure our override wasn't permanent # make sure our override wasn't permanent
ne_(ctx._max_data, 15) ne_(ctx._max_data, 15)
ctx.insert("225 1\n") ctx.insert("2250 1\n")
ctx.finalize() ctx.finalize()
with assert_raises(ClientError): with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 300, 400) as ctx: with client.stream_insert_context("/context/test",
ctx.insert("301 1\n") 3000, 4000) as ctx:
ctx.insert("302 2\n") ctx.insert("3010 1\n")
ctx.insert("303 3\n") ctx.insert("3020 2\n")
ctx.insert("304 4\n") ctx.insert("3030 3\n")
ctx.insert("304 4\n") # non-monotonic after a few lines ctx.insert("3040 4\n")
ctx.insert("3040 4\n") # non-monotonic after a few lines
ctx.finalize() ctx.finalize()
eq_(list(client.stream_intervals("/context/test")), eq_(list(client.stream_intervals("/context/test")),
[ [ 100, 106 ], [ [ 1000, 1051 ],
[ 107, 108 ], [ 1070, 1080 ],
[ 109, 118 ], [ 1090, 1180 ],
[ 200, 300 ] ]) [ 2000, 3000 ] ])
# destroy stream (try without removing data first) # destroy stream (try without removing data first)
with assert_raises(ClientError): with assert_raises(ClientError):

View File

@@ -369,6 +369,8 @@ class TestCmdline(object):
self.contain("No stream at path") self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --set foo=bar") self.fail("metadata /newton/nosuchstream --set foo=bar")
self.contain("No stream at path") self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --delete")
self.contain("No stream at path")
self.ok("metadata /newton/prep") self.ok("metadata /newton/prep")
self.match("description=The Data\nv_scale=1.234\n") self.match("description=The Data\nv_scale=1.234\n")
@@ -394,6 +396,19 @@ class TestCmdline(object):
self.fail("metadata /newton/nosuchpath") self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath") self.contain("No stream at path /newton/nosuchpath")
self.ok("metadata /newton/prep --delete")
self.ok("metadata /newton/prep --get")
self.match("")
self.ok("metadata /newton/prep --set "
"'description=The Data' "
"v_scale=1.234")
self.ok("metadata /newton/prep --delete v_scale")
self.ok("metadata /newton/prep --get")
self.match("description=The Data\n")
self.ok("metadata /newton/prep --set description=")
self.ok("metadata /newton/prep --get")
self.match("")
def test_06_insert(self): def test_06_insert(self):
self.ok("insert --help") self.ok("insert --help")
@@ -600,7 +615,7 @@ class TestCmdline(object):
test(8, "10:01:59.9", "10:02:00.1", extra="-m") test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests # all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -a /newton/prep --start min --end max")
lines_(self.captured, 43204) lines_(self.captured, 43204)
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n") self.match("43200\n")
@@ -1038,10 +1053,12 @@ class TestCmdline(object):
self.contain("old and new paths are the same") self.contain("old and new paths are the same")
check_path("newton", "prep") check_path("newton", "prep")
self.fail("rename /newton/prep /newton") self.fail("rename /newton/prep /newton")
self.contain("subdirs of this path already exist") self.contain("path must contain at least one folder")
self.fail("rename /newton/prep /newton/prep/") self.fail("rename /newton/prep /newton/prep/")
self.contain("invalid path") self.contain("invalid path")
self.ok("rename /newton/prep /newton/foo") self.ok("rename /newton/prep /newton/foo/1")
check_path("newton", "foo", "1")
self.ok("rename /newton/foo/1 /newton/foo")
check_path("newton", "foo") check_path("newton", "foo")
self.ok("rename /newton/foo /totally/different/thing") self.ok("rename /newton/foo /totally/different/thing")
check_path("totally", "different", "thing") check_path("totally", "different", "thing")

View File

@@ -90,13 +90,16 @@ class Test00Nilmdb(object): # named 00 so it runs first
eq_(db.stream_get_metadata("/newton/prep"), meta1) eq_(db.stream_get_metadata("/newton/prep"), meta1)
eq_(db.stream_get_metadata("/newton/raw"), meta1) eq_(db.stream_get_metadata("/newton/raw"), meta1)
# fill in some test coverage for start >= end # fill in some misc. test coverage
with assert_raises(nilmdb.server.NilmDBError): with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 0, 0) db.stream_remove("/newton/prep", 0, 0)
with assert_raises(nilmdb.server.NilmDBError): with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 1, 0) db.stream_remove("/newton/prep", 1, 0)
db.stream_remove("/newton/prep", 0, 1) db.stream_remove("/newton/prep", 0, 1)
with assert_raises(nilmdb.server.NilmDBError):
db.stream_extract("/newton/prep", count = True, binary = True)
db.close() db.close()
class TestBlockingServer(object): class TestBlockingServer(object):

333
tests/test_numpyclient.py Normal file
View File

@@ -0,0 +1,333 @@
# -*- coding: utf-8 -*-
import nilmdb.server
import nilmdb.client
import nilmdb.client.numpyclient
from nilmdb.utils.printf import *
from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError
from nilmdb.utils import datetime_tz
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
from testutil.helpers import *
import numpy as np
testdb = "tests/numpyclient-testdb"
testurl = "http://localhost:32180/"
def setup_module():
global test_server, test_db
# Clear out DB
recursive_unlink(testdb)
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
fast_shutdown = True,
force_traceback = True)
test_server.start(blocking = False)
def teardown_module():
global test_server, test_db
# Close web app
test_server.stop()
test_db.close()
class TestNumpyClient(object):
def test_numpyclient_01_basic(self):
# Test basic connection
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
version = client.version()
eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(test_server.version))
# Verify subclassing
assert(isinstance(client, nilmdb.client.Client))
# Layouts
for layout in "int8_t", "something_8", "integer_1":
with assert_raises(ValueError):
for x in client.stream_extract_numpy("/foo", layout=layout):
pass
for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
with assert_raises(ClientError) as e:
for x in client.stream_extract_numpy("/foo", layout=layout):
pass
in_("No such stream", str(e.exception))
with assert_raises(ClientError) as e:
for x in client.stream_extract_numpy("/foo"):
pass
in_("can't get layout for path", str(e.exception))
client.close()
def test_numpyclient_02_extract(self):
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
# Insert some data as text
client.stream_create("/newton/prep", "float32_8")
testfile = "tests/data/prep-20120323T1000"
start = nilmdb.utils.time.parse_time("20120323T1000")
rate = 120
data = timestamper.TimestamperRate(testfile, start, rate)
result = client.stream_insert("/newton/prep", data,
start, start + 119999777)
# Extract Numpy arrays
array = None
pieces = 0
for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
pieces += 1
if array is not None:
array = np.vstack((array, chunk))
else:
array = chunk
eq_(array.shape, (14400, 9))
eq_(pieces, 15)
# Try structured
s = list(client.stream_extract_numpy("/newton/prep", structured = True))
assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))
# Compare. Will be close but not exact because the conversion
# to and from ASCII was lossy.
data = timestamper.TimestamperRate(testfile, start, rate)
actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
assert(np.allclose(array, actual))
client.close()
def test_numpyclient_03_insert(self):
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
# Limit _max_data just to get better coverage
old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000
client.stream_create("/test/1", "uint16_1")
client.stream_insert_numpy("/test/1",
np.array([[0, 1],
[1, 2],
[2, 3],
[3, 4]]))
# Wrong number of dimensions
with assert_raises(ValueError) as e:
client.stream_insert_numpy("/test/1",
np.array([[[0, 1],
[1, 2]],
[[3, 4],
[4, 5]]]))
in_("wrong number of dimensions", str(e.exception))
# Unstructured
client.stream_create("/test/2", "float32_8")
client.stream_insert_numpy(
"/test/2",
client.stream_extract_numpy(
"/newton/prep", structured = False, maxrows = 1000))
# Structured, and specifying layout
client.stream_create("/test/3", "float32_8")
client.stream_insert_numpy(
path = "/test/3", layout = "float32_8",
data = client.stream_extract_numpy(
"/newton/prep", structured = True, maxrows = 1000))
# Structured, specifying wrong layout
client.stream_create("/test/4", "float32_8")
with assert_raises(ValueError) as e:
client.stream_insert_numpy(
"/test/4", layout = "uint16_1",
data = client.stream_extract_numpy(
"/newton/prep", structured = True, maxrows = 1000))
in_("wrong dtype", str(e.exception))
# Unstructured, and specifying wrong layout
client.stream_create("/test/5", "float32_8")
with assert_raises(ClientError) as e:
client.stream_insert_numpy(
"/test/5", layout = "uint16_8",
data = client.stream_extract_numpy(
"/newton/prep", structured = False, maxrows = 1000))
# timestamps will be screwy here, because data will be parsed wrong
in_("error parsing input data", str(e.exception))
# Make sure the /newton/prep copies are identical
a = np.vstack(client.stream_extract_numpy("/newton/prep"))
b = np.vstack(client.stream_extract_numpy("/test/2"))
c = np.vstack(client.stream_extract_numpy("/test/3"))
assert(np.array_equal(a,b))
assert(np.array_equal(a,c))
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
client.close()
def test_numpyclient_04_context(self):
# Like test_client_context, but with Numpy data
client = nilmdb.client.numpyclient.NumpyClient(testurl)
client.stream_create("/context/test", "uint16_1")
with client.stream_insert_numpy_context("/context/test") as ctx:
# override _max_rows to trigger frequent server updates
ctx._max_rows = 2
ctx.insert([[1000, 1]])
ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
ctx.insert([[1040, 1], [1050, 1]])
ctx.finalize()
ctx.insert([[1070, 1]])
ctx.update_end(1080)
ctx.finalize()
ctx.update_start(1090)
ctx.insert([[1100, 1]])
ctx.insert([[1110, 1]])
ctx.send()
ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
ctx.update_end(1160)
ctx.insert([[1150, 1]])
ctx.update_end(1170)
ctx.insert([[1160, 1]])
ctx.update_end(1180)
ctx.insert([[1170, 123456789.0]])
ctx.finalize()
ctx.insert(np.zeros((0,2)))
with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
1000, 2000) as ctx:
ctx.insert([[1180, 1]])
with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
2000, 3000) as ctx:
ctx._max_rows = 2
ctx.insert([[3180, 1]])
ctx.insert([[3181, 1]])
with client.stream_insert_numpy_context("/context/test",
2000, 3000) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_rows, 2)
ctx.insert([[2250, 1]])
ctx.finalize()
with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
3000, 4000) as ctx:
ctx.insert([[3010, 1]])
ctx.insert([[3020, 2]])
ctx.insert([[3030, 3]])
ctx.insert([[3040, 4]])
ctx.insert([[3040, 4]]) # non-monotonic after a few lines
ctx.finalize()
eq_(list(client.stream_intervals("/context/test")),
[ [ 1000, 1051 ],
[ 1070, 1080 ],
[ 1090, 1180 ],
[ 2000, 3000 ] ])
client.stream_remove("/context/test")
client.stream_destroy("/context/test")
client.close()
def test_numpyclient_05_emptyintervals(self):
# Like test_client_emptyintervals, with insert_numpy_context
client = nilmdb.client.numpyclient.NumpyClient(testurl)
client.stream_create("/empty/test", "uint16_1")
def info():
result = []
for interval in list(client.stream_intervals("/empty/test")):
result.append((client.stream_count("/empty/test", *interval),
interval))
return result
eq_(info(), [])
# Insert a region with just a few points
with client.stream_insert_numpy_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert([[140, 1]])
ctx.insert([[150, 1]])
ctx.insert([[160, 1]])
ctx.update_end(200)
ctx.finalize()
eq_(info(), [(3, [100, 200])])
# Delete chunk, which will leave one data point and two intervals
client.stream_remove("/empty/test", 145, 175)
eq_(info(), [(1, [100, 145]),
(0, [175, 200])])
# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_numpy_context("/empty/test", 500, 550):
pass
# If enough timestamps aren't provided, empty streams won't be created.
client.stream_insert("/empty/test", [])
with client.stream_insert_numpy_context("/empty/test"):
pass
client.stream_insert("/empty/test", [], start = 600)
with client.stream_insert_numpy_context("/empty/test", start = 700):
pass
client.stream_insert("/empty/test", [], end = 850)
with client.stream_insert_numpy_context("/empty/test", end = 950):
pass
# Try various things that might cause problems
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert([[1100, 1]])
ctx.finalize() # inserts [1100, 1101]
ctx.update_start(1199)
ctx.insert([[1200, 1]])
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)
ctx.finalize() # nothing
ctx.update_end(1350)
ctx.finalize() # nothing
ctx.update_start(1400)
ctx.insert(np.zeros((0,2)))
ctx.update_end(1450)
ctx.finalize()
ctx.update_start(1500)
ctx.insert(np.zeros((0,2)))
ctx.update_end(1550)
ctx.finalize()
ctx.insert(np.zeros((0,2)))
ctx.insert(np.zeros((0,2)))
ctx.insert(np.zeros((0,2)))
ctx.finalize()
# Check everything
eq_(info(), [(1, [100, 145]),
(0, [175, 200]),
(0, [300, 350]),
(0, [400, 450]),
(0, [500, 550]),
(0, [1000, 1050]),
(1, [1100, 1101]),
(1, [1199, 1250]),
(0, [1400, 1450]),
(0, [1500, 1550]),
])
# Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test")
client.close()