423 lines
15 KiB
Python
423 lines
15 KiB
Python
"""CherryPy-based server for accessing NILM database via HTTP"""
|
|
|
|
# Need absolute_import so that "import nilmdb" won't pull in nilmdb.py,
|
|
# but will pull the nilmdb module instead.
|
|
from __future__ import absolute_import
|
|
from nilmdb.utils.printf import *
|
|
import nilmdb
|
|
|
|
import cherrypy
|
|
import sys
|
|
import time
|
|
import os
|
|
import simplejson as json
|
|
|
|
# Fail early, with a readable message, when CherryPy is missing or too
# old to provide the json_out tool that the request handlers rely on.
try:
    import cherrypy
    cherrypy.tools.json_out
except (ImportError, AttributeError): # pragma: no cover
    # Only these two failures mean "CherryPy is absent or too old";
    # anything else (e.g. KeyboardInterrupt) must propagate untouched.
    sys.stderr.write("Cherrypy 3.2+ required\n")
    sys.exit(1)
|
|
|
|
class NilmApp(object):
    """Base class for the application objects mounted on the server.

    It only stores the database handle so that the request handlers
    defined by subclasses can reach it as ``self.db``.
    """

    def __init__(self, db):
        # Database object that subclasses call into from their handlers
        self.db = db
|
|
|
|
# Value used for the 'response.stream' CherryPy setting of the streaming
# handlers (/stream/extract, /stream/intervals).
# Set this to False to get better tracebacks from those requests.
use_chunked_http_response = False

# Version string handed to the Root application, which reports it
# through the /version URL.
version = "1.1"
|
|
|
|
class Root(NilmApp):
    """Application mounted at "/": server-wide information requests.

    Serves the server version plus the location and on-disk size of
    the database.  The bare "/" and "/favicon.ico" URLs are answered
    with a 404.
    """

    def __init__(self, db, version):
        """Remember the database handle and the version string that
        the /version URL should report."""
        super(Root, self).__init__(db)
        self.server_version = version

    # GET /
    @cherrypy.expose
    def index(self):
        """There is no index page; always respond with 404."""
        raise cherrypy.NotFound()

    # GET /favicon.ico
    @cherrypy.expose
    def favicon_ico(self):
        """There is no favicon; always respond with 404."""
        raise cherrypy.NotFound()

    # GET /version
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def version(self):
        """Return the version string given at construction, as JSON."""
        reported = self.server_version
        return reported

    # GET /dbpath
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def dbpath(self):
        """Return the database's base path, as JSON."""
        basepath = self.db.get_basepath()
        return basepath

    # GET /dbsize
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def dbsize(self):
        """Return the result of nilmdb.utils.du() on the database's
        base path, as JSON."""
        basepath = self.db.get_basepath()
        return nilmdb.utils.du(basepath)
|
|
|
|
class Stream(NilmApp):
    """Stream-specific operations"""

    @staticmethod
    def _parse_time(value, name):
        """Convert the 'start' or 'end' request parameter to a float.

        A value that cannot be converted is the client's mistake, so
        it is reported as "400 Bad Request" instead of letting the
        ValueError/TypeError escape as an internal server error.
        `name` is only used to build the error message.  This helper
        is not exposed as a URL.
        """
        try:
            return float(value)
        except (TypeError, ValueError):
            raise cherrypy.HTTPError("400 Bad Request",
                                     "invalid " + name + " time: " +
                                     repr(value))

    # /stream/list
    # /stream/list?layout=PrepData
    # /stream/list?path=/newton/prep
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def list(self, path = None, layout = None):
        """List all streams in the database.  With optional path or
        layout parameter, just list streams that match the given path
        or layout"""
        return self.db.stream_list(path, layout)

    # /stream/create?path=/newton/prep&layout=PrepData
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def create(self, path, layout):
        """Create a new stream in the database.  Provide path
        and one of the nilmdb.layout.layouts keys.

        Any failure is reported as "400 Bad Request" with the
        exception's type name and message.
        """
        try:
            return self.db.stream_create(path, layout)
        except Exception as e:
            message = sprintf("%s: %s", type(e).__name__, e.message)
            raise cherrypy.HTTPError("400 Bad Request", message)

    # /stream/destroy?path=/newton/prep
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def destroy(self, path):
        """Delete a stream and its associated data.

        Any failure is reported as "400 Bad Request" with the
        exception's type name and message.
        """
        try:
            return self.db.stream_destroy(path)
        except Exception as e:
            message = sprintf("%s: %s", type(e).__name__, e.message)
            raise cherrypy.HTTPError("400 Bad Request", message)

    # /stream/get_metadata?path=/newton/prep
    # /stream/get_metadata?path=/newton/prep&key=foo&key=bar
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_metadata(self, path, key=None):
        """Get metadata for the named stream.  If optional
        key parameters are specified, only return metadata
        matching the given keys.

        Requested keys that have no stored value are returned with
        a value of None.  A StreamError from the database is reported
        as "404 Not Found".
        """
        try:
            data = self.db.stream_get_metadata(path)
        except nilmdb.nilmdb.StreamError as e:
            raise cherrypy.HTTPError("404 Not Found", e.message)
        if key is None:  # If no keys specified, return them all
            key = data.keys()
        elif not isinstance(key, list):
            # A single key=... parameter arrives as a bare value
            key = [ key ]
        # Keys with no matching value map to None
        return dict((k, data[k] if k in data else None) for k in key)

    # /stream/set_metadata?path=/newton/prep&data=<json>
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def set_metadata(self, path, data):
        """Set metadata for the named stream, replacing any
        existing metadata.  Data should be a json-encoded
        dictionary.

        Returns "ok"; a JSON decoding or database failure is reported
        as "400 Bad Request".
        """
        try:
            data_dict = json.loads(data)
            self.db.stream_set_metadata(path, data_dict)
        except Exception as e:
            message = sprintf("%s: %s", type(e).__name__, e.message)
            raise cherrypy.HTTPError("400 Bad Request", message)
        return "ok"

    # /stream/update_metadata?path=/newton/prep&data=<json>
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def update_metadata(self, path, data):
        """Update metadata for the named stream.  Data
        should be a json-encoded dictionary.

        Returns "ok"; a JSON decoding or database failure is reported
        as "400 Bad Request".
        """
        try:
            data_dict = json.loads(data)
            self.db.stream_update_metadata(path, data_dict)
        except Exception as e:
            message = sprintf("%s: %s", type(e).__name__, e.message)
            raise cherrypy.HTTPError("400 Bad Request", message)
        return "ok"

    # /stream/insert?path=/newton/prep
    @cherrypy.expose
    @cherrypy.tools.json_out()
    #@cherrypy.tools.disable_prb()
    def insert(self, path, start, end):
        """
        Insert new data into the database.  Provide textual data
        (matching the path's layout) as a HTTP PUT.

        All timestamps in the data must satisfy start <= t < end.
        Returns "ok" on success.  A missing stream is reported as
        "404 Not Found"; unparseable or empty data, bad start/end
        values, out-of-range timestamps and database errors are
        reported as "400 Bad Request".
        """
        # Important that we always read the input before throwing any
        # errors, to keep lengths happy for persistent connections.
        # However, CherryPy 3.2.2 has a bug where this fails for GET
        # requests, so catch that.  (issue #1134)
        try:
            body = cherrypy.request.body.read()
        except TypeError:
            raise cherrypy.HTTPError("400 Bad Request", "No request body")

        # Check path and get layout
        streams = self.db.stream_list(path = path)
        if len(streams) != 1:
            raise cherrypy.HTTPError("404 Not Found", "No such stream")
        layout = streams[0][1]

        # Parse the input data
        try:
            parser = nilmdb.layout.Parser(layout)
            parser.parse(body)
        except nilmdb.layout.ParserError as e:
            raise cherrypy.HTTPError("400 Bad Request",
                                     "Error parsing input data: " +
                                     e.message)

        # NOTE(review): a timestamp of exactly 0 is falsy and is
        # therefore also treated as "no data" here -- confirm whether
        # that is intended.
        if (not parser.min_timestamp or not parser.max_timestamp or
            not len(parser.data)):
            raise cherrypy.HTTPError("400 Bad Request",
                                     "no data provided")

        # Check limits
        start = self._parse_time(start, "start")
        end = self._parse_time(end, "end")
        if parser.min_timestamp < start:
            raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
                                     repr(parser.min_timestamp) +
                                     " < start time " + repr(start))
        if parser.max_timestamp >= end:
            raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
                                     repr(parser.max_timestamp) +
                                     " >= end time " + repr(end))

        # Now do the nilmdb insert, passing it the parser full of data.
        try:
            self.db.stream_insert(path, start, end, parser.data)
        except nilmdb.nilmdb.NilmDBError as e:
            raise cherrypy.HTTPError("400 Bad Request", e.message)

        # Done
        return "ok"

    # /stream/intervals?path=/newton/prep
    # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
    @cherrypy.expose
    def intervals(self, path, start = None, end = None):
        """
        Get intervals from backend database.  Streams the resulting
        intervals as JSON strings separated by newlines.  This may
        make multiple requests to the nilmdb backend to avoid causing
        it to block for too long.

        Bad start/end values, or an end before start, are reported as
        "400 Bad Request"; a missing stream as "404 Not Found".
        """
        if start is not None:
            start = self._parse_time(start, "start")
        if end is not None:
            end = self._parse_time(end, "end")

        if start is not None and end is not None:
            if end < start:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "end before start")

        streams = self.db.stream_list(path = path)
        if len(streams) != 1:
            raise cherrypy.HTTPError("404 Not Found", "No such stream")

        def content(start, end):
            # Note: disable response.stream below to get better debug info
            # from tracebacks in this subfunction.
            while True:
                (intervals, restart) = self.db.stream_intervals(path,start,end)
                response = ''.join([ json.dumps(i) + "\n" for i in intervals ])
                yield response
                # A nonzero restart is where the next request must resume
                if restart == 0:
                    break
                start = restart
        return content(start, end)
    intervals._cp_config = { 'response.stream': use_chunked_http_response }

    # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
    @cherrypy.expose
    def extract(self, path, start = None, end = None, count = False):
        """
        Extract data from backend database.  Streams the resulting
        entries as ASCII text lines separated by newlines.  This may
        make multiple requests to the nilmdb backend to avoid causing
        it to block for too long.

        Add count=True to return a count rather than actual data.

        Bad start/end values, or an end before start, are reported as
        "400 Bad Request"; a missing stream as "404 Not Found".
        """
        if start is not None:
            start = self._parse_time(start, "start")
        if end is not None:
            end = self._parse_time(end, "end")

        # Check parameters
        if start is not None and end is not None:
            if end < start:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "end before start")

        # Check path and get layout
        streams = self.db.stream_list(path = path)
        if len(streams) != 1:
            raise cherrypy.HTTPError("404 Not Found", "No such stream")
        layout = streams[0][1]

        # Get formatter
        formatter = nilmdb.layout.Formatter(layout)

        def content(start, end, count):
            # Note: disable response.stream below to get better debug info
            # from tracebacks in this subfunction.
            if count:
                matched = self.db.stream_extract(path, start, end, count)
                yield sprintf("%d\n", matched)
                return

            while True:
                (data, restart) = self.db.stream_extract(path, start, end)

                # Format the data and yield it
                yield formatter.format(data)

                # A nonzero restart is where the next request must resume
                if restart == 0:
                    return
                start = restart
        return content(start, end, count)
    extract._cp_config = { 'response.stream': use_chunked_http_response }
|
|
|
|
|
|
class Exiter(object):
    """Testing aid: an application whose only page stops the server.

    The page sends a short plain-text message and then raises
    SystemExit from inside the response generator.
    """

    @cherrypy.expose
    def index(self):
        """Send the farewell text, then raise SystemExit."""
        def farewell():
            # The message is yielded first; SystemExit is raised only
            # when the generator is resumed after that.
            yield 'Exiting by request'
            raise SystemExit

        cherrypy.response.headers['Content-Type'] = 'text/plain'
        return farewell()
    # Stream the response so the generator above is consumed lazily
    index._cp_config = { 'response.stream': True }
|
|
|
|
class Server(object):
    """Configure CherryPy to serve a NILM database and control the
    engine's lifetime through start() and stop()."""

    # (escaped, literal) pairs used to undo CherryPy's HTML escaping of
    # error text.  "&amp;" must stay last so that text such as
    # "&amp;lt;" is unescaped exactly once (to "&lt;"), not twice.
    _HTML_ENTITIES = (("&lt;", "<"), ("&gt;", ">"), ("&amp;", "&"))

    def __init__(self, db, host = '127.0.0.1', port = 8080,
                 stoppable = False,       # whether /exit URL exists
                 embedded = True,         # hide diagnostics and output, etc
                 fast_shutdown = False,   # don't wait for clients to disconn.
                 force_traceback = False  # include traceback in all errors
                 ):
        """Set up the global CherryPy configuration and mount the
        applications.

        db              -- database object; wrapped in a Serializer
        host, port      -- address the HTTP server listens on
        stoppable       -- also mount the Exiter application at /exit
        embedded        -- use CherryPy's 'embedded' environment
        fast_shutdown   -- use a very short shutdown timeout
        force_traceback -- keep tracebacks even in 4xx error responses
        """
        self.version = version

        # Need to wrap DB object in a serializer because we'll call
        # into it from separate threads.
        self.embedded = embedded
        self.db = nilmdb.utils.Serializer(db)
        cherrypy.config.update({
            'server.socket_host': host,
            'server.socket_port': port,
            'engine.autoreload_on': False,
            'server.max_request_body_size': 4*1024*1024,
            'error_page.default': self.json_error_page,
            })
        if self.embedded:
            cherrypy.config.update({ 'environment': 'embedded' })

        # Send tracebacks in error responses.  They're hidden by the
        # error_page function for client errors (code 400-499).
        cherrypy.config.update({ 'request.show_tracebacks' : True })
        self.force_traceback = force_traceback

        # Drop anything mounted earlier before mounting our applications
        cherrypy.tree.apps = {}
        cherrypy.tree.mount(Root(self.db, self.version), "/")
        cherrypy.tree.mount(Stream(self.db), "/stream")
        if stoppable:
            cherrypy.tree.mount(Exiter(), "/exit")

        # Shutdowns normally wait for clients to disconnect.  To speed
        # up tests, set fast_shutdown = True
        if fast_shutdown:
            # Setting timeout to 0 triggers os._exit(70) at shutdown, grr...
            cherrypy.server.shutdown_timeout = 0.01
        else:
            cherrypy.server.shutdown_timeout = 5

    @classmethod
    def _unescape_html(cls, text):
        """Return text with the &lt;, &gt; and &amp; entities replaced
        by the characters they stand for."""
        for entity, char in cls._HTML_ENTITIES:
            text = text.replace(entity, char)
        return text

    def json_error_page(self, status, message, traceback, version):
        """Return a custom error page in JSON so the client can parse it

        The result is a JSON object with "status", "message" and
        "traceback" members.  The traceback is blanked for 400-499
        statuses unless force_traceback was requested.
        """
        errordata = { "status" : status,
                      "message" : message,
                      "traceback" : traceback }
        # Don't send a traceback if the error was 400-499 (client's fault)
        try:
            code = int(status.split()[0])
            if not self.force_traceback:
                if code >= 400 and code <= 499:
                    errordata["traceback"] = ""
        except Exception: # pragma: no cover
            # Best effort only: if the status can't be interpreted,
            # leave the traceback in place.
            pass
        # Override the response type, which was previously set to text/html
        cherrypy.serving.response.headers['Content-Type'] = (
            "application/json;charset=utf-8" )
        # Undo the HTML escaping that cherrypy's get_error_page function applies
        # (cherrypy issue 1135)
        for key in list(errordata):
            errordata[key] = self._unescape_html(errordata[key])
        return json.dumps(errordata, separators=(',',':'))

    def start(self, blocking = False, event = None):
        """Start the CherryPy engine.

        If event is given, its set() method is called once the engine
        has started.  If blocking is true, wait until the engine
        reaches the EXITING state before returning.
        """
        if not self.embedded: # pragma: no cover
            # Handle signals nicely
            if hasattr(cherrypy.engine, "signal_handler"):
                cherrypy.engine.signal_handler.subscribe()
            if hasattr(cherrypy.engine, "console_control_handler"):
                cherrypy.engine.console_control_handler.subscribe()

        # Cherrypy stupidly calls os._exit(70) when it can't bind the
        # port.  At least try to print a reasonable error and continue
        # in this case, rather than just dying silently (as we would
        # otherwise do in embedded mode)
        real_exit = os._exit
        # os.EX_SOFTWARE only exists on Unix; fall back to the literal
        # exit code mentioned above elsewhere.
        ex_software = getattr(os, "EX_SOFTWARE", 70)
        def fake_exit(code): # pragma: no cover
            if code == ex_software:
                fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
            else:
                real_exit(code)
        os._exit = fake_exit
        try:
            cherrypy.engine.start()
        finally:
            # Always undo the monkeypatch, even if startup raised
            os._exit = real_exit

        if event is not None:
            event.set()
        if blocking:
            try:
                cherrypy.engine.wait(cherrypy.engine.states.EXITING,
                                     interval = 0.1, channel = 'main')
            except (KeyboardInterrupt, IOError): # pragma: no cover
                cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
                cherrypy.engine.exit()
            except SystemExit: # pragma: no cover
                cherrypy.engine.log('SystemExit raised: shutting down bus')
                cherrypy.engine.exit()
                raise

    def stop(self):
        """Ask the CherryPy engine to exit."""
        cherrypy.engine.exit()
|