"""CherryPy-based server for accessing NILM database via HTTP"""

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the nilmdb module instead.
import nilmdb.server

from nilmdb.utils.printf import *
from nilmdb.server.errors import NilmDBError
from nilmdb.utils.time import string_to_timestamp

import cherrypy
import sys
import os
import socket
import json
import decorator
import psutil
import traceback

from nilmdb.server.serverutil import (
    chunked_response,
    response_type,
    exception_to_httperror,
    CORS_allow,
    json_to_request_params,
    json_error_page,
    cherrypy_start,
    cherrypy_stop,
    bool_param,
)

# Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)


class NilmApp(object):
    """Base class for the CherryPy apps; just holds the database handle."""

    def __init__(self, db):
        self.db = db


# CherryPy apps
class Root(NilmApp):
    """Root application for NILM database"""

    def __init__(self, db):
        super(Root, self).__init__(db)

    # /
    @cherrypy.expose
    def index(self):
        """Return a plain-text banner with the version and host name."""
        cherrypy.response.headers['Content-Type'] = 'text/plain'
        msg = sprintf("This is NilmDB version %s, running on host %s.\n",
                      nilmdb.__version__, socket.getfqdn())
        return msg

    # /favicon.ico
    @cherrypy.expose
    def favicon_ico(self):
        """There is no favicon; always respond with 404."""
        raise cherrypy.NotFound()

    # /version
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def version(self):
        """Return the NilmDB version string as JSON."""
        return nilmdb.__version__

    # /dbinfo
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def dbinfo(self):
        """Return a dictionary with the database path,
        size of the database in bytes, and free disk space in bytes"""
        path = self.db.get_basepath()
        usage = psutil.disk_usage(path)
        dbsize = nilmdb.utils.du(path)
        return {
            "path": path,
            "size": dbsize,
            # Clamp at zero: the figures come from separate measurements
            # and the differences could otherwise go negative.
            "other": max(usage.used - dbsize, 0),
            "reserved": max(usage.total - usage.used - usage.free, 0),
            "free": usage.free,
        }


class Stream(NilmApp):
    """Stream-specific operations"""

    # Helpers
    def _get_times(self, start_param, end_param):
        """Parse the optional start/end request parameters.

        Returns a (start, end) tuple in which a missing parameter is
        None.  Raises a 400 HTTPError if either value cannot be parsed
        or if both are given and start does not precede end.
        """
        (start, end) = (None, None)
        try:
            if start_param is not None:
                start = string_to_timestamp(start_param)
        except Exception:
            raise cherrypy.HTTPError("400 Bad Request", sprintf(
                "invalid start (%s): must be a numeric timestamp",
                start_param))
        try:
            if end_param is not None:
                end = string_to_timestamp(end_param)
        except Exception:
            raise cherrypy.HTTPError("400 Bad Request", sprintf(
                "invalid end (%s): must be a numeric timestamp",
                end_param))
        if start is not None and end is not None:
            if start >= end:
                raise cherrypy.HTTPError(
                    "400 Bad Request",
                    sprintf("start must precede end (%s >= %s)",
                            start_param, end_param))
        return (start, end)

    # /stream/list
    # /stream/list?layout=float32_8
    # /stream/list?path=/newton/prep&extended=1
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def list(self, path=None, layout=None, extended=None):
        """List all streams in the database.  With optional path or
        layout parameter, just list streams that match the given path
        or layout.

        If extended is missing or zero, returns a list of lists
        containing the path and layout: [ path, layout ]

        If extended is true, returns a list of lists containing
        extended info: [ path, layout, extent_min, extent_max,
        total_rows, total_seconds ].  More data may be added.
        """
        # Query parameters arrive as strings and bool("0") is True, so
        # a plain bool() would treat "extended=0" as a request for
        # extended output.  Parse it the same way the other endpoints
        # parse their flags; missing/empty stays False.
        want_extended = bool_param(extended) if extended else False
        return self.db.stream_list(path, layout, want_extended)

    # /stream/create?path=/newton/prep&layout=float32_8
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError, ValueError)
    @cherrypy.tools.CORS_allow(methods=["POST"])
    def create(self, path, layout):
        """Create a new stream in the database.  Provide path
        and one of the nilmdb.layout.layouts keys.
        """
        return self.db.stream_create(path, layout)

    # /stream/destroy?path=/newton/prep
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError)
    @cherrypy.tools.CORS_allow(methods=["POST"])
    def destroy(self, path):
        """Delete a stream.  Fails if any data is still present."""
        return self.db.stream_destroy(path)

    # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError, ValueError)
    @cherrypy.tools.CORS_allow(methods=["POST"])
    def rename(self, oldpath, newpath):
        """Rename a stream."""
        return self.db.stream_rename(oldpath, newpath)

    # /stream/get_metadata?path=/newton/prep
    # /stream/get_metadata?path=/newton/prep&key=foo&key=bar
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_metadata(self, path, key=None):
        """Get metadata for the named stream.  If optional
        key parameters are specified, only return metadata
        matching the given keys."""
        try:
            data = self.db.stream_get_metadata(path)
        except nilmdb.server.nilmdb.StreamError as e:
            raise cherrypy.HTTPError("404 Not Found", str(e))
        if key is None:
            # If no keys specified, return them all
            key = list(data.keys())
        elif not isinstance(key, list):
            # A single "key=" parameter arrives as a bare value
            key = [key]
        # Return "None" for keys with no matching value
        return {k: (data[k] if k in data else None) for k in key}

    # Helper for set_metadata and update_metadata
    def _metadata_helper(self, function, path, data):
        """Validate 'data' and pass it to 'function(path, data)'.

        'data' may already be a dictionary or may be a JSON-encoded
        string.  Raises NilmDBError if it cannot be turned into a
        dictionary, or if any value is not a string or number.
        """
        if not isinstance(data, dict):
            try:
                data = dict(json.loads(data))
            except (TypeError, ValueError) as e:
                # Malformed JSON raises json.JSONDecodeError, which is
                # a ValueError; JSON that isn't an object can raise
                # either type from dict().  Report all of them the
                # same way so callers get a client error rather than
                # an unhandled exception.
                raise NilmDBError("can't parse 'data' parameter: " + str(e))
        for value in data.values():
            if not isinstance(value, (str, float, int)):
                raise NilmDBError("metadata values must be a string "
                                  "or number")
        function(path, data)

    # /stream/set_metadata?path=/newton/prep&data=<json>
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError, LookupError)
    @cherrypy.tools.CORS_allow(methods=["POST"])
    def set_metadata(self, path, data):
        """Set metadata for the named stream, replacing any existing
        metadata.  Data can be json-encoded or a plain dictionary."""
        self._metadata_helper(self.db.stream_set_metadata, path, data)

    # /stream/update_metadata?path=/newton/prep&data=<json>
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError, LookupError, ValueError)
    @cherrypy.tools.CORS_allow(methods=["POST"])
    def update_metadata(self, path, data):
        """Update metadata for the named stream, merging the given
        values into any existing metadata.  Data can be json-encoded
        or a plain dictionary."""
        self._metadata_helper(self.db.stream_update_metadata, path, data)

    # /stream/insert?path=/newton/prep
    @cherrypy.expose
    @cherrypy.tools.json_out()
    @exception_to_httperror(NilmDBError, ValueError)
    @cherrypy.tools.CORS_allow(methods=["PUT"])
    def insert(self, path, start, end, binary=False):
        """
        Insert new data into the database.  Provide textual data
        (matching the path's layout) as a HTTP PUT.

        If 'binary' is True, expect raw binary data, rather than lines
        of ASCII-formatted data.  Raw binary data is always
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        binary = bool_param(binary)

        # Important that we always read the input before throwing any
        # errors, to keep lengths happy for persistent connections.
        # Note that CherryPy 3.2.2 has a bug where this fails for GET
        # requests, if we ever want to handle those (issue #1134)
        body = cherrypy.request.body.read()

        # Verify content type for binary data
        content_type = cherrypy.request.headers.get('content-type')
        if binary and content_type:
            if content_type != "application/octet-stream":
                raise cherrypy.HTTPError("400", "Content type must be "
                                         "application/octet-stream for "
                                         "binary data, not " + content_type)

        # Note that non-binary data is *not* decoded from bytes to string,
        # but rather passed directly to stream_insert.

        # Check path and get layout
        if len(self.db.stream_list(path=path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)

        # Check limits
        (start, end) = self._get_times(start, end)

        # Pass the data directly to nilmdb, which will parse it and
        # raise a ValueError if there are any problems.
        self.db.stream_insert(path, start, end, body, binary)

        # Done
        return

    # /stream/remove?path=/newton/prep
    # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
    @cherrypy.expose
    @cherrypy.tools.json_in()
    @cherrypy.tools.CORS_allow(methods=["POST"])
    @chunked_response
    @response_type("application/x-json-stream")
    def remove(self, path, start=None, end=None):
        """
        Remove data from the backend database.  Removes all data in
        the interval [start, end).

        Returns the number of data points removed.  Since this is a
        potentially long-running operation, multiple numbers may be
        returned as the data gets removed from the backend database.
        The total number of points removed is the sum of all of these
        numbers.
        """
        (start, end) = self._get_times(start, end)

        if len(self.db.stream_list(path=path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)

        def content(start, end):
            # Note: disable chunked responses to see tracebacks from here.
            while True:
                (removed, restart) = self.db.stream_remove(path, start, end)
                response = json.dumps(removed) + "\r\n"
                yield response.encode('utf-8')
                if restart is None:
                    break
                # Continue from where the backend stopped
                start = restart
        return content(start, end)

    # /stream/intervals?path=/newton/prep
    # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
    # /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
    @cherrypy.expose
    @chunked_response
    @response_type("application/x-json-stream")
    def intervals(self, path, start=None, end=None, diffpath=None):
        """
        Get intervals from backend database.  Streams the resulting
        intervals as JSON strings separated by CR LF pairs.  This may
        make multiple requests to the nilmdb backend to avoid causing
        it to block for too long.

        Returns intervals between 'start' and 'end' belonging to
        'path'.  If 'diffpath' is provided, the set-difference between
        intervals in 'path' and intervals in 'diffpath' are
        returned instead.

        Note that the response type is the non-standard
        'application/x-json-stream' for lack of a better option.
        """
        (start, end) = self._get_times(start, end)

        if len(self.db.stream_list(path=path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)

        if diffpath and len(self.db.stream_list(path=diffpath)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + diffpath)

        def content(start, end):
            # Note: disable chunked responses to see tracebacks from here.
            while True:
                (ints, restart) = self.db.stream_intervals(
                    path, start, end, diffpath)
                response = ''.join(json.dumps(i) + "\r\n" for i in ints)
                yield response.encode('utf-8')
                if restart is None:
                    break
                # Continue from where the backend stopped
                start = restart
        return content(start, end)

    # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
    @cherrypy.expose
    @chunked_response
    def extract(self, path, start=None, end=None,
                count=False, markup=False, binary=False):
        """
        Extract data from backend database.  Streams the resulting
        entries as ASCII text lines separated by newlines.  This may
        make multiple requests to the nilmdb backend to avoid causing
        it to block for too long.

        If 'count' is True, returns a count rather than actual data.

        If 'markup' is True, adds comments to the stream denoting each
        interval's start and end timestamp.

        If 'binary' is True, return raw binary data, rather than lines
        of ASCII-formatted data.  Raw binary data is always
        little-endian and matches the database types (including an
        int64 timestamp).
        """
        binary = bool_param(binary)
        markup = bool_param(markup)
        count = bool_param(count)

        (start, end) = self._get_times(start, end)

        # Check path and get layout
        if len(self.db.stream_list(path=path)) != 1:
            raise cherrypy.HTTPError("404", "No such stream: " + path)

        if binary:
            content_type = "application/octet-stream"
            if markup or count:
                raise cherrypy.HTTPError("400", "can't mix binary and "
                                         "markup or count modes")
        else:
            content_type = "text/plain"
        cherrypy.response.headers['Content-Type'] = content_type

        def content(start, end):
            # Note: disable chunked responses to see tracebacks from here.
            if count:
                matched = self.db.stream_extract(path, start, end,
                                                 count=True)
                yield sprintf(b"%d\n", matched)
                return

            while True:
                (data, restart) = self.db.stream_extract(
                    path, start, end, count=False,
                    markup=markup, binary=binary)
                yield data

                if restart is None:
                    return
                # Continue from where the backend stopped
                start = restart
        return content(start, end)


class Exiter(object):
    """App that exits the server, for testing"""

    @cherrypy.expose
    def index(self):
        """Send a short message and then raise SystemExit."""
        cherrypy.response.headers['Content-Type'] = 'text/plain'

        def content():
            yield b'Exiting by request'
            raise SystemExit
        return content()
    # Stream the response so the message goes out before the generator
    # raises SystemExit.
    index._cp_config = {'response.stream': True}


class Server(object):
    """Configures CherryPy and mounts the NilmDB applications."""

    def __init__(self, db,
                 host='127.0.0.1',
                 port=8080,
                 stoppable=False,        # whether /exit URL exists
                 embedded=True,          # hide diagnostics and output, etc
                 fast_shutdown=False,    # don't wait for clients to disconn.
                 force_traceback=False,  # include traceback in all errors
                 basepath='',            # base URL path for cherrypy.tree
                 ):
        """Set up global and per-application CherryPy configuration
        and mount the Root and Stream apps (plus Exiter at /exit when
        'stoppable' is set) under 'basepath'.

        Raises KeyError if 'db' does not have a true '_thread_safe'
        attribute.
        """
        # Save server version, just for verification during tests
        self.version = nilmdb.__version__

        self.embedded = embedded
        self.db = db
        if not getattr(db, "_thread_safe", None):
            raise KeyError("Database object " + str(db) + " doesn't claim "
                           "to be thread safe. You should pass "
                           "nilmdb.utils.serializer_proxy(NilmDB)(args) "
                           "rather than NilmDB(args).")

        # Build up global server configuration
        cherrypy.config.update({
            'server.socket_host': host,
            'server.socket_port': port,
            'engine.autoreload.on': False,
            'server.max_request_body_size': 8*1024*1024,
        })
        if self.embedded:  # pragma: no branch (always taken in test suite)
            cherrypy.config.update({'environment': 'embedded'})

        # Build up application specific configuration
        app_config = {}
        app_config.update({
            'error_page.default': self.json_error_page,
        })

        # Some default headers to just help identify that things are working
        app_config.update({'response.headers.X-Jim-Is-Awesome': 'yeah'})

        # Set up Cross-Origin Resource Sharing (CORS) handler so we
        # can correctly respond to browsers' CORS preflight requests.
        # This also limits verbs to GET and HEAD by default.
        app_config.update({
            'tools.CORS_allow.on': True,
            'tools.CORS_allow.methods': ['GET', 'HEAD'],
        })

        # Configure the 'json_in' tool to also allow other content-types
        # (like x-www-form-urlencoded), and to treat JSON as a dict that
        # fills requests.param.
        app_config.update({
            'tools.json_in.force': False,
            'tools.json_in.processor': json_to_request_params,
        })

        # Send tracebacks in error responses.  They're hidden by the
        # error_page function for client errors (code 400-499).
        app_config.update({'request.show_tracebacks': True})
        self.force_traceback = force_traceback

        # Patch CherryPy error handler to never pad out error messages.
        # This isn't necessary, but then again, neither is padding the
        # error messages.
        cherrypy._cperror._ie_friendly_error_sizes = {}

        # Build up the application and mount it
        root = Root(self.db)
        root.stream = Stream(self.db)
        if stoppable:
            root.exit = Exiter()
        cherrypy.tree.apps = {}
        cherrypy.tree.mount(root, basepath, config={"/": app_config})

        # Shutdowns normally wait for clients to disconnect.  To speed
        # up tests, set fast_shutdown = True
        if fast_shutdown:
            # Setting timeout to 0 triggers os._exit(70) at shutdown, grr...
            cherrypy.server.shutdown_timeout = 0.01
        else:
            cherrypy.server.shutdown_timeout = 5

        # Set up the WSGI application pointer for external programs
        self.wsgi_application = cherrypy.tree

    def json_error_page(self, status, message, traceback, version):
        """Return a custom error page in JSON so the client can parse it"""
        return json_error_page(status, message, traceback, version,
                               self.force_traceback)

    def start(self, blocking=False, event=None):
        """Start the CherryPy server via cherrypy_start."""
        cherrypy_start(blocking, event, self.embedded)

    def stop(self):
        """Stop the CherryPy server via cherrypy_stop."""
        cherrypy_stop()


# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once.  For this to
# work, the web server must use only a single process and single
# Python interpreter.  Multiple threads are OK.
_wsgi_server = None


def wsgi_application(dbpath, basepath):  # pragma: no cover
    """Return a WSGI application object with a database at the
    specified path.

    'dbpath' is a filesystem location, e.g. /home/nilm/db

    'basepath' is the URL path of the application base, which
    is the same as the first argument to Apache's WSGIScriptAlias
    directive.
    """
    def application(environ, start_response):
        global _wsgi_server
        if _wsgi_server is None:
            # Try to start the server
            try:
                db = nilmdb.utils.serializer_proxy(
                    nilmdb.server.NilmDB)(dbpath)
                _wsgi_server = nilmdb.server.Server(
                    db, embedded=True,
                    basepath=basepath.rstrip('/'))
            except Exception:
                # Build an error message on failure
                import pprint
                err = sprintf("Initializing database at path '%s' failed:\n\n",
                              dbpath)
                err += traceback.format_exc()
                try:
                    import pwd
                    import grp
                    err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
                                   "on host %s, pid %d\n",
                                   os.getuid(), pwd.getpwuid(os.getuid())[0],
                                   os.getgid(), grp.getgrgid(os.getgid())[0],
                                   socket.gethostname(), os.getpid())
                except ImportError:
                    # pwd/grp are not available on every platform;
                    # the identity details are optional.
                    pass
                err += sprintf("\nEnvironment:\n%s\n",
                               pprint.pformat(environ))
        if _wsgi_server is None:
            # Serve up the error with our own mini WSGI app.  WSGI
            # response bodies must be bytes, and Content-length must
            # count the encoded bytes rather than characters.
            body = err.encode('utf-8')
            headers = [('Content-type', 'text/plain; charset=utf-8'),
                       ('Content-length', str(len(body)))]
            start_response("500 Internal Server Error", headers)
            return [body]

        # Call the normal application
        return _wsgi_server.wsgi_application(environ, start_response)
    return application