374 lines
14 KiB
Python
374 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import nilmdb.server
|
|
import nilmdb.client
|
|
import nilmdb.client.numpyclient
|
|
|
|
from nilmdb.utils.printf import *
|
|
from nilmdb.utils import timestamper
|
|
from nilmdb.client import ClientError, ServerError
|
|
from nilmdb.utils import datetime_tz
|
|
|
|
from nose.plugins.skip import SkipTest
|
|
from nose.tools import *
|
|
from nose.tools import assert_raises
|
|
import itertools
|
|
import distutils.version
|
|
|
|
from testutil.helpers import *
|
|
|
|
import numpy as np
|
|
|
|
testdb = "tests/numpyclient-testdb"
|
|
testurl = "http://localhost:32180/"
|
|
|
|
def setup_module():
    """Create an empty test database and serve it over HTTP for this module.

    Publishes two module-level globals used by the tests and by
    teardown_module(): ``test_db`` (the serialized NilmDB instance) and
    ``test_server`` (the web server wrapping it, started non-blocking).
    """
    global test_server, test_db

    # Clear out DB so every run starts from an empty directory
    recursive_unlink(testdb)

    # Deliberately tiny bulk-data limits; test_numpyclient_03_insert later
    # checks that no data file exceeds this file_size.
    bulkdata_limits = {"file_size": 16384,
                       "files_per_dir": 3}
    serialized_nilmdb = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)
    test_db = serialized_nilmdb(testdb, bulkdata_args=bulkdata_limits)

    # Start web app on a custom port (must match the port in testurl)
    server_options = dict(host="127.0.0.1",
                          port=32180,
                          stoppable=False,
                          fast_shutdown=True,
                          force_traceback=True)
    test_server = nilmdb.server.Server(test_db, **server_options)
    test_server.start(blocking=False)
|
|
|
|
def teardown_module():
    """Stop the test web server and close the database behind it.

    Reads the module-level ``test_server`` and ``test_db`` set by
    setup_module(); nothing is rebound here, so no ``global`` statement
    is required.
    """
    try:
        # Close web app
        test_server.stop()
    finally:
        # Close the database even if stopping the server raised, so the
        # on-disk test database is not left open after a failed shutdown.
        test_db.close()
|
|
|
|
class TestNumpyClient(object):
    """Tests for nilmdb.client.numpyclient.NumpyClient against a live server.

    The server is the one started by setup_module() and reachable at
    ``testurl``.  The tests are numbered because they are order-dependent:
    test 03 re-reads the "/newton/prep" stream that test 02 creates.

    Every test closes its client in a ``finally`` clause so that a failing
    assertion does not leave the connection open.  Method descriptions are
    written as comments rather than docstrings because nose reports a test
    method's docstring in place of its name.
    """

    def test_numpyclient_01_basic(self):
        # Test basic connection, subclassing, and layout validation
        client = nilmdb.client.numpyclient.NumpyClient(url=testurl)
        try:
            # Client-reported version must match the running server's
            version = client.version()
            eq_(distutils.version.LooseVersion(version),
                distutils.version.LooseVersion(test_server.version))

            # Verify subclassing
            assert isinstance(client, nilmdb.client.Client)

            # Layouts: malformed layout strings raise ValueError.  The
            # extractor is a generator, so it has to be iterated for the
            # error to surface.
            for layout in "int8_t", "something_8", "integer_1":
                with assert_raises(ValueError):
                    for _ in client.stream_extract_numpy("/foo",
                                                         layout=layout):
                        pass

            # Well-formed layouts fail with a ClientError instead, since
            # the stream "/foo" does not exist.
            for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
                with assert_raises(ClientError) as e:
                    for _ in client.stream_extract_numpy("/foo",
                                                         layout=layout):
                        pass
                in_("No such stream", str(e.exception))

            # With no layout given, the layout lookup for the path fails
            with assert_raises(ClientError) as e:
                for _ in client.stream_extract_numpy("/foo"):
                    pass
            in_("can't get layout for path", str(e.exception))
        finally:
            client.close()

    def test_numpyclient_02_extract(self):
        # Insert text data, then read it back as Numpy arrays
        client = nilmdb.client.numpyclient.NumpyClient(url=testurl)
        try:
            # Insert some data as text
            client.stream_create("/newton/prep", "float32_8")
            testfile = "tests/data/prep-20120323T1000"
            start = nilmdb.utils.time.parse_time("20120323T1000")
            rate = 120
            data = timestamper.TimestamperRate(testfile, start, rate)
            client.stream_insert("/newton/prep", data,
                                 start, start + 119999777)

            # Extract Numpy arrays, at most 1000 rows per chunk
            chunks = list(client.stream_extract_numpy("/newton/prep",
                                                      maxrows=1000))
            pieces = len(chunks)
            array = np.vstack(chunks)
            eq_(array.shape, (14400, 9))
            eq_(pieces, 15)

            # Try structured: the first chunk's timestamp and data fields,
            # placed side by side, must equal the unstructured result.
            s = list(client.stream_extract_numpy("/newton/prep",
                                                 structured=True))
            assert np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']],
                                  array)

            # Compare.  Will be close but not exact because the conversion
            # to and from ASCII was lossy.
            data = timestamper.TimestamperRate(testfile, start, rate)
            actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
            assert np.allclose(array, actual)
        finally:
            client.close()

    def test_numpyclient_03_insert(self):
        # Insert Numpy data: shape errors, structured and unstructured
        # input, layout mismatches, and on-disk file size limits
        client = nilmdb.client.numpyclient.NumpyClient(url=testurl)

        # Limit _max_data just to get better coverage.  This patches a
        # class attribute, so it is restored in the finally clause below;
        # otherwise a failing assertion would leak the patched value into
        # every later test.
        inserter_cls = nilmdb.client.numpyclient.StreamInserterNumpy
        old_max_data = inserter_cls._max_data
        inserter_cls._max_data = 100000
        try:
            client.stream_create("/test/1", "uint16_1")
            client.stream_insert_numpy("/test/1",
                                       np.array([[0, 1],
                                                 [1, 2],
                                                 [2, 3],
                                                 [3, 4]]))

            # Wrong number of dimensions
            with assert_raises(ValueError) as e:
                client.stream_insert_numpy("/test/1",
                                           np.array([[[0, 1],
                                                      [1, 2]],
                                                     [[3, 4],
                                                      [4, 5]]]))
            in_("wrong number of dimensions", str(e.exception))

            # Wrong number of fields
            with assert_raises(ValueError) as e:
                client.stream_insert_numpy("/test/1",
                                           np.array([[0, 1, 2],
                                                     [1, 2, 3],
                                                     [3, 4, 5],
                                                     [4, 5, 6]]))
            in_("wrong number of fields", str(e.exception))

            # Unstructured
            client.stream_create("/test/2", "float32_8")
            client.stream_insert_numpy(
                "/test/2",
                client.stream_extract_numpy(
                    "/newton/prep", structured=False, maxrows=1000))

            # Structured, and specifying layout
            client.stream_create("/test/3", "float32_8")
            client.stream_insert_numpy(
                path="/test/3", layout="float32_8",
                data=client.stream_extract_numpy(
                    "/newton/prep", structured=True, maxrows=1000))

            # Structured, specifying wrong layout
            client.stream_create("/test/4", "float32_8")
            with assert_raises(ValueError) as e:
                client.stream_insert_numpy(
                    "/test/4", layout="uint16_1",
                    data=client.stream_extract_numpy(
                        "/newton/prep", structured=True, maxrows=1000))
            in_("wrong dtype", str(e.exception))

            # Unstructured, and specifying wrong layout
            client.stream_create("/test/5", "float32_8")
            with assert_raises(ClientError) as e:
                client.stream_insert_numpy(
                    "/test/5", layout="uint16_8",
                    data=client.stream_extract_numpy(
                        "/newton/prep", structured=False, maxrows=1000))
            # timestamps will be screwy here, because data will be parsed
            # wrong
            in_("error parsing input data", str(e.exception))

            # Make sure the /newton/prep copies are identical.  np.vstack
            # takes a sequence of arrays, so each generator is turned into
            # a list first.
            a = np.vstack(list(client.stream_extract_numpy("/newton/prep")))
            b = np.vstack(list(client.stream_extract_numpy("/test/2")))
            c = np.vstack(list(client.stream_extract_numpy("/test/3")))
            assert np.array_equal(a, b)
            assert np.array_equal(a, c)

            # Make sure none of the files are greater than 16384 bytes as
            # we configured with the bulkdata_args in setup_module.
            datapath = os.path.join(testdb, "data")
            for (dirpath, dirnames, filenames) in os.walk(datapath):
                for f in filenames:
                    fn = os.path.join(dirpath, f)
                    size = os.path.getsize(fn)
                    if size > 16384:
                        raise AssertionError(
                            sprintf("%s is too big: %d > %d\n",
                                    fn, size, 16384))
        finally:
            inserter_cls._max_data = old_max_data
            client.close()

    def test_numpyclient_04_context(self):
        # Like test_client_context, but with Numpy data
        client = nilmdb.client.numpyclient.NumpyClient(testurl)
        try:
            client.stream_create("/context/test", "uint16_1")
            with client.stream_insert_numpy_context("/context/test") as ctx:
                # override _max_rows to trigger frequent server updates
                ctx._max_rows = 2
                ctx.insert([[1000, 1]])
                ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
                ctx.insert([[1040, 1], [1050, 1]])
                ctx.finalize()
                ctx.insert([[1070, 1]])
                ctx.update_end(1080)
                ctx.finalize()
                ctx.update_start(1090)
                ctx.insert([[1100, 1]])
                ctx.insert([[1110, 1]])
                ctx.send()
                ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
                ctx.update_end(1160)
                ctx.insert([[1150, 1]])
                ctx.update_end(1170)
                ctx.insert([[1160, 1]])
                ctx.update_end(1180)
                ctx.insert([[1170, 123456789.0]])
                ctx.finalize()
                ctx.insert(np.zeros((0, 2)))

            # Each of the next two inserts is expected to be rejected
            # with a ClientError.
            with assert_raises(ClientError):
                with client.stream_insert_numpy_context("/context/test",
                                                        1000, 2000) as ctx:
                    ctx.insert([[1180, 1]])

            with assert_raises(ClientError):
                with client.stream_insert_numpy_context("/context/test",
                                                        2000, 3000) as ctx:
                    ctx._max_rows = 2
                    ctx.insert([[3180, 1]])
                    ctx.insert([[3181, 1]])

            with client.stream_insert_numpy_context("/context/test",
                                                    2000, 3000) as ctx:
                # make sure our override wasn't permanent
                ne_(ctx._max_rows, 2)
                ctx.insert([[2250, 1]])
                ctx.finalize()

            with assert_raises(ClientError):
                with client.stream_insert_numpy_context("/context/test",
                                                        3000, 4000) as ctx:
                    ctx.insert([[3010, 1]])
                    ctx.insert([[3020, 2]])
                    ctx.insert([[3030, 3]])
                    ctx.insert([[3040, 4]])
                    # non-monotonic after a few lines
                    ctx.insert([[3040, 4]])
                    ctx.finalize()

            # Only the successful inserts may have left intervals behind
            eq_(list(client.stream_intervals("/context/test")),
                [[1000, 1051],
                 [1070, 1080],
                 [1090, 1180],
                 [2000, 3000]])

            client.stream_remove("/context/test")
            client.stream_destroy("/context/test")
        finally:
            client.close()

    def test_numpyclient_05_emptyintervals(self):
        # Like test_client_emptyintervals, with insert_numpy_context
        client = nilmdb.client.numpyclient.NumpyClient(testurl)
        try:
            client.stream_create("/empty/test", "uint16_1")

            def info():
                # Return a (row count, interval) pair for every interval
                # currently in /empty/test.  The interval list is
                # materialized before any stream_count call is made.
                # NOTE(review): presumably this avoids issuing a second
                # request while the first response is still being
                # iterated -- confirm before simplifying.
                result = []
                for interval in list(client.stream_intervals("/empty/test")):
                    result.append((client.stream_count("/empty/test",
                                                       *interval),
                                   interval))
                return result

            eq_(info(), [])

            # Insert a region with just a few points
            with client.stream_insert_numpy_context("/empty/test") as ctx:
                ctx.update_start(100)
                ctx.insert([[140, 1]])
                ctx.insert([[150, 1]])
                ctx.insert([[160, 1]])
                ctx.update_end(200)
                ctx.finalize()
            eq_(info(), [(3, [100, 200])])

            # Delete chunk, which will leave one data point and two
            # intervals
            client.stream_remove("/empty/test", 145, 175)
            eq_(info(), [(1, [100, 145]),
                         (0, [175, 200])])

            # Try also creating a completely empty interval from scratch,
            # in a few different ways.
            client.stream_insert("/empty/test", "", 300, 350)
            client.stream_insert("/empty/test", [], 400, 450)
            with client.stream_insert_numpy_context("/empty/test", 500, 550):
                pass

            # If enough timestamps aren't provided, empty streams won't be
            # created.
            client.stream_insert("/empty/test", [])
            with client.stream_insert_numpy_context("/empty/test"):
                pass
            client.stream_insert("/empty/test", [], start=600)
            with client.stream_insert_numpy_context("/empty/test", start=700):
                pass
            client.stream_insert("/empty/test", [], end=850)
            with client.stream_insert_numpy_context("/empty/test", end=950):
                pass

            # Equal start and end is an error when there is data to send...
            with assert_raises(ClientError) as e:
                with client.stream_insert_numpy_context("/empty/test",
                                                        start=9,
                                                        end=9) as ctx:
                    ctx.insert([[9, 9]])
                    ctx.finalize()
            in_("have data to send, but invalid start/end times",
                str(e.exception))

            # ...but is OK as long as there's no data
            with client.stream_insert_numpy_context("/empty/test",
                                                    start=9, end=9) as ctx:
                pass

            # reusing a context object is bad
            with assert_raises(Exception):
                ctx.insert([[9, 9]])

            # Try various things that might cause problems
            with client.stream_insert_numpy_context("/empty/test",
                                                    1000, 1050) as ctx:
                ctx.finalize()  # inserts [1000, 1050]
                ctx.finalize()  # nothing
                ctx.finalize()  # nothing
                ctx.insert([[1100, 1]])
                ctx.finalize()  # inserts [1100, 1101]
                ctx.update_start(1199)
                ctx.insert([[1200, 1]])
                ctx.update_end(1250)
                ctx.finalize()  # inserts [1199, 1250]
                ctx.update_start(1299)
                ctx.finalize()  # nothing
                ctx.update_end(1350)
                ctx.finalize()  # nothing
                ctx.update_start(1400)
                ctx.insert(np.zeros((0, 2)))
                ctx.update_end(1450)
                ctx.finalize()
                ctx.update_start(1500)
                ctx.insert(np.zeros((0, 2)))
                ctx.update_end(1550)
                ctx.finalize()
                ctx.insert(np.zeros((0, 2)))
                ctx.insert(np.zeros((0, 2)))
                ctx.insert(np.zeros((0, 2)))
                ctx.finalize()

            # Check everything
            eq_(info(), [(1, [100, 145]),
                         (0, [175, 200]),
                         (0, [300, 350]),
                         (0, [400, 450]),
                         (0, [500, 550]),
                         (0, [1000, 1050]),
                         (1, [1100, 1101]),
                         (1, [1199, 1250]),
                         (0, [1400, 1450]),
                         (0, [1500, 1550]),
                         ])

            # Clean up
            client.stream_remove("/empty/test")
            client.stream_destroy("/empty/test")
        finally:
            client.close()
|