Browse Source

Add nilmdb.client.numpyclient.NumpyClient with stream_extract_numpy

This is a subclass of nilmdb.client.client.Client that adds numpy
specific routines, which should be a lot faster.
tags/nilmdb-1.4.8^2
Jim Paris 9 years ago
parent
commit
c7f2df4abc
3 changed files with 186 additions and 0 deletions
  1. +77
    -0
      nilmdb/client/numpyclient.py
  2. +1
    -0
      tests/test.order
  3. +108
    -0
      tests/test_numpyclient.py

+ 77
- 0
nilmdb/client/numpyclient.py View File

@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-

"""Provide a NumpyClient class that is based on normal Client, but has
additional methods for extracting and inserting data via Numpy arrays."""

import nilmdb.utils
import nilmdb.client.client
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError

import contextlib
from nilmdb.utils.time import timestamp_to_string, string_to_timestamp

import numpy
import cStringIO

def layout_to_dtype(layout):
ltype = layout.split('_')[0]
lcount = int(layout.split('_')[1])
if ltype.startswith('int'):
atype = '<i' + str(int(ltype[3:]) / 8)
elif ltype.startswith('uint'):
atype = '<u' + str(int(ltype[4:]) / 8)
elif ltype.startswith('float'):
atype = '<f' + str(int(ltype[5:]) / 8)
else:
raise ValueError("bad layout")
return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)])

class NumpyClient(nilmdb.client.client.Client):
"""Subclass of nilmdb.client.Client that adds additional methods for
extracting and inserting data via Numpy arrays."""

def stream_extract_numpy(self, path, start = None, end = None,
layout = None, maxrows = 100000,
structured = False):
"""
Extract data from a stream. Returns a generator that yields
Numpy arrays of up to 'maxrows' of data each.

If 'layout' is None, it is read using stream_info.

If 'structured' is False, all data is converted to float64
and returned in a flat 2D array. Otherwise, data is returned
as a structured dtype in a 1D array.
"""
if layout is None:
streams = self.stream_list(path)
if len(streams) != 1:
raise ClientError("can't get layout for path: " + path)
layout = streams[0][1]
dtype = layout_to_dtype(layout)

def to_numpy(data):
a = numpy.fromstring(data, dtype)
if structured:
return a
return numpy.c_[a['timestamp'], a['data']]

chunks = []
total_len = 0
maxsize = dtype.itemsize * maxrows
for data in self.stream_extract(path, start, end, binary = True):
# Add this block of binary data
chunks.append(data)
total_len += len(data)

# See if we have enough to make the requested Numpy array
while total_len >= maxsize:
assembled = "".join(chunks)
total_len -= maxsize
chunks = [ assembled[maxsize:] ]
block = assembled[:maxsize]
yield to_numpy(block)

if total_len:
yield to_numpy("".join(chunks))

+ 1
- 0
tests/test.order View File

@@ -12,6 +12,7 @@ test_interval.py
test_bulkdata.py
test_nilmdb.py
test_client.py
test_numpyclient.py
test_cmdline.py

test_*.py

+ 108
- 0
tests/test_numpyclient.py View File

@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-

import nilmdb.server
import nilmdb.client
import nilmdb.client.numpyclient

from nilmdb.utils.printf import *
from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError
from nilmdb.utils import datetime_tz

from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version

from testutil.helpers import *

import numpy as np

testdb = "tests/numpyclient-testdb"
testurl = "http://localhost:32180/"

def setup_module():
global test_server, test_db
# Clear out DB
recursive_unlink(testdb)

# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
fast_shutdown = True,
force_traceback = True)
test_server.start(blocking = False)

def teardown_module():
global test_server, test_db
# Close web app
test_server.stop()
test_db.close()

class TestNumpyClient(object):

def test_numpyclient_01_basic(self):
# Test basic connection
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
version = client.version()
eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(test_server.version))

# Verify subclassing
assert(isinstance(client, nilmdb.client.Client))

# Layouts
for layout in "int8_t", "something_8", "integer_1":
with assert_raises(ValueError):
for x in client.stream_extract_numpy("/foo", layout=layout):
pass
for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
with assert_raises(ClientError) as e:
for x in client.stream_extract_numpy("/foo", layout=layout):
pass
in_("No such stream", str(e.exception))

with assert_raises(ClientError) as e:
for x in client.stream_extract_numpy("/foo"):
pass
in_("can't get layout for path", str(e.exception))

client.close()

def test_numpyclient_02_extract(self):
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)

# Insert some data as text
client.stream_create("/newton/prep", "float32_8")
testfile = "tests/data/prep-20120323T1000"
start = nilmdb.utils.time.parse_time("20120323T1000")
rate = 120
data = timestamper.TimestamperRate(testfile, start, rate)
result = client.stream_insert("/newton/prep", data,
start, start + 119999777)

# Extract Numpy arrays
array = None
pieces = 0
for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
pieces += 1
if array is not None:
array = np.vstack((array, chunk))
else:
array = chunk
eq_(array.shape, (14400, 9))
eq_(pieces, 15)

# Try structured
s = list(client.stream_extract_numpy("/newton/prep", structured = True))
assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))

# Compare. Will be close but not exact because the conversion
# to and from ASCII was lossy.
data = timestamper.TimestamperRate(testfile, start, rate)
actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
assert(np.allclose(array, actual))

client.close()

Loading…
Cancel
Save