You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

620 lines
23 KiB

  1. """CherryPy-based server for accessing NILM database via HTTP"""
  2. # Need absolute_import so that "import nilmdb" won't pull in
  3. # nilmdb.py, but will pull the nilmdb module instead.
  4. from __future__ import absolute_import
  5. import nilmdb.server
  6. from nilmdb.utils.printf import *
  7. from nilmdb.server.errors import NilmDBError
  8. from nilmdb.utils.time import string_to_timestamp
  9. import cherrypy
  10. import sys
  11. import os
  12. import simplejson as json
  13. import decorator
  14. import psutil
  15. class NilmApp(object):
  16. def __init__(self, db):
  17. self.db = db
  18. # Decorators
  19. def chunked_response(func):
  20. """Decorator to enable chunked responses."""
  21. # Set this to False to get better tracebacks from some requests
  22. # (/stream/extract, /stream/intervals).
  23. func._cp_config = { 'response.stream': True }
  24. return func
  25. def response_type(content_type):
  26. """Return a decorator-generating function that sets the
  27. response type to the specified string."""
  28. def wrapper(func, *args, **kwargs):
  29. cherrypy.response.headers['Content-Type'] = content_type
  30. return func(*args, **kwargs)
  31. return decorator.decorator(wrapper)
@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
    """Decorator to work around CherryPy bug #1200 in a response
    generator.

    Even if chunked responses are disabled, LookupError or
    UnicodeError exceptions may still be swallowed by CherryPy due to
    bug #1200. This throws them as generic Exceptions instead so that
    they make it through.

    Wraps a generator function. Every value produced by
    func(*args, **kwargs) is yielded through unchanged.

    Raises:
      Exception: in place of any LookupError or UnicodeError raised while
        iterating. The message is "<OriginalClassName>: <original message>"
        and the original traceback is kept (Python 2 three-argument raise).
        Other exceptions propagate unchanged.
    """
    # Holds sys.exc_info() only while converting the exception; it is
    # deleted in the `finally` clause below.
    exc_info = None
    try:
        for val in func(*args, **kwargs):
            yield val
    except (LookupError, UnicodeError):
        # Re-raise it, but maintain the original traceback
        exc_info = sys.exc_info()
        new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
        raise new_exc, None, exc_info[2]
    finally:
        # Drop the local reference to the traceback object. This is
        # presumably to avoid a frame <-> traceback reference cycle; see
        # the sys.exc_info documentation.
        del exc_info
def exception_to_httperror(*expected):
    """Return a decorator that converts expected exceptions into HTTP
    400 errors.

    Any exception that escapes the wrapped function and is an instance of
    one of the types in *expected* is replaced by
    cherrypy.HTTPError("400 Bad Request", str(exc)). The original
    traceback is kept. All other exceptions propagate unchanged.

    Only exceptions raised during the call itself are converted. If the
    function returns a generator, errors raised later while it is being
    iterated are not seen here.

    Example:

        @exception_to_httperror(NilmDBError, ValueError)
        def foo():
            pass
    """
    def wrapper(func, *args, **kwargs):
        # Holds sys.exc_info() only while converting the exception; it is
        # deleted in the `finally` clause below.
        exc_info = None
        try:
            return func(*args, **kwargs)
        except expected:
            # Re-raise it, but maintain the original traceback
            exc_info = sys.exc_info()
            new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
            raise new_exc, None, exc_info[2]
        finally:
            # Drop the local traceback reference (presumably to avoid a
            # reference cycle through the current frame).
            del exc_info
    # We need to preserve the function's argspecs for CherryPy to
    # handle argument errors correctly. Decorator.decorator takes
    # care of that.
    return decorator.decorator(wrapper)
  74. # Custom CherryPy tools
  75. def CORS_allow(methods):
  76. """This does several things:
  77. Handles CORS preflight requests.
  78. Adds Allow: header to all requests.
  79. Raise 405 if request.method not in method.
  80. It is similar to cherrypy.tools.allow, with the CORS stuff added.
  81. """
  82. request = cherrypy.request.headers
  83. response = cherrypy.response.headers
  84. if not isinstance(methods, (tuple, list)): # pragma: no cover
  85. methods = [ methods ]
  86. methods = [ m.upper() for m in methods if m ]
  87. if not methods: # pragma: no cover
  88. methods = [ 'GET', 'HEAD' ]
  89. elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
  90. methods.append('HEAD')
  91. response['Allow'] = ', '.join(methods)
  92. # Allow all origins
  93. if 'Origin' in request:
  94. response['Access-Control-Allow-Origin'] = request['Origin']
  95. # If it's a CORS request, send response.
  96. request_method = request.get("Access-Control-Request-Method", None)
  97. request_headers = request.get("Access-Control-Request-Headers", None)
  98. if (cherrypy.request.method == "OPTIONS" and
  99. request_method and request_headers):
  100. response['Access-Control-Allow-Headers'] = request_headers
  101. response['Access-Control-Allow-Methods'] = ', '.join(methods)
  102. # Try to stop further processing and return a 200 OK
  103. cherrypy.response.status = "200 OK"
  104. cherrypy.response.body = ""
  105. cherrypy.request.handler = lambda: ""
  106. return
  107. # Reject methods that were not explicitly allowed
  108. if cherrypy.request.method not in methods:
  109. raise cherrypy.HTTPError(405)
  110. cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
  111. # Helper for json_in tool to process JSON data into normal request
  112. # parameters.
  113. def json_to_request_params(body):
  114. cherrypy.lib.jsontools.json_processor(body)
  115. if not isinstance(cherrypy.request.json, dict):
  116. raise cherrypy.HTTPError(415)
  117. cherrypy.request.params.update(cherrypy.request.json)
  118. # CherryPy apps
  119. class Root(NilmApp):
  120. """Root application for NILM database"""
  121. def __init__(self, db):
  122. super(Root, self).__init__(db)
  123. # /
  124. @cherrypy.expose
  125. def index(self):
  126. raise cherrypy.NotFound()
  127. # /favicon.ico
  128. @cherrypy.expose
  129. def favicon_ico(self):
  130. raise cherrypy.NotFound()
  131. # /version
  132. @cherrypy.expose
  133. @cherrypy.tools.json_out()
  134. def version(self):
  135. return nilmdb.__version__
  136. # /dbinfo
  137. @cherrypy.expose
  138. @cherrypy.tools.json_out()
  139. def dbinfo(self):
  140. """Return a dictionary with the database path,
  141. size of the database in bytes, and free disk space in bytes"""
  142. path = self.db.get_basepath()
  143. return { "path": path,
  144. "size": nilmdb.utils.du(path),
  145. "free": psutil.disk_usage(path).free }
  146. class Stream(NilmApp):
  147. """Stream-specific operations"""
  148. # /stream/list
  149. # /stream/list?layout=float32_8
  150. # /stream/list?path=/newton/prep&extended=1
  151. @cherrypy.expose
  152. @cherrypy.tools.json_out()
  153. def list(self, path = None, layout = None, extended = None):
  154. """List all streams in the database. With optional path or
  155. layout parameter, just list streams that match the given path
  156. or layout.
  157. If extent is not given, returns a list of lists containing
  158. the path and layout: [ path, layout ]
  159. If extended is provided, returns a list of lists containing
  160. extended info: [ path, layout, extent_min, extent_max,
  161. total_rows, total_seconds ]. More data may be added.
  162. """
  163. return self.db.stream_list(path, layout, bool(extended))
  164. # /stream/create?path=/newton/prep&layout=float32_8
  165. @cherrypy.expose
  166. @cherrypy.tools.json_in()
  167. @cherrypy.tools.json_out()
  168. @exception_to_httperror(NilmDBError, ValueError)
  169. @cherrypy.tools.CORS_allow(methods = ["POST"])
  170. def create(self, path, layout):
  171. """Create a new stream in the database. Provide path
  172. and one of the nilmdb.layout.layouts keys.
  173. """
  174. return self.db.stream_create(path, layout)
  175. # /stream/destroy?path=/newton/prep
  176. @cherrypy.expose
  177. @cherrypy.tools.json_in()
  178. @cherrypy.tools.json_out()
  179. @exception_to_httperror(NilmDBError)
  180. @cherrypy.tools.CORS_allow(methods = ["POST"])
  181. def destroy(self, path):
  182. """Delete a stream. Fails if any data is still present."""
  183. return self.db.stream_destroy(path)
  184. # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
  185. @cherrypy.expose
  186. @cherrypy.tools.json_in()
  187. @cherrypy.tools.json_out()
  188. @exception_to_httperror(NilmDBError, ValueError)
  189. @cherrypy.tools.CORS_allow(methods = ["POST"])
  190. def rename(self, oldpath, newpath):
  191. """Rename a stream."""
  192. return self.db.stream_rename(oldpath, newpath)
  193. # /stream/get_metadata?path=/newton/prep
  194. # /stream/get_metadata?path=/newton/prep&key=foo&key=bar
  195. @cherrypy.expose
  196. @cherrypy.tools.json_out()
  197. def get_metadata(self, path, key=None):
  198. """Get metadata for the named stream. If optional
  199. key parameters are specified, only return metadata
  200. matching the given keys."""
  201. try:
  202. data = self.db.stream_get_metadata(path)
  203. except nilmdb.server.nilmdb.StreamError as e:
  204. raise cherrypy.HTTPError("404 Not Found", e.message)
  205. if key is None: # If no keys specified, return them all
  206. key = data.keys()
  207. elif not isinstance(key, list):
  208. key = [ key ]
  209. result = {}
  210. for k in key:
  211. if k in data:
  212. result[k] = data[k]
  213. else: # Return "None" for keys with no matching value
  214. result[k] = None
  215. return result
  216. # Helper for set_metadata and get_metadata
  217. def _metadata_helper(self, function, path, data):
  218. if not isinstance(data, dict):
  219. try:
  220. data = dict(json.loads(data))
  221. except TypeError as e:
  222. raise NilmDBError("can't parse 'data' parameter: " + e.message)
  223. for key in data:
  224. if not (isinstance(data[key], basestring) or
  225. isinstance(data[key], float) or
  226. isinstance(data[key], int)):
  227. raise NilmDBError("metadata values must be a string or number")
  228. function(path, data)
  229. # /stream/set_metadata?path=/newton/prep&data=<json>
  230. @cherrypy.expose
  231. @cherrypy.tools.json_in()
  232. @cherrypy.tools.json_out()
  233. @exception_to_httperror(NilmDBError, LookupError)
  234. @cherrypy.tools.CORS_allow(methods = ["POST"])
  235. def set_metadata(self, path, data):
  236. """Set metadata for the named stream, replacing any existing
  237. metadata. Data can be json-encoded or a plain dictionary."""
  238. self._metadata_helper(self.db.stream_set_metadata, path, data)
  239. # /stream/update_metadata?path=/newton/prep&data=<json>
  240. @cherrypy.expose
  241. @cherrypy.tools.json_in()
  242. @cherrypy.tools.json_out()
  243. @exception_to_httperror(NilmDBError, LookupError, ValueError)
  244. @cherrypy.tools.CORS_allow(methods = ["POST"])
  245. def update_metadata(self, path, data):
  246. """Set metadata for the named stream, replacing any existing
  247. metadata. Data can be json-encoded or a plain dictionary."""
  248. self._metadata_helper(self.db.stream_update_metadata, path, data)
  249. # /stream/insert?path=/newton/prep
  250. @cherrypy.expose
  251. @cherrypy.tools.json_out()
  252. @exception_to_httperror(NilmDBError, ValueError)
  253. @cherrypy.tools.CORS_allow(methods = ["PUT"])
  254. def insert(self, path, start, end):
  255. """
  256. Insert new data into the database. Provide textual data
  257. (matching the path's layout) as a HTTP PUT.
  258. """
  259. # Important that we always read the input before throwing any
  260. # errors, to keep lengths happy for persistent connections.
  261. # Note that CherryPy 3.2.2 has a bug where this fails for GET
  262. # requests, if we ever want to handle those (issue #1134)
  263. body = cherrypy.request.body.read()
  264. # Check path and get layout
  265. streams = self.db.stream_list(path = path)
  266. if len(streams) != 1:
  267. raise cherrypy.HTTPError("404 Not Found", "No such stream")
  268. # Check limits
  269. start = string_to_timestamp(start)
  270. end = string_to_timestamp(end)
  271. if start >= end:
  272. raise cherrypy.HTTPError("400 Bad Request",
  273. "start must precede end")
  274. # Pass the data directly to nilmdb, which will parse it and
  275. # raise a ValueError if there are any problems.
  276. self.db.stream_insert(path, start, end, body)
  277. # Done
  278. return
  279. # /stream/remove?path=/newton/prep
  280. # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
  281. @cherrypy.expose
  282. @cherrypy.tools.json_in()
  283. @cherrypy.tools.json_out()
  284. @exception_to_httperror(NilmDBError)
  285. @cherrypy.tools.CORS_allow(methods = ["POST"])
  286. def remove(self, path, start = None, end = None):
  287. """
  288. Remove data from the backend database. Removes all data in
  289. the interval [start, end). Returns the number of data points
  290. removed.
  291. """
  292. if start is not None:
  293. start = string_to_timestamp(start)
  294. if end is not None:
  295. end = string_to_timestamp(end)
  296. if start is not None and end is not None:
  297. if start >= end:
  298. raise cherrypy.HTTPError("400 Bad Request",
  299. "start must precede end")
  300. total_removed = 0
  301. while True:
  302. (removed, restart) = self.db.stream_remove(path, start, end)
  303. total_removed += removed
  304. if restart is None:
  305. break
  306. start = restart
  307. return total_removed
  308. # /stream/intervals?path=/newton/prep
  309. # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
  310. # /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
  311. @cherrypy.expose
  312. @chunked_response
  313. @response_type("application/x-json-stream")
  314. def intervals(self, path, start = None, end = None, diffpath = None):
  315. """
  316. Get intervals from backend database. Streams the resulting
  317. intervals as JSON strings separated by CR LF pairs. This may
  318. make multiple requests to the nilmdb backend to avoid causing
  319. it to block for too long.
  320. Returns intervals between 'start' and 'end' belonging to
  321. 'path'. If 'diff' is provided, the set-difference between
  322. intervals in 'path' and intervals in 'diffpath' are
  323. returned instead.
  324. Note that the response type is the non-standard
  325. 'application/x-json-stream' for lack of a better option.
  326. """
  327. if start is not None:
  328. start = string_to_timestamp(start)
  329. if end is not None:
  330. end = string_to_timestamp(end)
  331. if start is not None and end is not None:
  332. if start >= end:
  333. raise cherrypy.HTTPError("400 Bad Request",
  334. "start must precede end")
  335. if len(self.db.stream_list(path = path)) != 1:
  336. raise cherrypy.HTTPError("404", "No such stream: " + path)
  337. if diffpath and len(self.db.stream_list(path = diffpath)) != 1:
  338. raise cherrypy.HTTPError("404", "No such stream: " + diffpath)
  339. @workaround_cp_bug_1200
  340. def content(start, end):
  341. # Note: disable chunked responses to see tracebacks from here.
  342. while True:
  343. (ints, restart) = self.db.stream_intervals(path, start, end,
  344. diffpath)
  345. response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
  346. yield response
  347. if restart is None:
  348. break
  349. start = restart
  350. return content(start, end)
  351. # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
  352. @cherrypy.expose
  353. @chunked_response
  354. @response_type("text/plain")
  355. def extract(self, path, start = None, end = None,
  356. count = False, markup = False):
  357. """
  358. Extract data from backend database. Streams the resulting
  359. entries as ASCII text lines separated by newlines. This may
  360. make multiple requests to the nilmdb backend to avoid causing
  361. it to block for too long.
  362. If 'count' is True, returns a count rather than actual data.
  363. If 'markup' is True, adds comments to the stream denoting each
  364. interval's start and end timestamp.
  365. """
  366. if start is not None:
  367. start = string_to_timestamp(start)
  368. if end is not None:
  369. end = string_to_timestamp(end)
  370. # Check parameters
  371. if start is not None and end is not None:
  372. if start >= end:
  373. raise cherrypy.HTTPError("400 Bad Request",
  374. "start must precede end")
  375. # Check path and get layout
  376. streams = self.db.stream_list(path = path)
  377. if len(streams) != 1:
  378. raise cherrypy.HTTPError("404 Not Found", "No such stream")
  379. @workaround_cp_bug_1200
  380. def content(start, end):
  381. # Note: disable chunked responses to see tracebacks from here.
  382. if count:
  383. matched = self.db.stream_extract(path, start, end,
  384. count = True)
  385. yield sprintf("%d\n", matched)
  386. return
  387. while True:
  388. (data, restart) = self.db.stream_extract(
  389. path, start, end, count = False, markup = markup)
  390. yield data
  391. if restart is None:
  392. return
  393. start = restart
  394. return content(start, end)
  395. class Exiter(object):
  396. """App that exits the server, for testing"""
  397. @cherrypy.expose
  398. def index(self):
  399. cherrypy.response.headers['Content-Type'] = 'text/plain'
  400. def content():
  401. yield 'Exiting by request'
  402. raise SystemExit
  403. return content()
  404. index._cp_config = { 'response.stream': True }
  405. class Server(object):
  406. def __init__(self, db, host = '127.0.0.1', port = 8080,
  407. stoppable = False, # whether /exit URL exists
  408. embedded = True, # hide diagnostics and output, etc
  409. fast_shutdown = False, # don't wait for clients to disconn.
  410. force_traceback = False # include traceback in all errors
  411. ):
  412. # Save server version, just for verification during tests
  413. self.version = nilmdb.__version__
  414. self.embedded = embedded
  415. self.db = db
  416. if not getattr(db, "_thread_safe", None):
  417. raise KeyError("Database object " + str(db) + " doesn't claim "
  418. "to be thread safe. You should pass "
  419. "nilmdb.utils.serializer_proxy(NilmDB)(args) "
  420. "rather than NilmDB(args).")
  421. # Build up global server configuration
  422. cherrypy.config.update({
  423. 'server.socket_host': host,
  424. 'server.socket_port': port,
  425. 'engine.autoreload_on': False,
  426. 'server.max_request_body_size': 8*1024*1024,
  427. })
  428. if self.embedded:
  429. cherrypy.config.update({ 'environment': 'embedded' })
  430. # Build up application specific configuration
  431. app_config = {}
  432. app_config.update({
  433. 'error_page.default': self.json_error_page,
  434. })
  435. # Some default headers to just help identify that things are working
  436. app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
  437. # Set up Cross-Origin Resource Sharing (CORS) handler so we
  438. # can correctly respond to browsers' CORS preflight requests.
  439. # This also limits verbs to GET and HEAD by default.
  440. app_config.update({ 'tools.CORS_allow.on': True,
  441. 'tools.CORS_allow.methods': ['GET', 'HEAD'] })
  442. # Configure the 'json_in' tool to also allow other content-types
  443. # (like x-www-form-urlencoded), and to treat JSON as a dict that
  444. # fills requests.param.
  445. app_config.update({ 'tools.json_in.force': False,
  446. 'tools.json_in.processor': json_to_request_params })
  447. # Send tracebacks in error responses. They're hidden by the
  448. # error_page function for client errors (code 400-499).
  449. app_config.update({ 'request.show_tracebacks' : True })
  450. self.force_traceback = force_traceback
  451. # Patch CherryPy error handler to never pad out error messages.
  452. # This isn't necessary, but then again, neither is padding the
  453. # error messages.
  454. cherrypy._cperror._ie_friendly_error_sizes = {}
  455. # Build up the application and mount it
  456. root = Root(self.db)
  457. root.stream = Stream(self.db)
  458. if stoppable:
  459. root.exit = Exiter()
  460. cherrypy.tree.apps = {}
  461. cherrypy.tree.mount(root, "/", config = { "/" : app_config })
  462. # Shutdowns normally wait for clients to disconnect. To speed
  463. # up tests, set fast_shutdown = True
  464. if fast_shutdown:
  465. # Setting timeout to 0 triggers os._exit(70) at shutdown, grr...
  466. cherrypy.server.shutdown_timeout = 0.01
  467. else:
  468. cherrypy.server.shutdown_timeout = 5
  469. def get_application(self):
  470. """Return a WSGI application object"""
  471. def app(environ, start_response):
  472. if environ['wsgi.multiprocess']:
  473. raise Exception("can't function in a multi-process environment")
  474. return cherrypy.tree(environ, start_response)
  475. return app
  476. def json_error_page(self, status, message, traceback, version):
  477. """Return a custom error page in JSON so the client can parse it"""
  478. errordata = { "status" : status,
  479. "message" : message,
  480. "traceback" : traceback }
  481. # Don't send a traceback if the error was 400-499 (client's fault)
  482. try:
  483. code = int(status.split()[0])
  484. if not self.force_traceback:
  485. if code >= 400 and code <= 499:
  486. errordata["traceback"] = ""
  487. except Exception: # pragma: no cover
  488. pass
  489. # Override the response type, which was previously set to text/html
  490. cherrypy.serving.response.headers['Content-Type'] = (
  491. "application/json;charset=utf-8" )
  492. # Undo the HTML escaping that cherrypy's get_error_page function applies
  493. # (cherrypy issue 1135)
  494. for k, v in errordata.iteritems():
  495. v = v.replace("&lt;","<")
  496. v = v.replace("&gt;",">")
  497. v = v.replace("&amp;","&")
  498. errordata[k] = v
  499. return json.dumps(errordata, separators=(',',':'))
  500. def start(self, blocking = False, event = None):
  501. if not self.embedded: # pragma: no cover
  502. # Handle signals nicely
  503. if hasattr(cherrypy.engine, "signal_handler"):
  504. cherrypy.engine.signal_handler.subscribe()
  505. if hasattr(cherrypy.engine, "console_control_handler"):
  506. cherrypy.engine.console_control_handler.subscribe()
  507. # Cherrypy stupidly calls os._exit(70) when it can't bind the
  508. # port. At least try to print a reasonable error and continue
  509. # in this case, rather than just dying silently (as we would
  510. # otherwise do in embedded mode)
  511. real_exit = os._exit
  512. def fake_exit(code): # pragma: no cover
  513. if code == os.EX_SOFTWARE:
  514. fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
  515. else:
  516. real_exit(code)
  517. os._exit = fake_exit
  518. cherrypy.engine.start()
  519. os._exit = real_exit
  520. # Signal that the engine has started successfully
  521. if event is not None:
  522. event.set()
  523. if blocking:
  524. try:
  525. cherrypy.engine.wait(cherrypy.engine.states.EXITING,
  526. interval = 0.1, channel = 'main')
  527. except (KeyboardInterrupt, IOError): # pragma: no cover
  528. cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
  529. cherrypy.engine.exit()
  530. except SystemExit: # pragma: no cover
  531. cherrypy.engine.log('SystemExit raised: shutting down bus')
  532. cherrypy.engine.exit()
  533. raise
  534. def stop(self):
  535. cherrypy.engine.exit()
  536. def wsgi_application(dbpath):
  537. """Return a WSGI application object with a database at the
  538. specified path."""
  539. db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
  540. server = nilmdb.server.Server(db, embedded = True)
  541. return server.get_application()