nilmdb/tests/test_nilmdb.py


import nilmdb
from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import simplejson as json
import itertools
import os
import shutil
import sys
import cherrypy
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import cStringIO
import time
from nilmdb.utils import serializer_proxy
testdb = "tests/testdb"

#@atexit.register
#def cleanup():
#    os.unlink(testdb)
from testutil.helpers import *

class Test00Nilmdb(object):       # named 00 so it runs first

    def test_NilmDB(self):
        recursive_unlink(testdb)

        with assert_raises(IOError):
            nilmdb.NilmDB("/nonexistant-db/foo")

        db = nilmdb.NilmDB(testdb)
        db.close()
        db = nilmdb.NilmDB(testdb, sync=False)
        db.close()

        # test timer, just to get coverage
        capture = cStringIO.StringIO()
        old = sys.stdout
        sys.stdout = capture
        with nilmdb.utils.Timer("test"):
            time.sleep(0.01)
        sys.stdout = old
        in_("test: ", capture.getvalue())
    def test_stream(self):
        db = nilmdb.NilmDB(testdb, sync=False)
        eq_(db.stream_list(), [])

        # Bad path
        with assert_raises(ValueError):
            db.stream_create("foo/bar/baz", "PrepData")
        with assert_raises(ValueError):
            db.stream_create("/foo", "PrepData")
        # Bad layout type
        with assert_raises(ValueError):
            db.stream_create("/newton/prep", "NoSuchLayout")
        db.stream_create("/newton/prep", "PrepData")
        db.stream_create("/newton/raw", "RawData")
        db.stream_create("/newton/zzz/rawnotch", "RawNotchedData")

        # Verify we got 3 streams
        eq_(db.stream_list(), [ ["/newton/prep", "PrepData"],
                                ["/newton/raw", "RawData"],
                                ["/newton/zzz/rawnotch", "RawNotchedData"]
                                ])
        # Match just one type or one path
        eq_(db.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
        eq_(db.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])

        # Verify that columns were made right (pytables specific)
        if "h5file" in db.data.__dict__:
            h5file = db.data.h5file
            eq_(len(h5file.getNode("/newton/prep").cols), 9)
            eq_(len(h5file.getNode("/newton/raw").cols), 7)
            eq_(len(h5file.getNode("/newton/zzz/rawnotch").cols), 10)
            assert(not h5file.getNode("/newton/prep").colindexed["timestamp"])
            assert(not h5file.getNode("/newton/prep").colindexed["c1"])

        # Set / get metadata
        eq_(db.stream_get_metadata("/newton/prep"), {})
        eq_(db.stream_get_metadata("/newton/raw"), {})
        meta1 = { "description": "The Data",
                  "v_scale": "1.234" }
        meta2 = { "description": "The Data" }
        meta3 = { "v_scale": "1.234" }
        db.stream_set_metadata("/newton/prep", meta1)
        db.stream_update_metadata("/newton/prep", {})
        db.stream_update_metadata("/newton/raw", meta2)
        db.stream_update_metadata("/newton/raw", meta3)
        eq_(db.stream_get_metadata("/newton/prep"), meta1)
        eq_(db.stream_get_metadata("/newton/raw"), meta1)

        # fill in some test coverage for start >= end
        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_remove("/newton/prep", 0, 0)
        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_remove("/newton/prep", 1, 0)
        db.stream_remove("/newton/prep", 0, 1)

        db.close()

class TestBlockingServer(object):

    def setUp(self):
        self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)

    def tearDown(self):
        self.db.close()

    def test_blocking_server(self):
        # Server should fail if the database doesn't have a "_thread_safe"
        # property.
        with assert_raises(KeyError):
            nilmdb.Server(object())

        # Start web app on a custom port
        self.server = nilmdb.Server(self.db, host = "127.0.0.1",
                                    port = 12380, stoppable = True)

        # Run it
        event = threading.Event()
        def run_server():
            self.server.start(blocking = True, event = event)
        thread = threading.Thread(target = run_server)
        thread.start()
        if not event.wait(timeout = 10):
            raise AssertionError("server didn't start in 10 seconds")

        # Send request to exit.
        req = urlopen("http://127.0.0.1:12380/exit/", timeout = 1)

        # Wait for it
        thread.join()

def geturl(path):
    req = urlopen("http://127.0.0.1:12380" + path, timeout = 10)
    return req.read()

def getjson(path):
    return json.loads(geturl(path))

class TestServer(object):

    def setUp(self):
        # Start web app on a custom port
        self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
        self.server = nilmdb.Server(self.db, host = "127.0.0.1",
                                    port = 12380, stoppable = False)
        self.server.start(blocking = False)

    def tearDown(self):
        # Close web app
        self.server.stop()
        self.db.close()

    def test_server(self):
        # Make sure we can't force an exit, and test other 404 errors
        for url in [ "/exit", "/", "/favicon.ico" ]:
            with assert_raises(HTTPError) as e:
                geturl(url)
            eq_(e.exception.code, 404)

        # Check version
        eq_(distutils.version.LooseVersion(getjson("/version")),
            distutils.version.LooseVersion(nilmdb.__version__))

    def test_stream_list(self):
        # Known streams that got populated by an earlier test (test_nilmdb)
        streams = getjson("/stream/list")
        eq_(streams, [
            ['/newton/prep', 'PrepData'],
            ['/newton/raw', 'RawData'],
            ['/newton/zzz/rawnotch', 'RawNotchedData'],
            ])

        streams = getjson("/stream/list?layout=RawData")
        eq_(streams, [['/newton/raw', 'RawData']])

        streams = getjson("/stream/list?layout=NoSuchLayout")
        eq_(streams, [])

    def test_stream_metadata(self):
        with assert_raises(HTTPError) as e:
            getjson("/stream/get_metadata?path=foo")
        eq_(e.exception.code, 404)

        data = getjson("/stream/get_metadata?path=/newton/prep")
        eq_(data, {'description': 'The Data', 'v_scale': '1.234'})

        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=v_scale")
        eq_(data, {'v_scale': '1.234'})

        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=v_scale&key=description")
        eq_(data, {'description': 'The Data', 'v_scale': '1.234'})

        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=v_scale&key=foo")
        eq_(data, {'foo': None, 'v_scale': '1.234'})

        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=foo")
        eq_(data, {'foo': None})