257 lines
9.2 KiB
Python
257 lines
9.2 KiB
Python
import nilmdb.server
|
|
|
|
from nose.tools import *
|
|
from nose.tools import assert_raises
|
|
import distutils.version
|
|
import simplejson as json
|
|
import itertools
|
|
import os
|
|
import sys
|
|
import threading
|
|
import urllib2
|
|
from urllib2 import urlopen, HTTPError
|
|
import cStringIO
|
|
import time
|
|
import requests
|
|
|
|
from nilmdb.utils import serializer_proxy
|
|
|
|
testdb = "tests/testdb"
|
|
|
|
#@atexit.register
|
|
#def cleanup():
|
|
# os.unlink(testdb)
|
|
|
|
from testutil.helpers import *
|
|
|
|
class Test00Nilmdb(object): # named 00 so it runs first
|
|
def test_NilmDB(self):
|
|
recursive_unlink(testdb)
|
|
|
|
db = nilmdb.server.NilmDB(testdb)
|
|
db.close()
|
|
db = nilmdb.server.NilmDB(testdb)
|
|
db.close()
|
|
|
|
# test timer, just to get coverage
|
|
capture = cStringIO.StringIO()
|
|
old = sys.stdout
|
|
sys.stdout = capture
|
|
with nilmdb.utils.Timer("test"):
|
|
time.sleep(0.01)
|
|
sys.stdout = old
|
|
in_("test: ", capture.getvalue())
|
|
|
|
def test_stream(self):
|
|
db = nilmdb.server.NilmDB(testdb)
|
|
eq_(db.stream_list(), [])
|
|
|
|
# Bad path
|
|
with assert_raises(ValueError):
|
|
db.stream_create("foo/bar/baz", "float32_8")
|
|
with assert_raises(ValueError):
|
|
db.stream_create("/foo", "float32_8")
|
|
# Bad layout type
|
|
with assert_raises(ValueError):
|
|
db.stream_create("/newton/prep", "NoSuchLayout")
|
|
db.stream_create("/newton/prep", "float32_8")
|
|
db.stream_create("/newton/raw", "uint16_6")
|
|
db.stream_create("/newton/zzz/rawnotch", "uint16_9")
|
|
|
|
# Verify we got 3 streams
|
|
eq_(db.stream_list(), [ ["/newton/prep", "float32_8"],
|
|
["/newton/raw", "uint16_6"],
|
|
["/newton/zzz/rawnotch", "uint16_9"]
|
|
])
|
|
# Match just one type or one path
|
|
eq_(db.stream_list(layout="uint16_6"), [ ["/newton/raw", "uint16_6"] ])
|
|
eq_(db.stream_list(path="/newton/raw"), [ ["/newton/raw", "uint16_6"] ])
|
|
|
|
# Verify that columns were made right (pytables specific)
|
|
if "h5file" in db.data.__dict__:
|
|
h5file = db.data.h5file
|
|
eq_(len(h5file.getNode("/newton/prep").cols), 9)
|
|
eq_(len(h5file.getNode("/newton/raw").cols), 7)
|
|
eq_(len(h5file.getNode("/newton/zzz/rawnotch").cols), 10)
|
|
assert(not h5file.getNode("/newton/prep").colindexed["timestamp"])
|
|
assert(not h5file.getNode("/newton/prep").colindexed["c1"])
|
|
|
|
# Set / get metadata
|
|
eq_(db.stream_get_metadata("/newton/prep"), {})
|
|
eq_(db.stream_get_metadata("/newton/raw"), {})
|
|
meta1 = { "description": "The Data",
|
|
"v_scale": "1.234" }
|
|
meta2 = { "description": "The Data" }
|
|
meta3 = { "v_scale": "1.234" }
|
|
db.stream_set_metadata("/newton/prep", meta1)
|
|
db.stream_update_metadata("/newton/prep", {})
|
|
db.stream_update_metadata("/newton/raw", meta2)
|
|
db.stream_update_metadata("/newton/raw", meta3)
|
|
eq_(db.stream_get_metadata("/newton/prep"), meta1)
|
|
eq_(db.stream_get_metadata("/newton/raw"), meta1)
|
|
|
|
# fill in some misc. test coverage
|
|
with assert_raises(nilmdb.server.NilmDBError):
|
|
db.stream_remove("/newton/prep", 0, 0)
|
|
with assert_raises(nilmdb.server.NilmDBError):
|
|
db.stream_remove("/newton/prep", 1, 0)
|
|
db.stream_remove("/newton/prep", 0, 1)
|
|
|
|
with assert_raises(nilmdb.server.NilmDBError):
|
|
db.stream_extract("/newton/prep", count = True, binary = True)
|
|
|
|
db.close()
|
|
|
|
class TestBlockingServer(object):
|
|
def setUp(self):
|
|
self.db = serializer_proxy(nilmdb.server.NilmDB)(testdb)
|
|
|
|
def tearDown(self):
|
|
self.db.close()
|
|
|
|
def test_blocking_server(self):
|
|
# Server should fail if the database doesn't have a "_thread_safe"
|
|
# property.
|
|
with assert_raises(KeyError):
|
|
nilmdb.server.Server(object())
|
|
|
|
# Start web app on a custom port
|
|
self.server = nilmdb.server.Server(self.db, host = "127.0.0.1",
|
|
port = 32180, stoppable = True)
|
|
|
|
# Run it
|
|
event = threading.Event()
|
|
def run_server():
|
|
self.server.start(blocking = True, event = event)
|
|
thread = threading.Thread(target = run_server)
|
|
thread.start()
|
|
if not event.wait(timeout = 10):
|
|
raise AssertionError("server didn't start in 10 seconds")
|
|
|
|
# Send request to exit.
|
|
req = urlopen("http://127.0.0.1:32180/exit/", timeout = 1)
|
|
|
|
# Wait for it
|
|
thread.join()
|
|
|
|
def geturl(path):
|
|
req = urlopen("http://127.0.0.1:32180" + path, timeout = 10)
|
|
return req.read()
|
|
|
|
def getjson(path):
|
|
return json.loads(geturl(path))
|
|
|
|
class TestServer(object):
|
|
|
|
def setUp(self):
|
|
# Start web app on a custom port
|
|
self.db = serializer_proxy(nilmdb.server.NilmDB)(testdb)
|
|
self.server = nilmdb.server.Server(self.db, host = "127.0.0.1",
|
|
port = 32180, stoppable = False)
|
|
self.server.start(blocking = False)
|
|
|
|
def tearDown(self):
|
|
# Close web app
|
|
self.server.stop()
|
|
self.db.close()
|
|
|
|
def test_server(self):
|
|
# Make sure we can't force an exit, and test other 404 errors
|
|
for url in [ "/exit", "/", "/favicon.ico" ]:
|
|
with assert_raises(HTTPError) as e:
|
|
geturl(url)
|
|
eq_(e.exception.code, 404)
|
|
|
|
# Check version
|
|
eq_(distutils.version.LooseVersion(getjson("/version")),
|
|
distutils.version.LooseVersion(nilmdb.__version__))
|
|
|
|
def test_stream_list(self):
|
|
# Known streams that got populated by an earlier test (test_nilmdb)
|
|
streams = getjson("/stream/list")
|
|
|
|
eq_(streams, [
|
|
['/newton/prep', 'float32_8'],
|
|
['/newton/raw', 'uint16_6'],
|
|
['/newton/zzz/rawnotch', 'uint16_9'],
|
|
])
|
|
|
|
streams = getjson("/stream/list?layout=uint16_6")
|
|
eq_(streams, [['/newton/raw', 'uint16_6']])
|
|
|
|
streams = getjson("/stream/list?layout=NoSuchLayout")
|
|
eq_(streams, [])
|
|
|
|
|
|
def test_stream_metadata(self):
|
|
with assert_raises(HTTPError) as e:
|
|
getjson("/stream/get_metadata?path=foo")
|
|
eq_(e.exception.code, 404)
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep")
|
|
eq_(data, {'description': 'The Data', 'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=v_scale")
|
|
eq_(data, {'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=v_scale&key=description")
|
|
eq_(data, {'description': 'The Data', 'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=v_scale&key=foo")
|
|
eq_(data, {'foo': None, 'v_scale': '1.234'})
|
|
|
|
data = getjson("/stream/get_metadata?path=/newton/prep"
|
|
"&key=foo")
|
|
eq_(data, {'foo': None})
|
|
|
|
def test_cors_headers(self):
|
|
# Test that CORS headers are being set correctly
|
|
|
|
# Normal GET should send simple response
|
|
url = "http://127.0.0.1:32180/stream/list"
|
|
r = requests.get(url, headers = { "Origin": "http://google.com/" })
|
|
eq_(r.status_code, 200)
|
|
if "access-control-allow-origin" not in r.headers:
|
|
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
|
"header in response:\n", r.headers)
|
|
eq_(r.headers["access-control-allow-origin"], "http://google.com/")
|
|
|
|
# OPTIONS without CORS preflight headers should result in 405
|
|
r = requests.options(url, headers = {
|
|
"Origin": "http://google.com/",
|
|
})
|
|
eq_(r.status_code, 405)
|
|
|
|
# OPTIONS with preflight headers should give preflight response
|
|
r = requests.options(url, headers = {
|
|
"Origin": "http://google.com/",
|
|
"Access-Control-Request-Method": "POST",
|
|
"Access-Control-Request-Headers": "X-Custom",
|
|
})
|
|
eq_(r.status_code, 200)
|
|
if "access-control-allow-origin" not in r.headers:
|
|
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
|
"header in response:\n", r.headers)
|
|
eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
|
|
eq_(r.headers["access-control-allow-headers"], "X-Custom")
|
|
|
|
def test_post_bodies(self):
|
|
# Test JSON post bodies
|
|
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
|
headers = { "Content-Type": "application/json" },
|
|
data = '{"hello": 1}')
|
|
eq_(r.status_code, 404) # wrong parameters
|
|
|
|
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
|
headers = { "Content-Type": "application/json" },
|
|
data = '["hello"]')
|
|
eq_(r.status_code, 415) # not a dict
|
|
|
|
r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
|
|
headers = { "Content-Type": "application/json" },
|
|
data = '[hello]')
|
|
eq_(r.status_code, 400) # badly formatted JSON
|