layouts, rather than direct dict objects. This allows us to go backwards and match the description of tables in the database with the layouts defined within nilmdb.layout. Fix some big issues with the nilmdb.server and startup errors like an invalid port number. We have to catch os._exit and report the error ourselves. Add some timeouts and otherwise clean up tests. Make NilmDB track and complain about files that weren't closed before the object is destroyed. Update stream_list and /stream/list to return the layout corresponding to each table. git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb-new@10338 ddd99763-3ecb-0310-9145-efcb8ce7c51ftags/bxinterval-last
@@ -1,34 +1,58 @@ | |||
import tables | |||
# Table description for typical prep output | |||
PrepData = { | |||
'timestamp': tables.Int64Col(pos=1), | |||
'p1': tables.Float32Col(pos=2), | |||
'q1': tables.Float32Col(pos=3), | |||
'p3': tables.Float32Col(pos=4), | |||
'q3': tables.Float32Col(pos=5), | |||
'p5': tables.Float32Col(pos=6), | |||
'q5': tables.Float32Col(pos=7), | |||
'p7': tables.Float32Col(pos=8), | |||
'q7': tables.Float32Col(pos=9), | |||
} | |||
_layout_desc = { | |||
# Typical prep output | |||
"PrepData": { | |||
'timestamp': tables.Int64Col(pos=1), | |||
'p1': tables.Float32Col(pos=2), | |||
'q1': tables.Float32Col(pos=3), | |||
'p3': tables.Float32Col(pos=4), | |||
'q3': tables.Float32Col(pos=5), | |||
'p5': tables.Float32Col(pos=6), | |||
'q5': tables.Float32Col(pos=7), | |||
'p7': tables.Float32Col(pos=8), | |||
'q7': tables.Float32Col(pos=9), | |||
}, | |||
# Raw data | |||
"RawData": { | |||
'timestamp': tables.Int64Col(pos=1), | |||
'va': tables.UInt16Col(pos=2), | |||
'vb': tables.UInt16Col(pos=3), | |||
'vc': tables.UInt16Col(pos=4), | |||
'ia': tables.UInt16Col(pos=5), | |||
'ib': tables.UInt16Col(pos=6), | |||
'ic': tables.UInt16Col(pos=7), | |||
}, | |||
# Table description for raw data | |||
RawData = { | |||
'timestamp': tables.Int64Col(pos=1), | |||
'va': tables.UInt16Col(pos=2), | |||
'vb': tables.UInt16Col(pos=3), | |||
'vc': tables.UInt16Col(pos=4), | |||
'ia': tables.UInt16Col(pos=5), | |||
'ib': tables.UInt16Col(pos=6), | |||
'ic': tables.UInt16Col(pos=7), | |||
} | |||
# Raw data plus 60 Hz notched current | |||
"RawNotchedData": { | |||
'timestamp': tables.Int64Col(pos=1), | |||
'va': tables.UInt16Col(pos=2), | |||
'vb': tables.UInt16Col(pos=3), | |||
'vc': tables.UInt16Col(pos=4), | |||
'ia': tables.UInt16Col(pos=5), | |||
'ib': tables.UInt16Col(pos=6), | |||
'ic': tables.UInt16Col(pos=7), | |||
'notch_ia': tables.UInt16Col(pos=8), | |||
'notch_ib': tables.UInt16Col(pos=9), | |||
'notch_ic': tables.UInt16Col(pos=10), | |||
}, | |||
} | |||
# Table description for raw data plus 60 Hz notched current | |||
RawNotchedData = dict(RawData) | |||
RawNotchedData.update({ | |||
'notch_ia': tables.UInt16Col(pos=8), | |||
'notch_ib': tables.UInt16Col(pos=9), | |||
'notch_ic': tables.UInt16Col(pos=10), | |||
}) | |||
def layout_to_desc(layout): | |||
"""Return a tables.Description corresponding to the given layout string""" | |||
return tables.Description(_layout_desc[layout]) | |||
def desc_to_layout(match_desc): | |||
""" | |||
Match a tables.Description to our fixed list of layouts, and | |||
return a string reprensenting the one that matched, or None | |||
if nothing matches. | |||
This isn't very efficient or even very good. | |||
""" | |||
for layout in _layout_desc.keys(): | |||
if repr(layout_to_desc(layout)) == repr(match_desc): | |||
return layout | |||
return None |
@@ -2,26 +2,48 @@ | |||
Object that represents a NILM database file""" | |||
# Need absolute_import so that "import nilmdb" won't pull in nilmdb.py, | |||
# but will pull the nilmdb module instead. | |||
from __future__ import absolute_import | |||
import nilmdb | |||
from nilmdb.printf import * | |||
import tables | |||
import time | |||
import sys | |||
class NilmDB(object): | |||
def __init__(self, filename): | |||
# Open or create the file | |||
self.filename = filename | |||
self.h5file = tables.openFile(filename, "a", "NILM Database") | |||
self.opened = True | |||
def __del__(self): | |||
if "opened" in self.__dict__: # pragma: no cover | |||
fprintf(sys.stderr, | |||
"error: NilmDB.close() wasn't called, file %s", | |||
self.filename) | |||
def close(self): | |||
self.h5file.close() | |||
del self.opened | |||
def stream_list(self): | |||
"""Return list of paths to all Tables in the database""" | |||
iterator = self.h5file.walkNodes('/', 'Table') | |||
paths = [ x._v_pathname for x in iterator ] | |||
def stream_list(self, layout = None): | |||
"""Return list of (path, layout) to all Tables in the | |||
database that match the given layout, or all Tables if | |||
layout is None""" | |||
paths = [] | |||
for node in self.h5file.walkNodes('/', 'Table'): | |||
path = node._v_pathname | |||
this_layout = nilmdb.layout.desc_to_layout(node.description) | |||
if layout is None or layout == this_layout: | |||
paths.append((path, this_layout)) | |||
return sorted(paths) | |||
def stream_create(self, path, cls, index = None): | |||
def stream_create(self, path, layout_name, index = None): | |||
"""Create a table at the given path, with the contents | |||
matching the given class description (e.g. nilmdb.PrepData). | |||
matching the given layout_name (e.g. 'PrepData'). | |||
Columns listed in 'index' are marked as indices. If index = | |||
None, the 'timestamp' column is indexed if it exists. Pass | |||
an empty list to prevent indexing""" | |||
@@ -38,7 +60,10 @@ class NilmDB(object): | |||
self.h5file.createGroup(parent, child) | |||
except tables.NodeError: | |||
pass | |||
table = self.h5file.createTable(group, node, cls) | |||
# Create the table | |||
desc = nilmdb.layout.layout_to_desc(layout_name) | |||
table = self.h5file.createTable(group, node, desc) | |||
# Create indices | |||
try: | |||
@@ -5,8 +5,12 @@ | |||
from __future__ import absolute_import | |||
import nilmdb | |||
from nilmdb.printf import * | |||
import cherrypy | |||
import sys | |||
import os | |||
import traceback | |||
try: | |||
import cherrypy | |||
@@ -26,14 +30,17 @@ class Root(NilmApp): | |||
super(Root, self).__init__(db) | |||
self.server_version = version | |||
# / | |||
@cherrypy.expose | |||
def index(self): | |||
raise cherrypy.NotFound() | |||
# /favicon.ico | |||
@cherrypy.expose | |||
def favicon_ico(self): | |||
raise cherrypy.NotFound() | |||
# /version | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
def version(self): | |||
@@ -42,10 +49,12 @@ class Root(NilmApp): | |||
class Stream(NilmApp): | |||
"""Stream-specific operations""" | |||
# /stream/list | |||
# /stream/list?layout=PrepData | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
def list(self): | |||
return self.db.stream_list() | |||
def list(self, layout = None): | |||
return self.db.stream_list(layout) | |||
class Exiter(object): | |||
"""App that exits the server, for testing""" | |||
@@ -61,9 +70,9 @@ class Exiter(object): | |||
class Server(object): | |||
version = "1.0" | |||
def __init__(self, filename, host = '127.0.0.1', port = 8080, stoppable = False): | |||
def __init__(self, db, host = '127.0.0.1', port = 8080, stoppable = False): | |||
# Need to wrap DB object in a serializer because we'll call into it from separate threads. | |||
self.db = nilmdb.serializer.WrapObject(nilmdb.NilmDB(filename)) | |||
self.db = nilmdb.serializer.WrapObject(db) | |||
cherrypy.config.update({ | |||
'server.socket_host': host, | |||
'server.socket_port': port, | |||
@@ -77,7 +86,21 @@ class Server(object): | |||
cherrypy.tree.mount(Exiter(), "/exit") | |||
def start(self, blocking = False, event = None): | |||
# Cherrypy stupidly calls os._exit(70) when it can't bind the | |||
# port. At least try to print a reasonable error and continue | |||
# in this case, rather than just dying silently (as we would | |||
# otherwise do in embedded mode) | |||
real_exit = os._exit | |||
def fake_exit(code): # pragma: no cover | |||
if code == os.EX_SOFTWARE: | |||
fprintf(sys.stderr, "error: CherryPy called os._exit!\n") | |||
else: | |||
real_exit(code) | |||
os._exit = fake_exit | |||
cherrypy.engine.start() | |||
os._exit = real_exit | |||
if event is not None: | |||
event.set() | |||
if blocking: | |||
@@ -86,4 +109,3 @@ class Server(object): | |||
def stop(self): | |||
cherrypy.engine.exit() | |||
self.db.close() |
@@ -1,53 +0,0 @@ | |||
import sys | |||
import tables | |||
import nilmdb | |||
try: | |||
import cherrypy | |||
cherrypy.tools.json_out | |||
except: | |||
sys.stderr.write("Cherrypy 3.2+ required\n") | |||
sys.exit(1) | |||
class NilmApp: | |||
def __init__(self, db): | |||
self.db = db | |||
class Root(NilmApp): | |||
"""NILM Database""" | |||
server_version = "1.0" | |||
@cherrypy.expose | |||
def index(self): | |||
raise cherrypy.NotFound() | |||
@cherrypy.expose | |||
def favicon_ico(self): | |||
raise cherrypy.NotFound() | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
def version(self): | |||
return self.server_version | |||
class Stream(NilmApp): | |||
"""Stream operations""" | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
def list(self): | |||
return | |||
cherrypy.config.update({ | |||
'server.socket_host': '127.0.0.1', | |||
'server.socket_port': 8080 | |||
}) | |||
db = nilmdb.nilmdb() | |||
cherrypy.tree.mount(Root(db), "/") | |||
cherrypy.tree.mount(Stream(db), "/stream") | |||
if __name__ == "__main__": | |||
cherrypy.engine.start() | |||
cherrypy.engine.block() |
@@ -10,6 +10,7 @@ import sys | |||
import cherrypy | |||
import threading | |||
import urllib2 | |||
import Queue | |||
testdb = "tests/test.db" | |||
@@ -17,7 +18,7 @@ testdb = "tests/test.db" | |||
#def cleanup(): | |||
# os.unlink(testdb) | |||
class TestNilmdb(object): | |||
class Test00Nilmdb(object): # named 00 so it runs first | |||
def test_NilmDB(self): | |||
try: | |||
os.unlink(testdb) | |||
@@ -34,52 +35,74 @@ class TestNilmdb(object): | |||
def test_stream(self): | |||
db = nilmdb.NilmDB(testdb) | |||
assert(db.stream_list() == []) | |||
eq_(db.stream_list(), []) | |||
# Bad path | |||
with assert_raises(ValueError): | |||
db.stream_create("/foo", nilmdb.layout.PrepData) | |||
db.stream_create("/foo", "PrepData") | |||
# Bad layout type | |||
with assert_raises(KeyError): | |||
db.stream_create("/newton/prep", "NoSuchLayout") | |||
# Bad index columns | |||
with assert_raises(KeyError): | |||
db.stream_create("/newton/prep", nilmdb.layout.PrepData, ["nonexistant"]) | |||
db.stream_create("/newton/prep", nilmdb.layout.PrepData) | |||
db.stream_create("/newton/raw", nilmdb.layout.RawData) | |||
db.stream_create("/newton/zzz/rawnotch", nilmdb.layout.RawNotchedData) | |||
db.stream_create("/newton/prep", "PrepData", ["nonexistant"]) | |||
db.stream_create("/newton/prep", "PrepData") | |||
db.stream_create("/newton/raw", "RawData") | |||
db.stream_create("/newton/zzz/rawnotch", "RawNotchedData") | |||
# Verify we got 3 streams | |||
assert(db.stream_list() == [ "/newton/prep", | |||
"/newton/raw", | |||
"/newton/zzz/rawnotch" ]) | |||
eq_(db.stream_list(), [ ("/newton/prep", "PrepData"), | |||
("/newton/raw", "RawData"), | |||
("/newton/zzz/rawnotch", "RawNotchedData") | |||
]) | |||
# Match just one type | |||
eq_(db.stream_list("RawData"), [ ("/newton/raw", "RawData") ]) | |||
# Verify returned types if a layout is missing | |||
save = nilmdb.layout._layout_desc.copy() | |||
del nilmdb.layout._layout_desc["RawData"] | |||
eq_(db.stream_list(), [ ("/newton/prep", "PrepData"), | |||
("/newton/raw", None), | |||
("/newton/zzz/rawnotch", "RawNotchedData") | |||
]) | |||
nilmdb.layout._layout_desc = save | |||
# Verify that columns were made right | |||
assert(len(db.h5file.getNode("/newton/prep").cols) == 9) | |||
assert(len(db.h5file.getNode("/newton/raw").cols) == 7) | |||
assert(len(db.h5file.getNode("/newton/zzz/rawnotch").cols) == 10) | |||
eq_(len(db.h5file.getNode("/newton/prep").cols), 9) | |||
eq_(len(db.h5file.getNode("/newton/raw").cols), 7) | |||
eq_(len(db.h5file.getNode("/newton/zzz/rawnotch").cols), 10) | |||
assert(db.h5file.getNode("/newton/prep").colindexed["timestamp"]) | |||
assert(not db.h5file.getNode("/newton/prep").colindexed["p1"]) | |||
db.close() | |||
class TestBlockingServer(object): | |||
def setUp(self): | |||
self.db = nilmdb.NilmDB(testdb) | |||
def tearDown(self): | |||
self.db.close() | |||
def test_blocking_server(self): | |||
# Start web app on a custom port | |||
self.server = nilmdb.Server(testdb, host = "127.0.0.1", | |||
self.server = nilmdb.Server(self.db, host = "127.0.0.1", | |||
port = 12380, stoppable = True) | |||
# Run it | |||
event = threading.Event() | |||
def run_server(): | |||
self.server.start(blocking = True, event = event) | |||
thread = threading.Thread(target = run_server) | |||
thread.start() | |||
event.wait() | |||
event.wait(timeout = 2) | |||
# Send request to exit. | |||
req = urllib2.urlopen("http://127.0.0.1:12380/exit/") | |||
req = urllib2.urlopen("http://127.0.0.1:12380/exit/", timeout = 1) | |||
# Wait for it | |||
thread.join() | |||
def geturl(path): | |||
req = urllib2.urlopen("http://127.0.0.1:12380" + path) | |||
req = urllib2.urlopen("http://127.0.0.1:12380" + path, timeout = 10) | |||
return req.read() | |||
def getjson(path): | |||
@@ -89,13 +112,15 @@ class TestServer(object): | |||
def setUp(self): | |||
# Start web app on a custom port | |||
self.server = nilmdb.Server(testdb, host = "127.0.0.1", | |||
self.db = nilmdb.NilmDB(testdb) | |||
self.server = nilmdb.Server(self.db, host = "127.0.0.1", | |||
port = 12380, stoppable = False) | |||
self.server.start(blocking = False) | |||
def tearDown(self): | |||
# Close web app | |||
self.server.stop() | |||
self.db.close() | |||
def test_server(self): | |||
# Make sure we can't force an exit, and test other 404 errors | |||
@@ -107,7 +132,19 @@ class TestServer(object): | |||
# Check version | |||
eq_(V(getjson("/version")), V(self.server.version)) | |||
def test_stream(self): | |||
# List | |||
def test_stream_list(self): | |||
# Known streams that got populated by an earlier test (test_nilmdb) | |||
streams = getjson("/stream/list") | |||
print streams | |||
eq_(streams, [ | |||
['/newton/prep', 'PrepData'], | |||
['/newton/raw', 'RawData'], | |||
['/newton/zzz/rawnotch', 'RawNotchedData'], | |||
]) | |||
streams = getjson("/stream/list?layout=RawData") | |||
eq_(streams, [['/newton/raw', 'RawData']]) | |||
streams = getjson("/stream/list?layout=NoSuchLayout") | |||
eq_(streams, []) | |||
@@ -1,11 +1,14 @@ | |||
import nilmdb | |||
from nilmdb.printf import * | |||
import nose | |||
from nose.tools import * | |||
from nose.tools import assert_raises | |||
import threading | |||
import time | |||
#raise nose.exc.SkipTest("Skip these") | |||
class Foo(object): | |||
val = 0 | |||