|
- # -*- coding: utf-8 -*-
-
- import nilmdb.server
- import nilmdb.client
- import nilmdb.client.numpyclient
-
- from nilmdb.utils.printf import *
- from nilmdb.utils import timestamper
- from nilmdb.client import ClientError, ServerError
- import datetime_tz
-
- from nose.plugins.skip import SkipTest
- from nose.tools import *
- from nose.tools import assert_raises
- import itertools
- import distutils.version
-
- from testutil.helpers import *
-
- import numpy as np
-
- testdb = "tests/numpyclient-testdb"
- testurl = "http://localhost:32180/"
-
- def setup_module():
- global test_server, test_db
- # Clear out DB
- recursive_unlink(testdb)
-
- # Start web app on a custom port
- test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
- testdb, bulkdata_args = { "file_size" : 16384,
- "files_per_dir" : 3 } )
-
- test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
- port = 32180, stoppable = False,
- fast_shutdown = True,
- force_traceback = True)
- test_server.start(blocking = False)
-
- def teardown_module():
- global test_server, test_db
- # Close web app
- test_server.stop()
- test_db.close()
-
- class TestNumpyClient(object):
-
- def test_numpyclient_01_basic(self):
- # Test basic connection
- client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
- version = client.version()
- eq_(distutils.version.LooseVersion(version),
- distutils.version.LooseVersion(test_server.version))
-
- # Verify subclassing
- assert(isinstance(client, nilmdb.client.Client))
-
- # Layouts
- for layout in "int8_t", "something_8", "integer_1":
- with assert_raises(ValueError):
- for x in client.stream_extract_numpy("/foo", layout=layout):
- pass
- for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
- with assert_raises(ClientError) as e:
- for x in client.stream_extract_numpy("/foo", layout=layout):
- pass
- in_("No such stream", str(e.exception))
-
- with assert_raises(ClientError) as e:
- for x in client.stream_extract_numpy("/foo"):
- pass
- in_("can't get layout for path", str(e.exception))
-
- client.close()
-
- def test_numpyclient_02_extract(self):
- client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
-
- # Insert some data as text
- client.stream_create("/newton/prep", "float32_8")
- testfile = "tests/data/prep-20120323T1000"
- start = nilmdb.utils.time.parse_time("20120323T1000")
- rate = 120
- data = timestamper.TimestamperRate(testfile, start, rate)
- result = client.stream_insert("/newton/prep", data,
- start, start + 119999777)
-
- # Extract Numpy arrays
- array = None
- pieces = 0
- for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
- pieces += 1
- if array is not None:
- array = np.vstack((array, chunk))
- else:
- array = chunk
- eq_(array.shape, (14400, 9))
- eq_(pieces, 15)
-
- # Try structured
- s = list(client.stream_extract_numpy("/newton/prep", structured = True))
- assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))
-
- # Compare. Will be close but not exact because the conversion
- # to and from ASCII was lossy.
- data = timestamper.TimestamperRate(testfile, start, rate)
- data_str = b" ".join(data).decode('utf-8', errors='backslashreplace')
- actual = np.fromstring(data_str, sep=' ').reshape(14400, 9)
- assert(np.allclose(array, actual))
-
- client.close()
-
- def test_numpyclient_03_insert(self):
- client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
-
- # Limit _max_data just to get better coverage
- old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
- nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000
-
- client.stream_create("/test/1", "uint16_1")
- client.stream_insert_numpy("/test/1",
- np.array([[0, 1],
- [1, 2],
- [2, 3],
- [3, 4]]))
-
- # Wrong number of dimensions
- with assert_raises(ValueError) as e:
- client.stream_insert_numpy("/test/1",
- np.array([[[0, 1],
- [1, 2]],
- [[3, 4],
- [4, 5]]]))
- in_("wrong number of dimensions", str(e.exception))
-
- # Wrong number of fields
- with assert_raises(ValueError) as e:
- client.stream_insert_numpy("/test/1",
- np.array([[0, 1, 2],
- [1, 2, 3],
- [3, 4, 5],
- [4, 5, 6]]))
- in_("wrong number of fields", str(e.exception))
-
- # Unstructured
- client.stream_create("/test/2", "float32_8")
- client.stream_insert_numpy(
- "/test/2",
- client.stream_extract_numpy(
- "/newton/prep", structured = False, maxrows = 1000))
-
- # Structured, and specifying layout.
- # This also tests the final branch in stream_extract_numpy by specifing
- # a value of maxrows that exactly matches how much data we had inserted.
- client.stream_create("/test/3", "float32_8")
- client.stream_insert_numpy(
- path = "/test/3", layout = "float32_8",
- data = client.stream_extract_numpy(
- "/newton/prep", structured = True, maxrows = 14400))
-
- # Structured, specifying wrong layout
- client.stream_create("/test/4", "float32_8")
- with assert_raises(ValueError) as e:
- client.stream_insert_numpy(
- "/test/4", layout = "uint16_1",
- data = client.stream_extract_numpy(
- "/newton/prep", structured = True, maxrows = 1000))
- in_("wrong dtype", str(e.exception))
-
- # Unstructured, and specifying wrong layout
- client.stream_create("/test/5", "float32_8")
- with assert_raises(ClientError) as e:
- client.stream_insert_numpy(
- "/test/5", layout = "uint16_8",
- data = client.stream_extract_numpy(
- "/newton/prep", structured = False, maxrows = 1000))
- # timestamps will be screwy here, because data will be parsed wrong
- in_("error parsing input data", str(e.exception))
-
- # Make sure the /newton/prep copies are identical
- a = np.vstack(list(client.stream_extract_numpy("/newton/prep")))
- b = np.vstack(list(client.stream_extract_numpy("/test/2")))
- c = np.vstack(list(client.stream_extract_numpy("/test/3")))
- assert(np.array_equal(a,b))
- assert(np.array_equal(a,c))
-
- # Make sure none of the files are greater than 16384 bytes as
- # we configured with the bulkdata_args above.
- datapath = os.path.join(testdb, "data")
- for (dirpath, dirnames, filenames) in os.walk(datapath):
- for f in filenames:
- fn = os.path.join(dirpath, f)
- size = os.path.getsize(fn)
- if size > 16384:
- raise AssertionError(sprintf("%s is too big: %d > %d\n",
- fn, size, 16384))
-
- nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
- client.close()
-
- def test_numpyclient_04_context(self):
- # Like test_client_context, but with Numpy data
- client = nilmdb.client.numpyclient.NumpyClient(testurl)
-
- client.stream_create("/context/test", "uint16_1")
- with client.stream_insert_numpy_context("/context/test") as ctx:
- # override _max_rows to trigger frequent server updates
- ctx._max_rows = 2
- ctx.insert([[1000, 1]])
- ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
- ctx.insert([[1040, 1], [1050, 1]])
- ctx.finalize()
- ctx.insert([[1070, 1]])
- ctx.update_end(1080)
- ctx.finalize()
- ctx.update_start(1090)
- ctx.insert([[1100, 1]])
- ctx.insert([[1110, 1]])
- ctx.send()
- ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
- ctx.update_end(1160)
- ctx.insert([[1150, 1]])
- ctx.update_end(1170)
- ctx.insert([[1160, 1]])
- ctx.update_end(1180)
- ctx.insert([[1170, 123456789.0]])
- ctx.finalize()
- ctx.insert(np.zeros((0,2)))
-
- with assert_raises(ClientError):
- with client.stream_insert_numpy_context("/context/test",
- 1000, 2000) as ctx:
- ctx.insert([[1180, 1]])
-
- with assert_raises(ClientError):
- with client.stream_insert_numpy_context("/context/test",
- 2000, 3000) as ctx:
- ctx._max_rows = 2
- ctx.insert([[3180, 1]])
- ctx.insert([[3181, 1]])
-
- with client.stream_insert_numpy_context("/context/test",
- 2000, 3000) as ctx:
- # make sure our override wasn't permanent
- ne_(ctx._max_rows, 2)
- ctx.insert([[2250, 1]])
- ctx.finalize()
-
- with assert_raises(ClientError):
- with client.stream_insert_numpy_context("/context/test",
- 3000, 4000) as ctx:
- ctx.insert([[3010, 1]])
- ctx.insert([[3020, 2]])
- ctx.insert([[3030, 3]])
- ctx.insert([[3040, 4]])
- ctx.insert([[3040, 4]]) # non-monotonic after a few lines
- ctx.finalize()
-
- eq_(list(client.stream_intervals("/context/test")),
- [ [ 1000, 1051 ],
- [ 1070, 1080 ],
- [ 1090, 1180 ],
- [ 2000, 3000 ] ])
-
- client.stream_remove("/context/test")
- client.stream_destroy("/context/test")
- client.close()
-
- def test_numpyclient_05_emptyintervals(self):
- # Like test_client_emptyintervals, with insert_numpy_context
- client = nilmdb.client.numpyclient.NumpyClient(testurl)
- client.stream_create("/empty/test", "uint16_1")
- def info():
- result = []
- for interval in list(client.stream_intervals("/empty/test")):
- result.append((client.stream_count("/empty/test", *interval),
- interval))
- return result
- eq_(info(), [])
-
- # Insert a region with just a few points
- with client.stream_insert_numpy_context("/empty/test") as ctx:
- ctx.update_start(100)
- ctx.insert([[140, 1]])
- ctx.insert([[150, 1]])
- ctx.insert([[160, 1]])
- ctx.update_end(200)
- ctx.finalize()
- eq_(info(), [(3, [100, 200])])
-
- # Delete chunk, which will leave one data point and two intervals
- client.stream_remove("/empty/test", 145, 175)
- eq_(info(), [(1, [100, 145]),
- (0, [175, 200])])
-
- # Try also creating a completely empty interval from scratch,
- # in a few different ways.
- client.stream_insert("/empty/test", b"", 300, 350)
- client.stream_insert("/empty/test", [], 400, 450)
- with client.stream_insert_numpy_context("/empty/test", 500, 550):
- pass
-
- # If enough timestamps aren't provided, empty streams won't be created.
- client.stream_insert("/empty/test", [])
- with client.stream_insert_numpy_context("/empty/test"):
- pass
- client.stream_insert("/empty/test", [], start = 600)
- with client.stream_insert_numpy_context("/empty/test", start = 700):
- pass
- client.stream_insert("/empty/test", [], end = 850)
- with client.stream_insert_numpy_context("/empty/test", end = 950):
- pass
-
- # Equal start and end is OK as long as there's no data
- with assert_raises(ClientError) as e:
- with client.stream_insert_numpy_context("/empty/test",
- start=9, end=9) as ctx:
- ctx.insert([[9, 9]])
- ctx.finalize()
- in_("have data to send, but invalid start/end times", str(e.exception))
-
- with client.stream_insert_numpy_context("/empty/test",
- start=9, end=9) as ctx:
- pass
-
- # reusing a context object is bad
- with assert_raises(Exception) as e:
- ctx.insert([[9, 9]])
-
- # Try various things that might cause problems
- with client.stream_insert_numpy_context("/empty/test",
- 1000, 1050) as ctx:
- ctx.finalize() # inserts [1000, 1050]
- ctx.finalize() # nothing
- ctx.finalize() # nothing
- ctx.insert([[1100, 1]])
- ctx.finalize() # inserts [1100, 1101]
- ctx.update_start(1199)
- ctx.insert([[1200, 1]])
- ctx.update_end(1250)
- ctx.finalize() # inserts [1199, 1250]
- ctx.update_start(1299)
- ctx.finalize() # nothing
- ctx.update_end(1350)
- ctx.finalize() # nothing
- ctx.update_start(1400)
- ctx.insert(np.zeros((0,2)))
- ctx.update_end(1450)
- ctx.finalize()
- ctx.update_start(1500)
- ctx.insert(np.zeros((0,2)))
- ctx.update_end(1550)
- ctx.finalize()
- ctx.insert(np.zeros((0,2)))
- ctx.insert(np.zeros((0,2)))
- ctx.insert(np.zeros((0,2)))
- ctx.finalize()
-
- # Check everything
- eq_(info(), [(1, [100, 145]),
- (0, [175, 200]),
- (0, [300, 350]),
- (0, [400, 450]),
- (0, [500, 550]),
- (0, [1000, 1050]),
- (1, [1100, 1101]),
- (1, [1199, 1250]),
- (0, [1400, 1450]),
- (0, [1500, 1550]),
- ])
-
- # Clean up
- client.stream_remove("/empty/test")
- client.stream_destroy("/empty/test")
- client.close()
|