|
- """CherryPy-based server for accessing NILM database via HTTP"""
-
- # Need absolute_import so that "import nilmdb" won't pull in
- # nilmdb.py, but will pull the nilmdb module instead.
- from __future__ import absolute_import
- import nilmdb.server
- from nilmdb.utils.printf import *
- from nilmdb.server.errors import NilmDBError
- from nilmdb.utils.time import string_to_timestamp
-
- import cherrypy
- import sys
- import os
- import simplejson as json
- import decorator
- import psutil
-
- class NilmApp(object):
- def __init__(self, db):
- self.db = db
-
- # Decorators
- def chunked_response(func):
- """Decorator to enable chunked responses."""
- # Set this to False to get better tracebacks from some requests
- # (/stream/extract, /stream/intervals).
- func._cp_config = { 'response.stream': True }
- return func
-
- def response_type(content_type):
- """Return a decorator-generating function that sets the
- response type to the specified string."""
- def wrapper(func, *args, **kwargs):
- cherrypy.response.headers['Content-Type'] = content_type
- return func(*args, **kwargs)
- return decorator.decorator(wrapper)
-
- @decorator.decorator
- def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
- """Decorator to work around CherryPy bug #1200 in a response
- generator.
-
- Even if chunked responses are disabled, LookupError or
- UnicodeError exceptions may still be swallowed by CherryPy due to
- bug #1200. This throws them as generic Exceptions instead so that
- they make it through.
- """
- exc_info = None
- try:
- for val in func(*args, **kwargs):
- yield val
- except (LookupError, UnicodeError):
- # Re-raise it, but maintain the original traceback
- exc_info = sys.exc_info()
- new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
- raise new_exc, None, exc_info[2]
- finally:
- del exc_info
-
- def exception_to_httperror(*expected):
- """Return a decorator-generating function that catches expected
- errors and throws a HTTPError describing it instead.
-
- @exception_to_httperror(NilmDBError, ValueError)
- def foo():
- pass
- """
- def wrapper(func, *args, **kwargs):
- exc_info = None
- try:
- return func(*args, **kwargs)
- except expected:
- # Re-raise it, but maintain the original traceback
- exc_info = sys.exc_info()
- new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
- raise new_exc, None, exc_info[2]
- finally:
- del exc_info
- # We need to preserve the function's argspecs for CherryPy to
- # handle argument errors correctly. Decorator.decorator takes
- # care of that.
- return decorator.decorator(wrapper)
-
- # Custom CherryPy tools
-
- def CORS_allow(methods):
- """This does several things:
-
- Handles CORS preflight requests.
- Adds Allow: header to all requests.
- Raise 405 if request.method not in method.
-
- It is similar to cherrypy.tools.allow, with the CORS stuff added.
- """
- request = cherrypy.request.headers
- response = cherrypy.response.headers
-
- if not isinstance(methods, (tuple, list)): # pragma: no cover
- methods = [ methods ]
- methods = [ m.upper() for m in methods if m ]
- if not methods: # pragma: no cover
- methods = [ 'GET', 'HEAD' ]
- elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
- methods.append('HEAD')
- response['Allow'] = ', '.join(methods)
-
- # Allow all origins
- if 'Origin' in request:
- response['Access-Control-Allow-Origin'] = request['Origin']
-
- # If it's a CORS request, send response.
- request_method = request.get("Access-Control-Request-Method", None)
- request_headers = request.get("Access-Control-Request-Headers", None)
- if (cherrypy.request.method == "OPTIONS" and
- request_method and request_headers):
- response['Access-Control-Allow-Headers'] = request_headers
- response['Access-Control-Allow-Methods'] = ', '.join(methods)
- # Try to stop further processing and return a 200 OK
- cherrypy.response.status = "200 OK"
- cherrypy.response.body = ""
- cherrypy.request.handler = lambda: ""
- return
-
- # Reject methods that were not explicitly allowed
- if cherrypy.request.method not in methods:
- raise cherrypy.HTTPError(405)
-
- cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
-
- # Helper for json_in tool to process JSON data into normal request
- # parameters.
- def json_to_request_params(body):
- cherrypy.lib.jsontools.json_processor(body)
- if not isinstance(cherrypy.request.json, dict):
- raise cherrypy.HTTPError(415)
- cherrypy.request.params.update(cherrypy.request.json)
-
- # CherryPy apps
- class Root(NilmApp):
- """Root application for NILM database"""
-
- def __init__(self, db):
- super(Root, self).__init__(db)
-
- # /
- @cherrypy.expose
- def index(self):
- raise cherrypy.NotFound()
-
- # /favicon.ico
- @cherrypy.expose
- def favicon_ico(self):
- raise cherrypy.NotFound()
-
- # /version
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def version(self):
- return nilmdb.__version__
-
- # /dbinfo
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def dbinfo(self):
- """Return a dictionary with the database path,
- size of the database in bytes, and free disk space in bytes"""
- path = self.db.get_basepath()
- return { "path": path,
- "size": nilmdb.utils.du(path),
- "free": psutil.disk_usage(path).free }
-
- class Stream(NilmApp):
- """Stream-specific operations"""
-
- # /stream/list
- # /stream/list?layout=float32_8
- # /stream/list?path=/newton/prep&extended=1
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def list(self, path = None, layout = None, extended = None):
- """List all streams in the database. With optional path or
- layout parameter, just list streams that match the given path
- or layout.
-
- If extent is not given, returns a list of lists containing
- the path and layout: [ path, layout ]
-
- If extended is provided, returns a list of lists containing
- extended info: [ path, layout, extent_min, extent_max,
- total_rows, total_seconds ]. More data may be added.
- """
- return self.db.stream_list(path, layout, bool(extended))
-
- # /stream/create?path=/newton/prep&layout=float32_8
- @cherrypy.expose
- @cherrypy.tools.json_in()
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError, ValueError)
- @cherrypy.tools.CORS_allow(methods = ["POST"])
- def create(self, path, layout):
- """Create a new stream in the database. Provide path
- and one of the nilmdb.layout.layouts keys.
- """
- return self.db.stream_create(path, layout)
-
- # /stream/destroy?path=/newton/prep
- @cherrypy.expose
- @cherrypy.tools.json_in()
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError)
- @cherrypy.tools.CORS_allow(methods = ["POST"])
- def destroy(self, path):
- """Delete a stream and its associated data."""
- return self.db.stream_destroy(path)
-
- # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
- @cherrypy.expose
- @cherrypy.tools.json_in()
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError, ValueError)
- @cherrypy.tools.CORS_allow(methods = ["POST"])
- def rename(self, oldpath, newpath):
- """Rename a stream."""
- return self.db.stream_rename(oldpath, newpath)
-
- # /stream/get_metadata?path=/newton/prep
- # /stream/get_metadata?path=/newton/prep&key=foo&key=bar
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def get_metadata(self, path, key=None):
- """Get metadata for the named stream. If optional
- key parameters are specified, only return metadata
- matching the given keys."""
- try:
- data = self.db.stream_get_metadata(path)
- except nilmdb.server.nilmdb.StreamError as e:
- raise cherrypy.HTTPError("404 Not Found", e.message)
- if key is None: # If no keys specified, return them all
- key = data.keys()
- elif not isinstance(key, list):
- key = [ key ]
- result = {}
- for k in key:
- if k in data:
- result[k] = data[k]
- else: # Return "None" for keys with no matching value
- result[k] = None
- return result
-
- # Helper for set_metadata and get_metadata
- def _metadata_helper(self, function, path, data):
- if not isinstance(data, dict):
- try:
- data = dict(json.loads(data))
- except TypeError as e:
- raise NilmDBError("can't parse 'data' parameter: " + e.message)
- for key in data:
- if not (isinstance(data[key], basestring) or
- isinstance(data[key], float) or
- isinstance(data[key], int)):
- raise NilmDBError("metadata values must be a string or number")
- function(path, data)
-
- # /stream/set_metadata?path=/newton/prep&data=<json>
- @cherrypy.expose
- @cherrypy.tools.json_in()
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError, LookupError)
- @cherrypy.tools.CORS_allow(methods = ["POST"])
- def set_metadata(self, path, data):
- """Set metadata for the named stream, replacing any existing
- metadata. Data can be json-encoded or a plain dictionary."""
- self._metadata_helper(self.db.stream_set_metadata, path, data)
-
- # /stream/update_metadata?path=/newton/prep&data=<json>
- @cherrypy.expose
- @cherrypy.tools.json_in()
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError, LookupError, ValueError)
- @cherrypy.tools.CORS_allow(methods = ["POST"])
- def update_metadata(self, path, data):
- """Set metadata for the named stream, replacing any existing
- metadata. Data can be json-encoded or a plain dictionary."""
- self._metadata_helper(self.db.stream_update_metadata, path, data)
-
- # /stream/insert?path=/newton/prep
- @cherrypy.expose
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError, ValueError)
- @cherrypy.tools.CORS_allow(methods = ["PUT"])
- def insert(self, path, start, end):
- """
- Insert new data into the database. Provide textual data
- (matching the path's layout) as a HTTP PUT.
- """
- # Important that we always read the input before throwing any
- # errors, to keep lengths happy for persistent connections.
- # Note that CherryPy 3.2.2 has a bug where this fails for GET
- # requests, if we ever want to handle those (issue #1134)
- body = cherrypy.request.body.read()
-
- # Check path and get layout
- streams = self.db.stream_list(path = path)
- if len(streams) != 1:
- raise cherrypy.HTTPError("404 Not Found", "No such stream")
-
- # Check limits
- start = string_to_timestamp(start)
- end = string_to_timestamp(end)
- if start >= end:
- raise cherrypy.HTTPError("400 Bad Request",
- "start must precede end")
-
- # Pass the data directly to nilmdb, which will parse it and
- # raise a ValueError if there are any problems.
- self.db.stream_insert(path, start, end, body)
-
- # Done
- return
-
- # /stream/remove?path=/newton/prep
- # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
- @cherrypy.expose
- @cherrypy.tools.json_in()
- @cherrypy.tools.json_out()
- @exception_to_httperror(NilmDBError)
- @cherrypy.tools.CORS_allow(methods = ["POST"])
- def remove(self, path, start = None, end = None):
- """
- Remove data from the backend database. Removes all data in
- the interval [start, end). Returns the number of data points
- removed.
- """
- if start is not None:
- start = string_to_timestamp(start)
- if end is not None:
- end = string_to_timestamp(end)
- if start is not None and end is not None:
- if start >= end:
- raise cherrypy.HTTPError("400 Bad Request",
- "start must precede end")
- return self.db.stream_remove(path, start, end)
-
- # /stream/intervals?path=/newton/prep
- # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
- # /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
- @cherrypy.expose
- @chunked_response
- @response_type("application/x-json-stream")
- def intervals(self, path, start = None, end = None, diffpath = None):
- """
- Get intervals from backend database. Streams the resulting
- intervals as JSON strings separated by CR LF pairs. This may
- make multiple requests to the nilmdb backend to avoid causing
- it to block for too long.
-
- Returns intervals between 'start' and 'end' belonging to
- 'path'. If 'diff' is provided, the set-difference between
- intervals in 'path' and intervals in 'diffpath' are
- returned instead.
-
- Note that the response type is the non-standard
- 'application/x-json-stream' for lack of a better option.
- """
- if start is not None:
- start = string_to_timestamp(start)
- if end is not None:
- end = string_to_timestamp(end)
-
- if start is not None and end is not None:
- if start >= end:
- raise cherrypy.HTTPError("400 Bad Request",
- "start must precede end")
-
- if len(self.db.stream_list(path = path)) != 1:
- raise cherrypy.HTTPError("404", "No such stream: " + path)
-
- if diffpath and len(self.db.stream_list(path = diffpath)) != 1:
- raise cherrypy.HTTPError("404", "No such stream: " + diffpath)
-
- @workaround_cp_bug_1200
- def content(start, end):
- # Note: disable chunked responses to see tracebacks from here.
- while True:
- (ints, restart) = self.db.stream_intervals(path, start, end,
- diffpath)
- response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
- yield response
- if restart == 0:
- break
- start = restart
- return content(start, end)
-
- # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
- @cherrypy.expose
- @chunked_response
- @response_type("text/plain")
- def extract(self, path, start = None, end = None, count = False):
- """
- Extract data from backend database. Streams the resulting
- entries as ASCII text lines separated by newlines. This may
- make multiple requests to the nilmdb backend to avoid causing
- it to block for too long.
-
- Add count=True to return a count rather than actual data.
- """
- if start is not None:
- start = string_to_timestamp(start)
- if end is not None:
- end = string_to_timestamp(end)
-
- # Check parameters
- if start is not None and end is not None:
- if start >= end:
- raise cherrypy.HTTPError("400 Bad Request",
- "start must precede end")
-
- # Check path and get layout
- streams = self.db.stream_list(path = path)
- if len(streams) != 1:
- raise cherrypy.HTTPError("404 Not Found", "No such stream")
-
- @workaround_cp_bug_1200
- def content(start, end, count):
- # Note: disable chunked responses to see tracebacks from here.
- if count:
- matched = self.db.stream_extract(path, start, end, count)
- yield sprintf("%d\n", matched)
- return
-
- while True:
- (data, restart) = self.db.stream_extract(path, start, end)
- yield data
-
- if restart == 0:
- return
- start = restart
- return content(start, end, count)
-
- class Exiter(object):
- """App that exits the server, for testing"""
- @cherrypy.expose
- def index(self):
- cherrypy.response.headers['Content-Type'] = 'text/plain'
- def content():
- yield 'Exiting by request'
- raise SystemExit
- return content()
- index._cp_config = { 'response.stream': True }
-
- class Server(object):
- def __init__(self, db, host = '127.0.0.1', port = 8080,
- stoppable = False, # whether /exit URL exists
- embedded = True, # hide diagnostics and output, etc
- fast_shutdown = False, # don't wait for clients to disconn.
- force_traceback = False # include traceback in all errors
- ):
- # Save server version, just for verification during tests
- self.version = nilmdb.__version__
-
- self.embedded = embedded
- self.db = db
- if not getattr(db, "_thread_safe", None):
- raise KeyError("Database object " + str(db) + " doesn't claim "
- "to be thread safe. You should pass "
- "nilmdb.utils.serializer_proxy(NilmDB)(args) "
- "rather than NilmDB(args).")
-
- # Build up global server configuration
- cherrypy.config.update({
- 'server.socket_host': host,
- 'server.socket_port': port,
- 'engine.autoreload_on': False,
- 'server.max_request_body_size': 8*1024*1024,
- })
- if self.embedded:
- cherrypy.config.update({ 'environment': 'embedded' })
-
- # Build up application specific configuration
- app_config = {}
- app_config.update({
- 'error_page.default': self.json_error_page,
- })
-
- # Some default headers to just help identify that things are working
- app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
-
- # Set up Cross-Origin Resource Sharing (CORS) handler so we
- # can correctly respond to browsers' CORS preflight requests.
- # This also limits verbs to GET and HEAD by default.
- app_config.update({ 'tools.CORS_allow.on': True,
- 'tools.CORS_allow.methods': ['GET', 'HEAD'] })
-
- # Configure the 'json_in' tool to also allow other content-types
- # (like x-www-form-urlencoded), and to treat JSON as a dict that
- # fills requests.param.
- app_config.update({ 'tools.json_in.force': False,
- 'tools.json_in.processor': json_to_request_params })
-
- # Send tracebacks in error responses. They're hidden by the
- # error_page function for client errors (code 400-499).
- app_config.update({ 'request.show_tracebacks' : True })
- self.force_traceback = force_traceback
-
- # Patch CherryPy error handler to never pad out error messages.
- # This isn't necessary, but then again, neither is padding the
- # error messages.
- cherrypy._cperror._ie_friendly_error_sizes = {}
-
- # Build up the application and mount it
- root = Root(self.db)
- root.stream = Stream(self.db)
- if stoppable:
- root.exit = Exiter()
- cherrypy.tree.apps = {}
- cherrypy.tree.mount(root, "/", config = { "/" : app_config })
-
- # Shutdowns normally wait for clients to disconnect. To speed
- # up tests, set fast_shutdown = True
- if fast_shutdown:
- # Setting timeout to 0 triggers os._exit(70) at shutdown, grr...
- cherrypy.server.shutdown_timeout = 0.01
- else:
- cherrypy.server.shutdown_timeout = 5
-
- def json_error_page(self, status, message, traceback, version):
- """Return a custom error page in JSON so the client can parse it"""
- errordata = { "status" : status,
- "message" : message,
- "traceback" : traceback }
- # Don't send a traceback if the error was 400-499 (client's fault)
- try:
- code = int(status.split()[0])
- if not self.force_traceback:
- if code >= 400 and code <= 499:
- errordata["traceback"] = ""
- except Exception: # pragma: no cover
- pass
- # Override the response type, which was previously set to text/html
- cherrypy.serving.response.headers['Content-Type'] = (
- "application/json;charset=utf-8" )
- # Undo the HTML escaping that cherrypy's get_error_page function applies
- # (cherrypy issue 1135)
- for k, v in errordata.iteritems():
- v = v.replace("<","<")
- v = v.replace(">",">")
- v = v.replace("&","&")
- errordata[k] = v
- return json.dumps(errordata, separators=(',',':'))
-
- def start(self, blocking = False, event = None):
-
- if not self.embedded: # pragma: no cover
- # Handle signals nicely
- if hasattr(cherrypy.engine, "signal_handler"):
- cherrypy.engine.signal_handler.subscribe()
- if hasattr(cherrypy.engine, "console_control_handler"):
- cherrypy.engine.console_control_handler.subscribe()
-
- # Cherrypy stupidly calls os._exit(70) when it can't bind the
- # port. At least try to print a reasonable error and continue
- # in this case, rather than just dying silently (as we would
- # otherwise do in embedded mode)
- real_exit = os._exit
- def fake_exit(code): # pragma: no cover
- if code == os.EX_SOFTWARE:
- fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
- else:
- real_exit(code)
- os._exit = fake_exit
- cherrypy.engine.start()
- os._exit = real_exit
-
- # Signal that the engine has started successfully
- if event is not None:
- event.set()
-
- if blocking:
- try:
- cherrypy.engine.wait(cherrypy.engine.states.EXITING,
- interval = 0.1, channel = 'main')
- except (KeyboardInterrupt, IOError): # pragma: no cover
- cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
- cherrypy.engine.exit()
- except SystemExit: # pragma: no cover
- cherrypy.engine.log('SystemExit raised: shutting down bus')
- cherrypy.engine.exit()
- raise
-
- def stop(self):
- cherrypy.engine.exit()
|