Browse Source

Huge update to support inserting in client.numpyclient, with tests

This includes both client.stream_insert_numpy and
client.stream_insert_numpy_context().  The test code is based on
similar test code for client.stream_insert_context, so it should be
fairly complete.
tags/nilmdb-1.5.0
Jim Paris 11 years ago
parent
commit
e95142eabf
2 changed files with 413 additions and 6 deletions
  1. +188
    -6
      nilmdb/client/numpyclient.py
  2. +225
    -0
      tests/test_numpyclient.py

+ 188
- 6
nilmdb/client/numpyclient.py View File

@@ -31,6 +31,14 @@ class NumpyClient(nilmdb.client.client.Client):
"""Subclass of nilmdb.client.Client that adds additional methods for
extracting and inserting data via Numpy arrays."""

def _get_dtype(self, path, layout):
if layout is None:
streams = self.stream_list(path)
if len(streams) != 1:
raise ClientError("can't get layout for path: " + path)
layout = streams[0][1]
return layout_to_dtype(layout)

def stream_extract_numpy(self, path, start = None, end = None,
layout = None, maxrows = 100000,
structured = False):
@@ -44,12 +52,7 @@ class NumpyClient(nilmdb.client.client.Client):
and returned in a flat 2D array. Otherwise, data is returned
as a structured dtype in a 1D array.
"""
if layout is None:
streams = self.stream_list(path)
if len(streams) != 1:
raise ClientError("can't get layout for path: " + path)
layout = streams[0][1]
dtype = layout_to_dtype(layout)
dtype = self._get_dtype(path, layout)

def to_numpy(data):
a = numpy.fromstring(data, dtype)
@@ -75,3 +78,182 @@ class NumpyClient(nilmdb.client.client.Client):

if total_len:
yield to_numpy("".join(chunks))

@contextlib.contextmanager
def stream_insert_numpy_context(self, path, start = None, end = None,
layout = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is
provided as Numpy arrays, and is aggregated and sent to the
server in larger or smaller chunks as necessary. Data format
must match the database layout for the given path.

For more details, see help for
nilmdb.client.numpyclient.StreamInserterNumpy

If 'layout' is not None, use it as the layout rather than
querying the database.
"""
dtype = self._get_dtype(path, layout)
ctx = StreamInserterNumpy(self, path, start, end, dtype)
yield ctx
ctx.finalize()

def stream_insert_numpy(self, path, data, start = None, end = None,
layout = None):
"""Insert data into a stream. data should be a Numpy array
which will be passed through stream_insert_numpy_context to
break it into chunks etc. See the help for that function
for details."""
with self.stream_insert_numpy_context(path, start, end, layout) as ctx:
if isinstance(data, numpy.ndarray):
ctx.insert(data)
else:
for chunk in data:
ctx.insert(chunk)
return ctx.last_response

class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
"""Object returned by stream_insert_numpy_context() that manages
the insertion of rows of data into a particular path.

See help for nilmdb.client.client.StreamInserter for details.
The only difference is that, instead of ASCII formatted data,
this context manager can take Numpy arrays, which are either
structured (1D with complex dtype) or flat (2D with simple dtype).
"""

# Soft limit of how many bytes to send per HTTP request.
_max_data = 2 * 1024 * 1024

def __init__(self, client, path, start, end, dtype):
"""
'client' is the client object. 'path' is the database path
to insert to. 'start' and 'end' are used for the first
contiguous interval and may be None. 'dtype' is the Numpy
dtype for this stream.
"""
self.last_response = None

self._dtype = dtype
self._client = client
self._path = path

# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end

# Max rows to send at once
self._max_rows = self._max_data // self._dtype.itemsize

# List of the current arrays we're building up to send
self._block_arrays = []
self._block_rows = 0

def insert(self, array):
"""Insert Numpy data, which must match the layout type."""
if type(array) != numpy.ndarray:
array = numpy.array(array)
if array.ndim == 1:
# Already a structured array; just verify the type
if array.dtype != self._dtype:
raise ValueError("wrong dtype for 1D (structured) array")
elif array.ndim == 2:
# Convert to structured array
sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
sarray['timestamp'] = array[:,0]
# Need the squeeze in case sarray['data'] is 1 dimensional
sarray['data'] = numpy.squeeze(array[:,1:])
array = sarray
else:
raise ValueError("wrong number of dimensions in array")

length = len(array)
maxrows = self._max_rows

if length == 0:
return
if length > maxrows:
# This is more than twice what we wanted to send, so split
# it up. This is a bit inefficient, but the user really
# shouldn't be providing this much data at once.
for cut in range(0, length, maxrows):
self.insert(array[cut:(cut + maxrows)])
return

# Add this array to our list
self._block_arrays.append(array)
self._block_rows += length

# Send if it's too long
if self._block_rows >= maxrows:
self._send_block(final = False)

def _send_block(self, final = False):
"""Send the data current stored up. One row might be left
over if we need its timestamp saved."""

# Build the full array to send
if self._block_rows == 0:
array = numpy.zeros(0, dtype = self._dtype)
else:
array = numpy.hstack(self._block_arrays)

# Get starting timestamp
start_ts = self._interval_start
if start_ts is None:
# Pull start from the first row
try:
start_ts = array['timestamp'][0]
except IndexError:
pass # no timestamp is OK, if we have no data

# Get ending timestamp
if final:
# For a final block, the timestamp is either the
# user-provided end, or the timestamp of the last line
# plus epsilon.
end_ts = self._interval_end
if end_ts is None:
try:
end_ts = array['timestamp'][-1]
end_ts += nilmdb.utils.time.epsilon
except IndexError:
pass # no timestamp is OK, if we have no data
self._block_arrays = []
self._block_rows = 0

# Next block is completely fresh
self._interval_start = None
self._interval_end = None
else:
# An intermediate block. We need to save the last row
# for the next block, and use its timestamp as the ending
# timestamp for this one.
if len(array) < 2:
# Not enough data to send an intermediate block
return
end_ts = array['timestamp'][-1]
if self._interval_end is not None and end_ts > self._interval_end:
# User gave us bad endpoints; send it anyway, and let
# the server complain so that the error is the same
# as if we hadn't done this chunking.
end_ts = self._interval_end
self._block_arrays = [ array[-1:] ]
self._block_rows = 1
array = array[:-1]

# Next block continues where this one ended
self._interval_start = end_ts

# If we have no endpoints, it's because we had no data to send.
if start_ts is None or end_ts is None:
return

# Send it
data = array.tostring()
self.last_response = self._client.stream_insert_block(
self._path, data, start_ts, end_ts, binary = True)

return

+ 225
- 0
tests/test_numpyclient.py View File

@@ -106,3 +106,228 @@ class TestNumpyClient(object):
assert(np.allclose(array, actual))

client.close()

def test_numpyclient_03_insert(self):
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)

# Limit _max_data just to get better coverage
old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000

client.stream_create("/test/1", "uint16_1")
client.stream_insert_numpy("/test/1",
np.array([[0, 1],
[1, 2],
[2, 3],
[3, 4]]))

# Wrong number of dimensions
with assert_raises(ValueError) as e:
client.stream_insert_numpy("/test/1",
np.array([[[0, 1],
[1, 2]],
[[3, 4],
[4, 5]]]))
in_("wrong number of dimensions", str(e.exception))

# Unstructured
client.stream_create("/test/2", "float32_8")
client.stream_insert_numpy(
"/test/2",
client.stream_extract_numpy(
"/newton/prep", structured = False, maxrows = 1000))

# Structured, and specifying layout
client.stream_create("/test/3", "float32_8")
client.stream_insert_numpy(
path = "/test/3", layout = "float32_8",
data = client.stream_extract_numpy(
"/newton/prep", structured = True, maxrows = 1000))

# Structured, specifying wrong layout
client.stream_create("/test/4", "float32_8")
with assert_raises(ValueError) as e:
client.stream_insert_numpy(
"/test/4", layout = "uint16_1",
data = client.stream_extract_numpy(
"/newton/prep", structured = True, maxrows = 1000))
in_("wrong dtype", str(e.exception))

# Unstructured, and specifying wrong layout
client.stream_create("/test/5", "float32_8")
with assert_raises(ClientError) as e:
client.stream_insert_numpy(
"/test/5", layout = "uint16_8",
data = client.stream_extract_numpy(
"/newton/prep", structured = False, maxrows = 1000))
# timestamps will be screwy here, because data will be parsed wrong
in_("error parsing input data", str(e.exception))

# Make sure the /newton/prep copies are identical
a = np.vstack(client.stream_extract_numpy("/newton/prep"))
b = np.vstack(client.stream_extract_numpy("/test/2"))
c = np.vstack(client.stream_extract_numpy("/test/3"))
assert(np.array_equal(a,b))
assert(np.array_equal(a,c))

nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
client.close()

def test_numpyclient_04_context(self):
# Like test_client_context, but with Numpy data
client = nilmdb.client.numpyclient.NumpyClient(testurl)

client.stream_create("/context/test", "uint16_1")
with client.stream_insert_numpy_context("/context/test") as ctx:
# override _max_rows to trigger frequent server updates
ctx._max_rows = 2
ctx.insert([[1000, 1]])
ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
ctx.insert([[1040, 1], [1050, 1]])
ctx.finalize()
ctx.insert([[1070, 1]])
ctx.update_end(1080)
ctx.finalize()
ctx.update_start(1090)
ctx.insert([[1100, 1]])
ctx.insert([[1110, 1]])
ctx.send()
ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
ctx.update_end(1160)
ctx.insert([[1150, 1]])
ctx.update_end(1170)
ctx.insert([[1160, 1]])
ctx.update_end(1180)
ctx.insert([[1170, 123456789.0]])
ctx.finalize()
ctx.insert(np.zeros((0,2)))

with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
1000, 2000) as ctx:
ctx.insert([[1180, 1]])

with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
2000, 3000) as ctx:
ctx._max_rows = 2
ctx.insert([[3180, 1]])
ctx.insert([[3181, 1]])

with client.stream_insert_numpy_context("/context/test",
2000, 3000) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_rows, 2)
ctx.insert([[2250, 1]])
ctx.finalize()

with assert_raises(ClientError):
with client.stream_insert_numpy_context("/context/test",
3000, 4000) as ctx:
ctx.insert([[3010, 1]])
ctx.insert([[3020, 2]])
ctx.insert([[3030, 3]])
ctx.insert([[3040, 4]])
ctx.insert([[3040, 4]]) # non-monotonic after a few lines
ctx.finalize()

eq_(list(client.stream_intervals("/context/test")),
[ [ 1000, 1051 ],
[ 1070, 1080 ],
[ 1090, 1180 ],
[ 2000, 3000 ] ])

client.stream_remove("/context/test")
client.stream_destroy("/context/test")
client.close()

def test_numpyclient_05_emptyintervals(self):
# Like test_client_emptyintervals, with insert_numpy_context
client = nilmdb.client.numpyclient.NumpyClient(testurl)
client.stream_create("/empty/test", "uint16_1")
def info():
result = []
for interval in list(client.stream_intervals("/empty/test")):
result.append((client.stream_count("/empty/test", *interval),
interval))
return result
eq_(info(), [])

# Insert a region with just a few points
with client.stream_insert_numpy_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert([[140, 1]])
ctx.insert([[150, 1]])
ctx.insert([[160, 1]])
ctx.update_end(200)
ctx.finalize()
eq_(info(), [(3, [100, 200])])

# Delete chunk, which will leave one data point and two intervals
client.stream_remove("/empty/test", 145, 175)
eq_(info(), [(1, [100, 145]),
(0, [175, 200])])

# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_numpy_context("/empty/test", 500, 550):
pass

# If enough timestamps aren't provided, empty streams won't be created.
client.stream_insert("/empty/test", [])
with client.stream_insert_numpy_context("/empty/test"):
pass
client.stream_insert("/empty/test", [], start = 600)
with client.stream_insert_numpy_context("/empty/test", start = 700):
pass
client.stream_insert("/empty/test", [], end = 850)
with client.stream_insert_numpy_context("/empty/test", end = 950):
pass

# Try various things that might cause problems
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert([[1100, 1]])
ctx.finalize() # inserts [1100, 1101]
ctx.update_start(1199)
ctx.insert([[1200, 1]])
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)
ctx.finalize() # nothing
ctx.update_end(1350)
ctx.finalize() # nothing
ctx.update_start(1400)
ctx.insert(np.zeros((0,2)))
ctx.update_end(1450)
ctx.finalize()
ctx.update_start(1500)
ctx.insert(np.zeros((0,2)))
ctx.update_end(1550)
ctx.finalize()
ctx.insert(np.zeros((0,2)))
ctx.insert(np.zeros((0,2)))
ctx.insert(np.zeros((0,2)))
ctx.finalize()

# Check everything
eq_(info(), [(1, [100, 145]),
(0, [175, 200]),
(0, [300, 350]),
(0, [400, 450]),
(0, [500, 550]),
(0, [1000, 1050]),
(1, [1100, 1101]),
(1, [1199, 1250]),
(0, [1400, 1450]),
(0, [1500, 1550]),
])

# Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test")
client.close()

Loading…
Cancel
Save