# Tests for the NilmDB database object (nilmdb.server.NilmDB) and for the
# HTTP server (nilmdb.server.Server) that wraps it.  The server tests talk
# to a real listener on 127.0.0.1:32180 and rely on streams created by the
# earlier database tests in this file.

import nilmdb.server

from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import simplejson as json
import itertools
import os
import sys
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import cStringIO
import time
import requests

from nilmdb.utils import serializer_proxy

# On-disk location of the database shared by every test in this file.
testdb = "tests/testdb"

#@atexit.register
#def cleanup():
#    os.unlink(testdb)

from testutil.helpers import *

class Test00Nilmdb(object):  # named 00 so it runs first
    def test_NilmDB(self):
        # Start from a clean slate, then check that the database can be
        # created, closed, and reopened.
        recursive_unlink(testdb)

        db = nilmdb.server.NilmDB(testdb)
        db.close()
        db = nilmdb.server.NilmDB(testdb)
        db.close()

        # test timer, just to get coverage
        capture = cStringIO.StringIO()
        old = sys.stdout
        sys.stdout = capture
        try:
            with nilmdb.utils.Timer("test"):
                time.sleep(0.01)
        finally:
            # Always put the real stdout back; otherwise a failure inside
            # the timer block would leave output redirected for every
            # test that runs afterwards.
            sys.stdout = old
        in_("test: ", capture.getvalue())

    def test_stream(self):
        # Exercise stream creation, listing, metadata and a few error
        # paths directly on the database object.  The streams created
        # here are the ones the server tests below expect to find.
        db = nilmdb.server.NilmDB(testdb)
        eq_(db.stream_list(), [])

        # Bad path
        with assert_raises(ValueError):
            db.stream_create("foo/bar/baz", "float32_8")
        with assert_raises(ValueError):
            db.stream_create("/foo", "float32_8")
        # Bad layout type
        with assert_raises(ValueError):
            db.stream_create("/newton/prep", "NoSuchLayout")
        db.stream_create("/newton/prep", "float32_8")
        db.stream_create("/newton/raw", "uint16_6")
        db.stream_create("/newton/zzz/rawnotch", "uint16_9")

        # Verify we got 3 streams
        eq_(db.stream_list(), [ ["/newton/prep", "float32_8"],
                                ["/newton/raw", "uint16_6"],
                                ["/newton/zzz/rawnotch", "uint16_9"]
                                ])
        # Match just one type or one path
        eq_(db.stream_list(layout="uint16_6"), [ ["/newton/raw", "uint16_6"] ])
        eq_(db.stream_list(path="/newton/raw"), [ ["/newton/raw", "uint16_6"] ])

        # Verify that columns were made right (pytables specific)
        if "h5file" in db.data.__dict__:
            h5file = db.data.h5file
            eq_(len(h5file.getNode("/newton/prep").cols), 9)
            eq_(len(h5file.getNode("/newton/raw").cols), 7)
            eq_(len(h5file.getNode("/newton/zzz/rawnotch").cols), 10)
            assert(not h5file.getNode("/newton/prep").colindexed["timestamp"])
            assert(not h5file.getNode("/newton/prep").colindexed["c1"])

        # Set / get metadata
        eq_(db.stream_get_metadata("/newton/prep"), {})
        eq_(db.stream_get_metadata("/newton/raw"), {})
        meta1 = { "description": "The Data",
                  "v_scale": "1.234" }
        meta2 = { "description": "The Data" }
        meta3 = { "v_scale": "1.234" }
        db.stream_set_metadata("/newton/prep", meta1)
        db.stream_update_metadata("/newton/prep", {})
        db.stream_update_metadata("/newton/raw", meta2)
        db.stream_update_metadata("/newton/raw", meta3)
        # An empty update must leave meta1 untouched, and the two partial
        # updates (meta2 then meta3) must add up to the same dict as meta1.
        eq_(db.stream_get_metadata("/newton/prep"), meta1)
        eq_(db.stream_get_metadata("/newton/raw"), meta1)

        # fill in some misc. test coverage
        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_remove("/newton/prep", 0, 0)
        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_remove("/newton/prep", 1, 0)
        db.stream_remove("/newton/prep", 0, 1)

        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_extract("/newton/prep", count = True, binary = True)

        db.close()

class TestBlockingServer(object):
    def setUp(self):
        self.db = serializer_proxy(nilmdb.server.NilmDB)(testdb)

    def tearDown(self):
        self.db.close()

    def test_blocking_server(self):
        # Server should fail if the database doesn't have a "_thread_safe"
        # property.
        with assert_raises(KeyError):
            nilmdb.server.Server(object())

        # Start web app on a custom port
        self.server = nilmdb.server.Server(self.db, host = "127.0.0.1",
                                           port = 32180, stoppable = True)

        # Run it
        event = threading.Event()
        def run_server():
            self.server.start(blocking = True, event = event)
        thread = threading.Thread(target = run_server)
        thread.start()
        if not event.wait(timeout = 10):
            raise AssertionError("server didn't start in 10 seconds")

        # Send request to exit.
        req = urlopen("http://127.0.0.1:32180/exit/", timeout = 1)
        try:
            # Wait for it
            thread.join()
        finally:
            # Release the HTTP response once the server thread is done,
            # rather than leaving the connection to the garbage collector.
            req.close()

def geturl(path):
    # Fetch `path` from the test server and return the raw response body.
    # Errors from urlopen (e.g. HTTPError) propagate to the caller.
    req = urlopen("http://127.0.0.1:32180" + path, timeout = 10)
    try:
        return req.read()
    finally:
        # Close the response explicitly; this helper is called many times
        # per test run and should not leak a connection per call.
        req.close()

def getjson(path):
    # Fetch `path` from the test server and decode the body as JSON.
    return json.loads(geturl(path))

class TestServer(object):

    def setUp(self):
        # Start web app on a custom port
        self.db = serializer_proxy(nilmdb.server.NilmDB)(testdb)
        self.server = nilmdb.server.Server(self.db, host = "127.0.0.1",
                                           port = 32180, stoppable = False)
        self.server.start(blocking = False)

    def tearDown(self):
        # Close web app
        self.server.stop()
        self.db.close()

    def test_server(self):
        # Make sure we can't force an exit, and test other 404 errors
        for url in [ "/exit", "/", "/favicon.ico" ]:
            with assert_raises(HTTPError) as e:
                geturl(url)
            eq_(e.exception.code, 404)

        # Check version
        eq_(distutils.version.LooseVersion(getjson("/version")),
            distutils.version.LooseVersion(nilmdb.__version__))

    def test_stream_list(self):
        # Known streams that got populated by an earlier test (test_nilmdb)
        streams = getjson("/stream/list")

        eq_(streams, [
            ['/newton/prep', 'float32_8'],
            ['/newton/raw', 'uint16_6'],
            ['/newton/zzz/rawnotch', 'uint16_9'],
        ])

        streams = getjson("/stream/list?layout=uint16_6")
        eq_(streams, [['/newton/raw', 'uint16_6']])

        streams = getjson("/stream/list?layout=NoSuchLayout")
        eq_(streams, [])

    def test_stream_metadata(self):
        # A path that is not a known stream must give a 404.
        with assert_raises(HTTPError) as e:
            getjson("/stream/get_metadata?path=foo")
        eq_(e.exception.code, 404)

        # No key filter: everything that was stored comes back.
        data = getjson("/stream/get_metadata?path=/newton/prep")
        eq_(data, {'description': 'The Data', 'v_scale': '1.234'})

        # One or more "key" parameters restrict the result to those keys.
        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=v_scale")
        eq_(data, {'v_scale': '1.234'})

        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=v_scale&key=description")
        eq_(data, {'description': 'The Data', 'v_scale': '1.234'})

        # Keys that were never set are reported with a null value.
        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=v_scale&key=foo")
        eq_(data, {'foo': None, 'v_scale': '1.234'})

        data = getjson("/stream/get_metadata?path=/newton/prep"
                       "&key=foo")
        eq_(data, {'foo': None})

    def test_cors_headers(self):
        # Test that CORS headers are being set correctly

        # Normal GET should send simple response
        url = "http://127.0.0.1:32180/stream/list"
        r = requests.get(url, headers = { "Origin": "http://google.com/" })
        eq_(r.status_code, 200)
        if "access-control-allow-origin" not in r.headers:
            raise AssertionError("No Access-Control-Allow-Origin (CORS) "
                                 "header in response:\n", r.headers)
        eq_(r.headers["access-control-allow-origin"], "http://google.com/")

        # OPTIONS without CORS preflight headers should result in 405
        r = requests.options(url, headers = {
            "Origin": "http://google.com/",
        })
        eq_(r.status_code, 405)

        # OPTIONS with preflight headers should give preflight response
        r = requests.options(url, headers = {
            "Origin": "http://google.com/",
            "Access-Control-Request-Method": "POST",
            "Access-Control-Request-Headers": "X-Custom",
        })
        eq_(r.status_code, 200)
        if "access-control-allow-origin" not in r.headers:
            raise AssertionError("No Access-Control-Allow-Origin (CORS) "
                                 "header in response:\n", r.headers)
        eq_(r.headers["access-control-allow-methods"], "GET, HEAD")
        eq_(r.headers["access-control-allow-headers"], "X-Custom")

    def test_post_bodies(self):
        # Test JSON post bodies
        r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
                          headers = { "Content-Type": "application/json" },
                          data = '{"hello": 1}')
        eq_(r.status_code, 404) # wrong parameters

        r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
                          headers = { "Content-Type": "application/json" },
                          data = '["hello"]')
        eq_(r.status_code, 415) # not a dict

        r = requests.post("http://127.0.0.1:32180/stream/set_metadata",
                          headers = { "Content-Type": "application/json" },
                          data = '[hello]')
        eq_(r.status_code, 400) # badly formatted JSON