Create StreamException class in NilmDB, which can get caught and
handled by the HTTP server. Add /stream/get_metadata HTTP request. Fill out test_stream_metadata Add runserver.py script to just run the server from the command line, outside of the testing environment. Add necessary hooks within nilmdb/server.py to allow this to happen. Metadata operations get stream_id first and report error on missing stream, instead of returning an empty dict. git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10443 ddd99763-3ecb-0310-9145-efcb8ce7c51f
This commit is contained in:
parent
9b64a18daf
commit
1e1c7fa9c4
13
TODO
13
TODO
|
@ -1,13 +1,8 @@
|
|||
-- TOP: Move nilmdbsql into nilmdb -- silly to keep it out
|
||||
- Return ranges in stream list?
|
||||
|
||||
- Track intervals in a SQL database (!)
|
||||
|
||||
- Use directory tree instead of single database, store tables in
|
||||
individual hdf5 files (still using pytables for now)?
|
||||
|
||||
- Return ranges in stream list
|
||||
|
||||
- Add user ranges to stream list
|
||||
- Add user ranges to stream list?
|
||||
|
||||
- More stream operations: insert & extract
|
||||
- Track intervals in a SQL database (!)
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
# empty
|
||||
from nilmdb import NilmDB
|
||||
from nilmdb import NilmDB, StreamException
|
||||
from server import Server
|
||||
from layout import *
|
||||
from serializer import WrapObject
|
||||
|
|
|
@ -55,6 +55,9 @@ sql_schema_updates = {
|
|||
""",
|
||||
}
|
||||
|
||||
class StreamException(Exception):
|
||||
pass
|
||||
|
||||
class NilmDB(object):
|
||||
verbose = 0
|
||||
|
||||
|
@ -180,6 +183,13 @@ class NilmDB(object):
|
|||
with self.con as con:
|
||||
con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
|
||||
(path, layout_name))
|
||||
|
||||
def stream_id(self, path):
|
||||
result = self.con.execute("SELECT id FROM streams WHERE path=?",
|
||||
(path,)).fetchone()
|
||||
if result is None:
|
||||
raise StreamException("No stream at path " + path)
|
||||
return result[0]
|
||||
|
||||
def stream_set_metadata(self, path, data):
|
||||
"""Set stream metadata from a dictionary, e.g.
|
||||
|
@ -187,21 +197,21 @@ class NilmDB(object):
|
|||
v_scaling = 123.45 }
|
||||
This replaces all existing metadata.
|
||||
"""
|
||||
stream_id = self.stream_id(path)
|
||||
with self.con as con:
|
||||
stream_id = self.con.execute("SELECT id FROM streams WHERE path=?",
|
||||
(path,)).fetchone()[0]
|
||||
for key in data:
|
||||
con.execute("DELETE FROM metadata "
|
||||
"WHERE stream_id=? AND key=?", (stream_id, key))
|
||||
con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
|
||||
(stream_id, key, data[key]))
|
||||
if data[key] != '':
|
||||
con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
|
||||
(stream_id, key, data[key]))
|
||||
|
||||
def stream_get_metadata(self, path):
|
||||
"""Return stream metadata as a dictionary."""
|
||||
stream_id = self.stream_id(path)
|
||||
result = self.con.execute("SELECT metadata.key, metadata.value "
|
||||
"FROM metadata, streams "
|
||||
"WHERE metadata.stream_id=streams.id "
|
||||
"AND streams.path=?", (path,))
|
||||
"FROM metadata "
|
||||
"WHERE metadata.stream_id=?", (stream_id,))
|
||||
data = {}
|
||||
for (key, value) in result:
|
||||
data[key] = value
|
||||
|
|
|
@ -56,6 +56,27 @@ class Stream(NilmApp):
|
|||
def list(self, layout = None):
|
||||
return self.db.stream_list(layout)
|
||||
|
||||
# /stream/get_metadata?path=/newton/prep
|
||||
# /stream/get_metadata?path=/newton/prep?key=foo&key=bar
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
def get_metadata(self, path, key=None):
|
||||
try:
|
||||
data = self.db.stream_get_metadata(path)
|
||||
except nilmdb.StreamException, e:
|
||||
raise cherrypy.HTTPError("404 Not Found", e.message)
|
||||
if key is None: # If no keys specified, return them all
|
||||
key = data.keys()
|
||||
elif not isinstance(key, list):
|
||||
key = [ key ]
|
||||
result = {}
|
||||
for k in key:
|
||||
if k in data:
|
||||
result[k] = data[k]
|
||||
else: # Return "None" for keys with no matching value
|
||||
result[k] = None
|
||||
return result
|
||||
|
||||
class Exiter(object):
|
||||
"""App that exits the server, for testing"""
|
||||
@cherrypy.expose
|
||||
|
@ -70,15 +91,18 @@ class Exiter(object):
|
|||
class Server(object):
|
||||
version = "1.0"
|
||||
|
||||
def __init__(self, db, host = '127.0.0.1', port = 8080, stoppable = False):
|
||||
def __init__(self, db, host = '127.0.0.1', port = 8080, stoppable = False, embedded = True):
|
||||
# Need to wrap DB object in a serializer because we'll call into it from separate threads.
|
||||
self.embedded = embedded
|
||||
self.db = nilmdb.serializer.WrapObject(db)
|
||||
cherrypy.config.update({
|
||||
'server.socket_host': host,
|
||||
'server.socket_port': port,
|
||||
'engine.autoreload_on': False,
|
||||
'environment': 'embedded',
|
||||
})
|
||||
if self.embedded:
|
||||
cherrypy.config.update({ 'environment': 'embedded' })
|
||||
|
||||
cherrypy.tree.apps = {}
|
||||
cherrypy.tree.mount(Root(self.db, self.version), "/")
|
||||
cherrypy.tree.mount(Stream(self.db), "/stream")
|
||||
|
@ -87,6 +111,13 @@ class Server(object):
|
|||
|
||||
def start(self, blocking = False, event = None):
|
||||
|
||||
if not self.embedded: # pragma: no cover
|
||||
# Handle signals nicely
|
||||
if hasattr(cherrypy.engine, "signal_handler"):
|
||||
cherrypy.engine.signal_handler.subscribe()
|
||||
if hasattr(cherrypy.engine, "console_control_handler"):
|
||||
cherrypy.engine.console_control_handler.subscribe()
|
||||
|
||||
# Cherrypy stupidly calls os._exit(70) when it can't bind the
|
||||
# port. At least try to print a reasonable error and continue
|
||||
# in this case, rather than just dying silently (as we would
|
||||
|
@ -100,12 +131,20 @@ class Server(object):
|
|||
os._exit = fake_exit
|
||||
cherrypy.engine.start()
|
||||
os._exit = real_exit
|
||||
|
||||
|
||||
if event is not None:
|
||||
event.set()
|
||||
if blocking:
|
||||
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
|
||||
interval = 0.1, channel = 'main')
|
||||
try:
|
||||
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
|
||||
interval = 0.1, channel = 'main')
|
||||
except (KeyboardInterrupt, IOError): # pragma: no cover
|
||||
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
|
||||
cherrypy.engine.exit()
|
||||
except SystemExit: # pragma: no cover
|
||||
cherrypy.engine.log('SystemExit raised: shutting down bus')
|
||||
cherrypy.engine.exit()
|
||||
raise
|
||||
|
||||
def stop(self):
|
||||
cherrypy.engine.exit()
|
||||
|
|
13
runserver.py
Executable file
13
runserver.py
Executable file
|
@ -0,0 +1,13 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import nilmdb
|
||||
import json
|
||||
|
||||
testdb = "tests/testdb"
|
||||
|
||||
# Start web app on a custom port
|
||||
db = nilmdb.NilmDB(testdb)
|
||||
server = nilmdb.Server(db, host = "127.0.0.1",
|
||||
port = 12381, stoppable = False, embedded = False)
|
||||
server.start(blocking = True)
|
||||
db.close()
|
|
@ -11,6 +11,7 @@ import sys
|
|||
import cherrypy
|
||||
import threading
|
||||
import urllib2
|
||||
from urllib2 import urlopen, HTTPError
|
||||
import Queue
|
||||
|
||||
testdb = "tests/testdb"
|
||||
|
@ -109,13 +110,13 @@ class TestBlockingServer(object):
|
|||
event.wait(timeout = 2)
|
||||
|
||||
# Send request to exit.
|
||||
req = urllib2.urlopen("http://127.0.0.1:12380/exit/", timeout = 1)
|
||||
req = urlopen("http://127.0.0.1:12380/exit/", timeout = 1)
|
||||
|
||||
# Wait for it
|
||||
thread.join()
|
||||
|
||||
def geturl(path):
|
||||
req = urllib2.urlopen("http://127.0.0.1:12380" + path, timeout = 10)
|
||||
req = urlopen("http://127.0.0.1:12380" + path, timeout = 10)
|
||||
return req.read()
|
||||
|
||||
def getjson(path):
|
||||
|
@ -138,7 +139,7 @@ class TestServer(object):
|
|||
def test_server(self):
|
||||
# Make sure we can't force an exit, and test other 404 errors
|
||||
for url in [ "/exit", "/", "/favicon.ico" ]:
|
||||
with assert_raises(urllib2.HTTPError) as e:
|
||||
with assert_raises(HTTPError) as e:
|
||||
geturl(url)
|
||||
eq_(e.exception.code, 404)
|
||||
|
||||
|
@ -161,3 +162,27 @@ class TestServer(object):
|
|||
streams = getjson("/stream/list?layout=NoSuchLayout")
|
||||
eq_(streams, [])
|
||||
|
||||
|
||||
def test_stream_metadata(self):
|
||||
with assert_raises(HTTPError) as e:
|
||||
getjson("/stream/get_metadata?path=foo")
|
||||
eq_(e.exception.code, 404)
|
||||
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep")
|
||||
eq_(data, {'description': 'The Data', 'v_scale': '1.234'})
|
||||
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=v_scale")
|
||||
eq_(data, {'v_scale': '1.234'})
|
||||
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=v_scale&key=description")
|
||||
eq_(data, {'description': 'The Data', 'v_scale': '1.234'})
|
||||
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=v_scale&key=foo")
|
||||
eq_(data, {'foo': None, 'v_scale': '1.234'})
|
||||
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=foo")
|
||||
eq_(data, {'foo': None})
|
||||
|
|
Loading…
Reference in New Issue
Block a user