"""CherryPy-based server for accessing NILM database via HTTP""" # Need absolute_import so that "import nilmdb" won't pull in # nilmdb.py, but will pull the nilmdb module instead. from __future__ import absolute_import import nilmdb.server from nilmdb.utils.printf import * from nilmdb.server.errors import NilmDBError from nilmdb.utils.time import string_to_timestamp import cherrypy import sys import os import socket import simplejson as json import decorator import psutil import traceback class NilmApp(object): def __init__(self, db): self.db = db # Decorators def chunked_response(func): """Decorator to enable chunked responses.""" # Set this to False to get better tracebacks from some requests # (/stream/extract, /stream/intervals). func._cp_config = { 'response.stream': True } return func def response_type(content_type): """Return a decorator-generating function that sets the response type to the specified string.""" def wrapper(func, *args, **kwargs): cherrypy.response.headers['Content-Type'] = content_type return func(*args, **kwargs) return decorator.decorator(wrapper) @decorator.decorator def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover """Decorator to work around CherryPy bug #1200 in a response generator. Even if chunked responses are disabled, LookupError or UnicodeError exceptions may still be swallowed by CherryPy due to bug #1200. This throws them as generic Exceptions instead so that they make it through. """ exc_info = None try: for val in func(*args, **kwargs): yield val except (LookupError, UnicodeError): # Re-raise it, but maintain the original traceback exc_info = sys.exc_info() new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1])) raise new_exc, None, exc_info[2] finally: del exc_info def exception_to_httperror(*expected): """Return a decorator-generating function that catches expected errors and throws a HTTPError describing it instead. @exception_to_httperror(NilmDBError, ValueError) def foo(): pass """ def wrapper(func, *args, **kwargs): exc_info = None try: return func(*args, **kwargs) except expected: # Re-raise it, but maintain the original traceback exc_info = sys.exc_info() new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1])) raise new_exc, None, exc_info[2] finally: del exc_info # We need to preserve the function's argspecs for CherryPy to # handle argument errors correctly. Decorator.decorator takes # care of that. return decorator.decorator(wrapper) # Custom CherryPy tools def CORS_allow(methods): """This does several things: Handles CORS preflight requests. Adds Allow: header to all requests. Raise 405 if request.method not in method. It is similar to cherrypy.tools.allow, with the CORS stuff added. """ request = cherrypy.request.headers response = cherrypy.response.headers if not isinstance(methods, (tuple, list)): # pragma: no cover methods = [ methods ] methods = [ m.upper() for m in methods if m ] if not methods: # pragma: no cover methods = [ 'GET', 'HEAD' ] elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover methods.append('HEAD') response['Allow'] = ', '.join(methods) # Allow all origins if 'Origin' in request: response['Access-Control-Allow-Origin'] = request['Origin'] # If it's a CORS request, send response. request_method = request.get("Access-Control-Request-Method", None) request_headers = request.get("Access-Control-Request-Headers", None) if (cherrypy.request.method == "OPTIONS" and request_method and request_headers): response['Access-Control-Allow-Headers'] = request_headers response['Access-Control-Allow-Methods'] = ', '.join(methods) # Try to stop further processing and return a 200 OK cherrypy.response.status = "200 OK" cherrypy.response.body = "" cherrypy.request.handler = lambda: "" return # Reject methods that were not explicitly allowed if cherrypy.request.method not in methods: raise cherrypy.HTTPError(405) cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow) # Helper for json_in tool to process JSON data into normal request # parameters. def json_to_request_params(body): cherrypy.lib.jsontools.json_processor(body) if not isinstance(cherrypy.request.json, dict): raise cherrypy.HTTPError(415) cherrypy.request.params.update(cherrypy.request.json) # CherryPy apps class Root(NilmApp): """Root application for NILM database""" def __init__(self, db): super(Root, self).__init__(db) # / @cherrypy.expose def index(self): raise cherrypy.NotFound() # /favicon.ico @cherrypy.expose def favicon_ico(self): raise cherrypy.NotFound() # /version @cherrypy.expose @cherrypy.tools.json_out() def version(self): return nilmdb.__version__ # /dbinfo @cherrypy.expose @cherrypy.tools.json_out() def dbinfo(self): """Return a dictionary with the database path, size of the database in bytes, and free disk space in bytes""" path = self.db.get_basepath() return { "path": path, "size": nilmdb.utils.du(path), "free": psutil.disk_usage(path).free } class Stream(NilmApp): """Stream-specific operations""" # Helpers def _get_times(self, start_param, end_param): (start, end) = (None, None) if start_param is not None: start = string_to_timestamp(start_param) if end_param is not None: end = string_to_timestamp(end_param) if start is not None and end is not None: if start >= end: raise cherrypy.HTTPError( "400 Bad Request", sprintf("start must precede end (%s >= %s)", start_param, end_param)) return (start, end) # /stream/list # /stream/list?layout=float32_8 # /stream/list?path=/newton/prep&extended=1 @cherrypy.expose @cherrypy.tools.json_out() def list(self, path = None, layout = None, extended = None): """List all streams in the database. With optional path or layout parameter, just list streams that match the given path or layout. If extent is not given, returns a list of lists containing the path and layout: [ path, layout ] If extended is provided, returns a list of lists containing extended info: [ path, layout, extent_min, extent_max, total_rows, total_seconds ]. More data may be added. """ return self.db.stream_list(path, layout, bool(extended)) # /stream/create?path=/newton/prep&layout=float32_8 @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError, ValueError) @cherrypy.tools.CORS_allow(methods = ["POST"]) def create(self, path, layout): """Create a new stream in the database. Provide path and one of the nilmdb.layout.layouts keys. """ return self.db.stream_create(path, layout) # /stream/destroy?path=/newton/prep @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError) @cherrypy.tools.CORS_allow(methods = ["POST"]) def destroy(self, path): """Delete a stream. Fails if any data is still present.""" return self.db.stream_destroy(path) # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1 @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError, ValueError) @cherrypy.tools.CORS_allow(methods = ["POST"]) def rename(self, oldpath, newpath): """Rename a stream.""" return self.db.stream_rename(oldpath, newpath) # /stream/get_metadata?path=/newton/prep # /stream/get_metadata?path=/newton/prep&key=foo&key=bar @cherrypy.expose @cherrypy.tools.json_out() def get_metadata(self, path, key=None): """Get metadata for the named stream. If optional key parameters are specified, only return metadata matching the given keys.""" try: data = self.db.stream_get_metadata(path) except nilmdb.server.nilmdb.StreamError as e: raise cherrypy.HTTPError("404 Not Found", e.message) if key is None: # If no keys specified, return them all key = data.keys() elif not isinstance(key, list): key = [ key ] result = {} for k in key: if k in data: result[k] = data[k] else: # Return "None" for keys with no matching value result[k] = None return result # Helper for set_metadata and get_metadata def _metadata_helper(self, function, path, data): if not isinstance(data, dict): try: data = dict(json.loads(data)) except TypeError as e: raise NilmDBError("can't parse 'data' parameter: " + e.message) for key in data: if not (isinstance(data[key], basestring) or isinstance(data[key], float) or isinstance(data[key], int)): raise NilmDBError("metadata values must be a string or number") function(path, data) # /stream/set_metadata?path=/newton/prep&data= @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError, LookupError) @cherrypy.tools.CORS_allow(methods = ["POST"]) def set_metadata(self, path, data): """Set metadata for the named stream, replacing any existing metadata. Data can be json-encoded or a plain dictionary.""" self._metadata_helper(self.db.stream_set_metadata, path, data) # /stream/update_metadata?path=/newton/prep&data= @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError, LookupError, ValueError) @cherrypy.tools.CORS_allow(methods = ["POST"]) def update_metadata(self, path, data): """Set metadata for the named stream, replacing any existing metadata. Data can be json-encoded or a plain dictionary.""" self._metadata_helper(self.db.stream_update_metadata, path, data) # /stream/insert?path=/newton/prep @cherrypy.expose @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError, ValueError) @cherrypy.tools.CORS_allow(methods = ["PUT"]) def insert(self, path, start, end): """ Insert new data into the database. Provide textual data (matching the path's layout) as a HTTP PUT. """ # Important that we always read the input before throwing any # errors, to keep lengths happy for persistent connections. # Note that CherryPy 3.2.2 has a bug where this fails for GET # requests, if we ever want to handle those (issue #1134) body = cherrypy.request.body.read() # Check path and get layout if len(self.db.stream_list(path = path)) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path) # Check limits (start, end) = self._get_times(start, end) # Pass the data directly to nilmdb, which will parse it and # raise a ValueError if there are any problems. self.db.stream_insert(path, start, end, body) # Done return # /stream/remove?path=/newton/prep # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0 @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @exception_to_httperror(NilmDBError) @cherrypy.tools.CORS_allow(methods = ["POST"]) def remove(self, path, start = None, end = None): """ Remove data from the backend database. Removes all data in the interval [start, end). Returns the number of data points removed. """ (start, end) = self._get_times(start, end) total_removed = 0 while True: (removed, restart) = self.db.stream_remove(path, start, end) total_removed += removed if restart is None: break start = restart return total_removed # /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/intervals?path=/newton/prep&diffpath=/newton/prep2 @cherrypy.expose @chunked_response @response_type("application/x-json-stream") def intervals(self, path, start = None, end = None, diffpath = None): """ Get intervals from backend database. Streams the resulting intervals as JSON strings separated by CR LF pairs. This may make multiple requests to the nilmdb backend to avoid causing it to block for too long. Returns intervals between 'start' and 'end' belonging to 'path'. If 'diff' is provided, the set-difference between intervals in 'path' and intervals in 'diffpath' are returned instead. Note that the response type is the non-standard 'application/x-json-stream' for lack of a better option. """ (start, end) = self._get_times(start, end) if len(self.db.stream_list(path = path)) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path) if diffpath and len(self.db.stream_list(path = diffpath)) != 1: raise cherrypy.HTTPError("404", "No such stream: " + diffpath) @workaround_cp_bug_1200 def content(start, end): # Note: disable chunked responses to see tracebacks from here. while True: (ints, restart) = self.db.stream_intervals(path, start, end, diffpath) response = ''.join([ json.dumps(i) + "\r\n" for i in ints ]) yield response if restart is None: break start = restart return content(start, end) # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0 @cherrypy.expose @chunked_response @response_type("text/plain") def extract(self, path, start = None, end = None, count = False, markup = False, binary = False): """ Extract data from backend database. Streams the resulting entries as ASCII text lines separated by newlines. This may make multiple requests to the nilmdb backend to avoid causing it to block for too long. If 'count' is True, returns a count rather than actual data. If 'markup' is True, adds comments to the stream denoting each interval's start and end timestamp. If 'binary' is True, return raw binary data, rather than lines of ASCII-formatted data. Raw binary data is always little-endian and matches the database types (including an int64 timestamp). """ (start, end) = self._get_times(start, end) # Check path and get layout if len(self.db.stream_list(path = path)) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path) if binary: cherrypy.response.headers['Content-Type'] = ( "application/octet-stream") if markup or count: raise cherrypy.HTTPError("400", "can't mix binary and " "markup or count modes") @workaround_cp_bug_1200 def content(start, end): # Note: disable chunked responses to see tracebacks from here. if count: matched = self.db.stream_extract(path, start, end, count = True) yield sprintf("%d\n", matched) return while True: (data, restart) = self.db.stream_extract( path, start, end, count = False, markup = markup, binary = binary) yield data if restart is None: return start = restart return content(start, end) class Exiter(object): """App that exits the server, for testing""" @cherrypy.expose def index(self): cherrypy.response.headers['Content-Type'] = 'text/plain' def content(): yield 'Exiting by request' raise SystemExit return content() index._cp_config = { 'response.stream': True } class Server(object): def __init__(self, db, host = '127.0.0.1', port = 8080, stoppable = False, # whether /exit URL exists embedded = True, # hide diagnostics and output, etc fast_shutdown = False, # don't wait for clients to disconn. force_traceback = False, # include traceback in all errors basepath = '', # base URL path for cherrypy.tree ): # Save server version, just for verification during tests self.version = nilmdb.__version__ self.embedded = embedded self.db = db if not getattr(db, "_thread_safe", None): raise KeyError("Database object " + str(db) + " doesn't claim " "to be thread safe. You should pass " "nilmdb.utils.serializer_proxy(NilmDB)(args) " "rather than NilmDB(args).") # Build up global server configuration cherrypy.config.update({ 'server.socket_host': host, 'server.socket_port': port, 'engine.autoreload_on': False, 'server.max_request_body_size': 8*1024*1024, }) if self.embedded: cherrypy.config.update({ 'environment': 'embedded' }) # Build up application specific configuration app_config = {} app_config.update({ 'error_page.default': self.json_error_page, }) # Some default headers to just help identify that things are working app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' }) # Set up Cross-Origin Resource Sharing (CORS) handler so we # can correctly respond to browsers' CORS preflight requests. # This also limits verbs to GET and HEAD by default. app_config.update({ 'tools.CORS_allow.on': True, 'tools.CORS_allow.methods': ['GET', 'HEAD'] }) # Configure the 'json_in' tool to also allow other content-types # (like x-www-form-urlencoded), and to treat JSON as a dict that # fills requests.param. app_config.update({ 'tools.json_in.force': False, 'tools.json_in.processor': json_to_request_params }) # Send tracebacks in error responses. They're hidden by the # error_page function for client errors (code 400-499). app_config.update({ 'request.show_tracebacks' : True }) self.force_traceback = force_traceback # Patch CherryPy error handler to never pad out error messages. # This isn't necessary, but then again, neither is padding the # error messages. cherrypy._cperror._ie_friendly_error_sizes = {} # Build up the application and mount it root = Root(self.db) root.stream = Stream(self.db) if stoppable: root.exit = Exiter() cherrypy.tree.apps = {} cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) # Shutdowns normally wait for clients to disconnect. To speed # up tests, set fast_shutdown = True if fast_shutdown: # Setting timeout to 0 triggers os._exit(70) at shutdown, grr... cherrypy.server.shutdown_timeout = 0.01 else: cherrypy.server.shutdown_timeout = 5 # Set up the WSGI application pointer for external programs self.wsgi_application = cherrypy.tree def json_error_page(self, status, message, traceback, version): """Return a custom error page in JSON so the client can parse it""" errordata = { "status" : status, "message" : message, "traceback" : traceback } # Don't send a traceback if the error was 400-499 (client's fault) try: code = int(status.split()[0]) if not self.force_traceback: if code >= 400 and code <= 499: errordata["traceback"] = "" except Exception: # pragma: no cover pass # Override the response type, which was previously set to text/html cherrypy.serving.response.headers['Content-Type'] = ( "application/json;charset=utf-8" ) # Undo the HTML escaping that cherrypy's get_error_page function applies # (cherrypy issue 1135) for k, v in errordata.iteritems(): v = v.replace("<","<") v = v.replace(">",">") v = v.replace("&","&") errordata[k] = v return json.dumps(errordata, separators=(',',':')) def start(self, blocking = False, event = None): if not self.embedded: # pragma: no cover # Handle signals nicely if hasattr(cherrypy.engine, "signal_handler"): cherrypy.engine.signal_handler.subscribe() if hasattr(cherrypy.engine, "console_control_handler"): cherrypy.engine.console_control_handler.subscribe() # Cherrypy stupidly calls os._exit(70) when it can't bind the # port. At least try to print a reasonable error and continue # in this case, rather than just dying silently (as we would # otherwise do in embedded mode) real_exit = os._exit def fake_exit(code): # pragma: no cover if code == os.EX_SOFTWARE: fprintf(sys.stderr, "error: CherryPy called os._exit!\n") else: real_exit(code) os._exit = fake_exit cherrypy.engine.start() os._exit = real_exit # Signal that the engine has started successfully if event is not None: event.set() if blocking: try: cherrypy.engine.wait(cherrypy.engine.states.EXITING, interval = 0.1, channel = 'main') except (KeyboardInterrupt, IOError): # pragma: no cover cherrypy.engine.log('Keyboard Interrupt: shutting down bus') cherrypy.engine.exit() except SystemExit: # pragma: no cover cherrypy.engine.log('SystemExit raised: shutting down bus') cherrypy.engine.exit() raise def stop(self): cherrypy.engine.exit() # Use a single global nilmdb.server.NilmDB and nilmdb.server.Server # instance since the database can only be opened once. For this to # work, the web server must use only a single process and single # Python interpreter. Multiple threads are OK. _wsgi_server = None def wsgi_application(dbpath, basepath): # pragma: no cover """Return a WSGI application object with a database at the specified path. 'dbpath' is a filesystem location, e.g. /home/nilm/db 'basepath' is the URL path of the application base, which is the same as the first argument to Apache's WSGIScriptAlias directive. """ def application(environ, start_response): global _wsgi_server if _wsgi_server is None: # Try to start the server try: db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath) _wsgi_server = nilmdb.server.Server( db, embedded = True, basepath = basepath.rstrip('/')) except Exception: # Build an error message on failure import pprint err = sprintf("Initializing database at path '%s' failed:\n\n", dbpath) err += traceback.format_exc() try: import pwd import grp err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) " "on host %s, pid %d\n", os.getuid(), pwd.getpwuid(os.getuid())[0], os.getgid(), grp.getgrgid(os.getgid())[0], socket.gethostname(), os.getpid()) except ImportError: pass err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ)) if _wsgi_server is None: # Serve up the error with our own mini WSGI app. headers = [ ('Content-type', 'text/plain'), ('Content-length', str(len(err))) ] start_response("500 Internal Server Error", headers) return [err] # Call the normal application return _wsgi_server.wsgi_application(environ, start_response) return application