Jim Paris
3d82888580
This is a pretty big change that will render existing clients unable to modify the database, but it's important that we use POST or PUT instead of GET for anything that may change state, in case this is ever put behind a cache.
211 lines
7.0 KiB
Python
211 lines
7.0 KiB
Python
import nilmdb
|
|
|
|
from nose.tools import *
|
|
from nose.tools import assert_raises
|
|
import distutils.version
|
|
import simplejson as json
|
|
import itertools
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import cherrypy
|
|
import threading
|
|
import urllib2
|
|
from urllib2 import urlopen, HTTPError
|
|
import Queue
|
|
import cStringIO
|
|
import time
|
|
|
|
from nilmdb.utils import serializer_proxy
|
|
|
|
testdb = "tests/testdb"
|
|
|
|
#@atexit.register
|
|
#def cleanup():
|
|
# os.unlink(testdb)
|
|
|
|
from testutil.helpers import *
|
|
|
|
class Test00Nilmdb(object): # named 00 so it runs first
|
|
def test_NilmDB(self):
|
|
recursive_unlink(testdb)
|
|
|
|
with assert_raises(IOError):
|
|
nilmdb.NilmDB("/nonexistant-db/foo")
|
|
|
|
db = nilmdb.NilmDB(testdb)
|
|
db.close()
|
|
db = nilmdb.NilmDB(testdb, sync=False)
|
|
db.close()
|
|
|
|
# test timer, just to get coverage
|
|
capture = cStringIO.StringIO()
|
|
old = sys.stdout
|
|
sys.stdout = capture
|
|
with nilmdb.utils.Timer("test"):
|
|
time.sleep(0.01)
|
|
sys.stdout = old
|
|
in_("test: ", capture.getvalue())
|
|
|
|
def test_stream(self):
|
|
db = nilmdb.NilmDB(testdb, sync=False)
|
|
eq_(db.stream_list(), [])
|
|
|
|
# Bad path
|
|
with assert_raises(ValueError):
|
|
db.stream_create("foo/bar/baz", "PrepData")
|
|
with assert_raises(ValueError):
|
|
db.stream_create("/foo", "PrepData")
|
|
# Bad layout type
|
|
with assert_raises(ValueError):
|
|
db.stream_create("/newton/prep", "NoSuchLayout")
|
|
db.stream_create("/newton/prep", "PrepData")
|
|
db.stream_create("/newton/raw", "RawData")
|
|
db.stream_create("/newton/zzz/rawnotch", "RawNotchedData")
|
|
|
|
# Verify we got 3 streams
|
|
eq_(db.stream_list(), [ ["/newton/prep", "PrepData"],
|
|
["/newton/raw", "RawData"],
|
|
["/newton/zzz/rawnotch", "RawNotchedData"]
|
|
])
|
|
# Match just one type or one path
|
|
eq_(db.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
|
|
eq_(db.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
|
|
|
|
# Verify that columns were made right (pytables specific)
|
|
if "h5file" in db.data.__dict__:
|
|
h5file = db.data.h5file
|
|
eq_(len(h5file.getNode("/newton/prep").cols), 9)
|
|
eq_(len(h5file.getNode("/newton/raw").cols), 7)
|
|
eq_(len(h5file.getNode("/newton/zzz/rawnotch").cols), 10)
|
|
assert(not h5file.getNode("/newton/prep").colindexed["timestamp"])
|
|
assert(not h5file.getNode("/newton/prep").colindexed["c1"])
|
|
|
|
# Set / get metadata
|
|
eq_(db.stream_get_metadata("/newton/prep"), {})
|
|
eq_(db.stream_get_metadata("/newton/raw"), {})
|
|
meta1 = { "description": "The Data",
|
|
"v_scale": "1.234" }
|
|
meta2 = { "description": "The Data" }
|
|
meta3 = { "v_scale": "1.234" }
|
|
db.stream_set_metadata("/newton/prep", meta1)
|
|
db.stream_update_metadata("/newton/prep", {})
|
|
db.stream_update_metadata("/newton/raw", meta2)
|
|
db.stream_update_metadata("/newton/raw", meta3)
|
|
eq_(db.stream_get_metadata("/newton/prep"), meta1)
|
|
eq_(db.stream_get_metadata("/newton/raw"), meta1)
|
|
|
|
# fill in some test coverage for start >= end
|
|
with assert_raises(nilmdb.server.NilmDBError):
|
|
db.stream_remove("/newton/prep", 0, 0)
|
|
with assert_raises(nilmdb.server.NilmDBError):
|
|
db.stream_remove("/newton/prep", 1, 0)
|
|
db.stream_remove("/newton/prep", 0, 1)
|
|
|
|
db.close()
|
|
|
|
class TestBlockingServer(object):
|
|
def setUp(self):
|
|
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
|
|
|
|
def tearDown(self):
|
|
self.db.close()
|
|
|
|
def test_blocking_server(self):
|
|
# Server should fail if the database doesn't have a "_thread_safe"
|
|
# property.
|
|
with assert_raises(KeyError):
|
|
nilmdb.Server(object())
|
|
|
|
# Start web app on a custom port
|
|
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
|
|
port = 12380, stoppable = True)
|
|
|
|
# Run it
|
|
event = threading.Event()
|
|
def run_server():
|
|
self.server.start(blocking = True, event = event)
|
|
thread = threading.Thread(target = run_server)
|
|
thread.start()
|
|
if not event.wait(timeout = 10):
|
|
raise AssertionError("server didn't start in 10 seconds")
|
|
|
|
# Send request to exit.
|
|
req = urlopen("http://127.0.0.1:12380/exit/", timeout = 1)
|
|
|
|
# Wait for it
|
|
thread.join()
|
|
|
|
def geturl(path):
|
|
req = urlopen("http://127.0.0.1:12380" + path, timeout = 10)
|
|
return req.read()
|
|
|
|
def getjson(path):
|
|
return json.loads(geturl(path))
|
|
|
|
class TestServer(object):
|
|
|
|
def setUp(self):
|
|
# Start web app on a custom port
|
|
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
|
|
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
|
|
port = 12380, stoppable = False)
|
|
self.server.start(blocking = False)
|
|
|
|
def tearDown(self):
|
|
# Close web app
|
|
self.server.stop()
|
|
self.db.close()
|
|
|
|
def test_server(self):
|
|
# Make sure we can't force an exit, and test other 404 errors
|
|
for url in [ "/exit", "/", "/favicon.ico" ]:
|
|
with assert_raises(HTTPError) as e:
|
|
geturl(url)
|
|
eq_(e.exception.code, 404)
|
|
|
|
# Check version
|
|
eq_(distutils.version.LooseVersion(getjson("/version")),
|
|
distutils.version.LooseVersion(nilmdb.__version__))
|
|
|
|
def test_stream_list(self):
|
|
# Known streams that got populated by an earlier test (test_nilmdb)
|
|
streams = getjson("/stream/list")
|
|
|
|
eq_(streams, [
|
|
['/newton/prep', 'PrepData'],
|
|
['/newton/raw', 'RawData'],
|
|
['/newton/zzz/rawnotch', 'RawNotchedData'],
|
|
])
|
|
|
|
streams = getjson("/stream/list?layout=RawData")
|
|
eq_(streams, [['/newton/raw', 'RawData']])
|
|
|
|
streams = getjson("/stream/list?layout=NoSuchLayout")
|
|
eq_(streams, [])
|
|
|
|
|
|
def test_stream_metadata(self):
|
|
with assert_raises(HTTPError) as e:
|
|
getjson("/stream/get_metadata?path=foo")
|
|
eq_(e.exception.code, 404)
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep")
|
|
eq_(data, {'description': 'The Data', 'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=v_scale")
|
|
eq_(data, {'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=v_scale&key=description")
|
|
eq_(data, {'description': 'The Data', 'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=v_scale&key=foo")
|
|
eq_(data, {'foo': None, 'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=foo")
|
|
eq_(data, {'foo': None})
|