# -*- coding: utf-8 -*- import nilmdb.server import nilmdb.client import nilmdb.client.numpyclient from nilmdb.utils.printf import * from nilmdb.utils import timestamper from nilmdb.client import ClientError, ServerError from nilmdb.utils import datetime_tz from nose.plugins.skip import SkipTest from nose.tools import * from nose.tools import assert_raises import itertools import distutils.version from testutil.helpers import * import numpy as np testdb = "tests/numpyclient-testdb" testurl = "http://localhost:32180/" def setup_module(): global test_server, test_db # Clear out DB recursive_unlink(testdb) # Start web app on a custom port test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb) test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", port = 32180, stoppable = False, fast_shutdown = True, force_traceback = True) test_server.start(blocking = False) def teardown_module(): global test_server, test_db # Close web app test_server.stop() test_db.close() class TestNumpyClient(object): def test_numpyclient_01_basic(self): # Test basic connection client = nilmdb.client.numpyclient.NumpyClient(url = testurl) version = client.version() eq_(distutils.version.LooseVersion(version), distutils.version.LooseVersion(test_server.version)) # Verify subclassing assert(isinstance(client, nilmdb.client.Client)) # Layouts for layout in "int8_t", "something_8", "integer_1": with assert_raises(ValueError): for x in client.stream_extract_numpy("/foo", layout=layout): pass for layout in "int8_1", "uint8_30", "int16_20", "float64_100": with assert_raises(ClientError) as e: for x in client.stream_extract_numpy("/foo", layout=layout): pass in_("No such stream", str(e.exception)) with assert_raises(ClientError) as e: for x in client.stream_extract_numpy("/foo"): pass in_("can't get layout for path", str(e.exception)) client.close() def test_numpyclient_02_extract(self): client = nilmdb.client.numpyclient.NumpyClient(url = testurl) # Insert some data as text client.stream_create("/newton/prep", "float32_8") testfile = "tests/data/prep-20120323T1000" start = nilmdb.utils.time.parse_time("20120323T1000") rate = 120 data = timestamper.TimestamperRate(testfile, start, rate) result = client.stream_insert("/newton/prep", data, start, start + 119999777) # Extract Numpy arrays array = None pieces = 0 for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000): pieces += 1 if array is not None: array = np.vstack((array, chunk)) else: array = chunk eq_(array.shape, (14400, 9)) eq_(pieces, 15) # Try structured s = list(client.stream_extract_numpy("/newton/prep", structured = True)) assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array)) # Compare. Will be close but not exact because the conversion # to and from ASCII was lossy. data = timestamper.TimestamperRate(testfile, start, rate) actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9) assert(np.allclose(array, actual)) client.close() def test_numpyclient_03_insert(self): client = nilmdb.client.numpyclient.NumpyClient(url = testurl) # Limit _max_data just to get better coverage old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000 client.stream_create("/test/1", "uint16_1") client.stream_insert_numpy("/test/1", np.array([[0, 1], [1, 2], [2, 3], [3, 4]])) # Wrong number of dimensions with assert_raises(ValueError) as e: client.stream_insert_numpy("/test/1", np.array([[[0, 1], [1, 2]], [[3, 4], [4, 5]]])) in_("wrong number of dimensions", str(e.exception)) # Wrong number of fields with assert_raises(ValueError) as e: client.stream_insert_numpy("/test/1", np.array([[0, 1, 2], [1, 2, 3], [3, 4, 5], [4, 5, 6]])) in_("wrong number of fields", str(e.exception)) # Unstructured client.stream_create("/test/2", "float32_8") client.stream_insert_numpy( "/test/2", client.stream_extract_numpy( "/newton/prep", structured = False, maxrows = 1000)) # Structured, and specifying layout client.stream_create("/test/3", "float32_8") client.stream_insert_numpy( path = "/test/3", layout = "float32_8", data = client.stream_extract_numpy( "/newton/prep", structured = True, maxrows = 1000)) # Structured, specifying wrong layout client.stream_create("/test/4", "float32_8") with assert_raises(ValueError) as e: client.stream_insert_numpy( "/test/4", layout = "uint16_1", data = client.stream_extract_numpy( "/newton/prep", structured = True, maxrows = 1000)) in_("wrong dtype", str(e.exception)) # Unstructured, and specifying wrong layout client.stream_create("/test/5", "float32_8") with assert_raises(ClientError) as e: client.stream_insert_numpy( "/test/5", layout = "uint16_8", data = client.stream_extract_numpy( "/newton/prep", structured = False, maxrows = 1000)) # timestamps will be screwy here, because data will be parsed wrong in_("error parsing input data", str(e.exception)) # Make sure the /newton/prep copies are identical a = np.vstack(client.stream_extract_numpy("/newton/prep")) b = np.vstack(client.stream_extract_numpy("/test/2")) c = np.vstack(client.stream_extract_numpy("/test/3")) assert(np.array_equal(a,b)) assert(np.array_equal(a,c)) nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data client.close() def test_numpyclient_04_context(self): # Like test_client_context, but with Numpy data client = nilmdb.client.numpyclient.NumpyClient(testurl) client.stream_create("/context/test", "uint16_1") with client.stream_insert_numpy_context("/context/test") as ctx: # override _max_rows to trigger frequent server updates ctx._max_rows = 2 ctx.insert([[1000, 1]]) ctx.insert([[1010, 1], [1020, 1], [1030, 1]]) ctx.insert([[1040, 1], [1050, 1]]) ctx.finalize() ctx.insert([[1070, 1]]) ctx.update_end(1080) ctx.finalize() ctx.update_start(1090) ctx.insert([[1100, 1]]) ctx.insert([[1110, 1]]) ctx.send() ctx.insert([[1120, 1], [1130, 1], [1140, 1]]) ctx.update_end(1160) ctx.insert([[1150, 1]]) ctx.update_end(1170) ctx.insert([[1160, 1]]) ctx.update_end(1180) ctx.insert([[1170, 123456789.0]]) ctx.finalize() ctx.insert(np.zeros((0,2))) with assert_raises(ClientError): with client.stream_insert_numpy_context("/context/test", 1000, 2000) as ctx: ctx.insert([[1180, 1]]) with assert_raises(ClientError): with client.stream_insert_numpy_context("/context/test", 2000, 3000) as ctx: ctx._max_rows = 2 ctx.insert([[3180, 1]]) ctx.insert([[3181, 1]]) with client.stream_insert_numpy_context("/context/test", 2000, 3000) as ctx: # make sure our override wasn't permanent ne_(ctx._max_rows, 2) ctx.insert([[2250, 1]]) ctx.finalize() with assert_raises(ClientError): with client.stream_insert_numpy_context("/context/test", 3000, 4000) as ctx: ctx.insert([[3010, 1]]) ctx.insert([[3020, 2]]) ctx.insert([[3030, 3]]) ctx.insert([[3040, 4]]) ctx.insert([[3040, 4]]) # non-monotonic after a few lines ctx.finalize() eq_(list(client.stream_intervals("/context/test")), [ [ 1000, 1051 ], [ 1070, 1080 ], [ 1090, 1180 ], [ 2000, 3000 ] ]) client.stream_remove("/context/test") client.stream_destroy("/context/test") client.close() def test_numpyclient_05_emptyintervals(self): # Like test_client_emptyintervals, with insert_numpy_context client = nilmdb.client.numpyclient.NumpyClient(testurl) client.stream_create("/empty/test", "uint16_1") def info(): result = [] for interval in list(client.stream_intervals("/empty/test")): result.append((client.stream_count("/empty/test", *interval), interval)) return result eq_(info(), []) # Insert a region with just a few points with client.stream_insert_numpy_context("/empty/test") as ctx: ctx.update_start(100) ctx.insert([[140, 1]]) ctx.insert([[150, 1]]) ctx.insert([[160, 1]]) ctx.update_end(200) ctx.finalize() eq_(info(), [(3, [100, 200])]) # Delete chunk, which will leave one data point and two intervals client.stream_remove("/empty/test", 145, 175) eq_(info(), [(1, [100, 145]), (0, [175, 200])]) # Try also creating a completely empty interval from scratch, # in a few different ways. client.stream_insert("/empty/test", "", 300, 350) client.stream_insert("/empty/test", [], 400, 450) with client.stream_insert_numpy_context("/empty/test", 500, 550): pass # If enough timestamps aren't provided, empty streams won't be created. client.stream_insert("/empty/test", []) with client.stream_insert_numpy_context("/empty/test"): pass client.stream_insert("/empty/test", [], start = 600) with client.stream_insert_numpy_context("/empty/test", start = 700): pass client.stream_insert("/empty/test", [], end = 850) with client.stream_insert_numpy_context("/empty/test", end = 950): pass # Try various things that might cause problems with client.stream_insert_numpy_context("/empty/test", 1000, 1050): ctx.finalize() # inserts [1000, 1050] ctx.finalize() # nothing ctx.finalize() # nothing ctx.insert([[1100, 1]]) ctx.finalize() # inserts [1100, 1101] ctx.update_start(1199) ctx.insert([[1200, 1]]) ctx.update_end(1250) ctx.finalize() # inserts [1199, 1250] ctx.update_start(1299) ctx.finalize() # nothing ctx.update_end(1350) ctx.finalize() # nothing ctx.update_start(1400) ctx.insert(np.zeros((0,2))) ctx.update_end(1450) ctx.finalize() ctx.update_start(1500) ctx.insert(np.zeros((0,2))) ctx.update_end(1550) ctx.finalize() ctx.insert(np.zeros((0,2))) ctx.insert(np.zeros((0,2))) ctx.insert(np.zeros((0,2))) ctx.finalize() # Check everything eq_(info(), [(1, [100, 145]), (0, [175, 200]), (0, [300, 350]), (0, [400, 450]), (0, [500, 550]), (0, [1000, 1050]), (1, [1100, 1101]), (1, [1199, 1250]), (0, [1400, 1450]), (0, [1500, 1550]), ]) # Clean up client.stream_remove("/empty/test") client.stream_destroy("/empty/test") client.close()