You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

555 lines
21 KiB

  1. """CherryPy-based server for accessing NILM database via HTTP"""
  2. # Need absolute_import so that "import nilmdb" won't pull in
  3. # nilmdb.py, but will pull the nilmdb module instead.
  4. import nilmdb.server
  5. from nilmdb.utils.printf import *
  6. from nilmdb.server.errors import NilmDBError
  7. from nilmdb.utils.time import string_to_timestamp
  8. import cherrypy
  9. import sys
  10. import os
  11. import socket
  12. import simplejson as json
  13. import decorator
  14. import psutil
  15. import traceback
  16. from nilmdb.server.serverutil import (
  17. chunked_response,
  18. response_type,
  19. workaround_cp_bug_1200,
  20. exception_to_httperror,
  21. CORS_allow,
  22. json_to_request_params,
  23. json_error_page,
  24. cherrypy_start,
  25. cherrypy_stop,
  26. bool_param,
  27. )
  28. # Add CORS_allow tool
  29. cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
  30. class NilmApp(object):
  31. def __init__(self, db):
  32. self.db = db
  33. # CherryPy apps
  34. class Root(NilmApp):
  35. """Root application for NILM database"""
  36. def __init__(self, db):
  37. super(Root, self).__init__(db)
  38. # /
  39. @cherrypy.expose
  40. def index(self):
  41. cherrypy.response.headers['Content-Type'] = 'text/plain'
  42. msg = sprintf("This is NilmDB version %s, running on host %s.\n",
  43. nilmdb.__version__, socket.getfqdn())
  44. return msg
  45. # /favicon.ico
  46. @cherrypy.expose
  47. def favicon_ico(self):
  48. raise cherrypy.NotFound()
  49. # /version
  50. @cherrypy.expose
  51. @cherrypy.tools.json_out()
  52. def version(self):
  53. return nilmdb.__version__
  54. # /dbinfo
  55. @cherrypy.expose
  56. @cherrypy.tools.json_out()
  57. def dbinfo(self):
  58. """Return a dictionary with the database path,
  59. size of the database in bytes, and free disk space in bytes"""
  60. path = self.db.get_basepath()
  61. usage = psutil.disk_usage(path)
  62. dbsize = nilmdb.utils.du(path)
  63. return { "path": path,
  64. "size": dbsize,
  65. "other": max(usage.used - dbsize, 0),
  66. "reserved": max(usage.total - usage.used - usage.free, 0),
  67. "free": usage.free }
  68. class Stream(NilmApp):
  69. """Stream-specific operations"""
  70. # Helpers
  71. def _get_times(self, start_param, end_param):
  72. (start, end) = (None, None)
  73. try:
  74. if start_param is not None:
  75. start = string_to_timestamp(start_param)
  76. except Exception:
  77. raise cherrypy.HTTPError("400 Bad Request", sprintf(
  78. "invalid start (%s): must be a numeric timestamp", start_param))
  79. try:
  80. if end_param is not None:
  81. end = string_to_timestamp(end_param)
  82. except Exception:
  83. raise cherrypy.HTTPError("400 Bad Request", sprintf(
  84. "invalid end (%s): must be a numeric timestamp", end_param))
  85. if start is not None and end is not None:
  86. if start >= end:
  87. raise cherrypy.HTTPError(
  88. "400 Bad Request",
  89. sprintf("start must precede end (%s >= %s)",
  90. start_param, end_param))
  91. return (start, end)
  92. # /stream/list
  93. # /stream/list?layout=float32_8
  94. # /stream/list?path=/newton/prep&extended=1
  95. @cherrypy.expose
  96. @cherrypy.tools.json_out()
  97. def list(self, path = None, layout = None, extended = None):
  98. """List all streams in the database. With optional path or
  99. layout parameter, just list streams that match the given path
  100. or layout.
  101. If extended is missing or zero, returns a list of lists
  102. containing the path and layout: [ path, layout ]
  103. If extended is true, returns a list of lists containing
  104. extended info: [ path, layout, extent_min, extent_max,
  105. total_rows, total_seconds ]. More data may be added.
  106. """
  107. return self.db.stream_list(path, layout, bool(extended))
  108. # /stream/create?path=/newton/prep&layout=float32_8
  109. @cherrypy.expose
  110. @cherrypy.tools.json_in()
  111. @cherrypy.tools.json_out()
  112. @exception_to_httperror(NilmDBError, ValueError)
  113. @cherrypy.tools.CORS_allow(methods = ["POST"])
  114. def create(self, path, layout):
  115. """Create a new stream in the database. Provide path
  116. and one of the nilmdb.layout.layouts keys.
  117. """
  118. return self.db.stream_create(path, layout)
  119. # /stream/destroy?path=/newton/prep
  120. @cherrypy.expose
  121. @cherrypy.tools.json_in()
  122. @cherrypy.tools.json_out()
  123. @exception_to_httperror(NilmDBError)
  124. @cherrypy.tools.CORS_allow(methods = ["POST"])
  125. def destroy(self, path):
  126. """Delete a stream. Fails if any data is still present."""
  127. return self.db.stream_destroy(path)
  128. # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
  129. @cherrypy.expose
  130. @cherrypy.tools.json_in()
  131. @cherrypy.tools.json_out()
  132. @exception_to_httperror(NilmDBError, ValueError)
  133. @cherrypy.tools.CORS_allow(methods = ["POST"])
  134. def rename(self, oldpath, newpath):
  135. """Rename a stream."""
  136. return self.db.stream_rename(oldpath, newpath)
  137. # /stream/get_metadata?path=/newton/prep
  138. # /stream/get_metadata?path=/newton/prep&key=foo&key=bar
  139. @cherrypy.expose
  140. @cherrypy.tools.json_out()
  141. def get_metadata(self, path, key=None):
  142. """Get metadata for the named stream. If optional
  143. key parameters are specified, only return metadata
  144. matching the given keys."""
  145. try:
  146. data = self.db.stream_get_metadata(path)
  147. except nilmdb.server.nilmdb.StreamError as e:
  148. raise cherrypy.HTTPError("404 Not Found", e.message)
  149. if key is None: # If no keys specified, return them all
  150. key = list(data.keys())
  151. elif not isinstance(key, list):
  152. key = [ key ]
  153. result = {}
  154. for k in key:
  155. if k in data:
  156. result[k] = data[k]
  157. else: # Return "None" for keys with no matching value
  158. result[k] = None
  159. return result
  160. # Helper for set_metadata and get_metadata
  161. def _metadata_helper(self, function, path, data):
  162. if not isinstance(data, dict):
  163. try:
  164. data = dict(json.loads(data))
  165. except TypeError as e:
  166. raise NilmDBError("can't parse 'data' parameter: " + e.message)
  167. for key in data:
  168. if not (isinstance(data[key], str) or
  169. isinstance(data[key], float) or
  170. isinstance(data[key], int)):
  171. raise NilmDBError("metadata values must be a string or number")
  172. function(path, data)
  173. # /stream/set_metadata?path=/newton/prep&data=<json>
  174. @cherrypy.expose
  175. @cherrypy.tools.json_in()
  176. @cherrypy.tools.json_out()
  177. @exception_to_httperror(NilmDBError, LookupError)
  178. @cherrypy.tools.CORS_allow(methods = ["POST"])
  179. def set_metadata(self, path, data):
  180. """Set metadata for the named stream, replacing any existing
  181. metadata. Data can be json-encoded or a plain dictionary."""
  182. self._metadata_helper(self.db.stream_set_metadata, path, data)
  183. # /stream/update_metadata?path=/newton/prep&data=<json>
  184. @cherrypy.expose
  185. @cherrypy.tools.json_in()
  186. @cherrypy.tools.json_out()
  187. @exception_to_httperror(NilmDBError, LookupError, ValueError)
  188. @cherrypy.tools.CORS_allow(methods = ["POST"])
  189. def update_metadata(self, path, data):
  190. """Set metadata for the named stream, replacing any existing
  191. metadata. Data can be json-encoded or a plain dictionary."""
  192. self._metadata_helper(self.db.stream_update_metadata, path, data)
  193. # /stream/insert?path=/newton/prep
  194. @cherrypy.expose
  195. @cherrypy.tools.json_out()
  196. @exception_to_httperror(NilmDBError, ValueError)
  197. @cherrypy.tools.CORS_allow(methods = ["PUT"])
  198. def insert(self, path, start, end, binary = False):
  199. """
  200. Insert new data into the database. Provide textual data
  201. (matching the path's layout) as a HTTP PUT.
  202. If 'binary' is True, expect raw binary data, rather than lines
  203. of ASCII-formatted data. Raw binary data is always
  204. little-endian and matches the database types (including an
  205. int64 timestamp).
  206. """
  207. binary = bool_param(binary)
  208. # Important that we always read the input before throwing any
  209. # errors, to keep lengths happy for persistent connections.
  210. # Note that CherryPy 3.2.2 has a bug where this fails for GET
  211. # requests, if we ever want to handle those (issue #1134)
  212. body = cherrypy.request.body.read()
  213. # Verify content type for binary data
  214. content_type = cherrypy.request.headers.get('content-type')
  215. if binary and content_type:
  216. if content_type != "application/octet-stream":
  217. raise cherrypy.HTTPError("400", "Content type must be "
  218. "application/octet-stream for "
  219. "binary data, not " + content_type)
  220. # Check path and get layout
  221. if len(self.db.stream_list(path = path)) != 1:
  222. raise cherrypy.HTTPError("404", "No such stream: " + path)
  223. # Check limits
  224. (start, end) = self._get_times(start, end)
  225. # Pass the data directly to nilmdb, which will parse it and
  226. # raise a ValueError if there are any problems.
  227. self.db.stream_insert(path, start, end, body, binary)
  228. # Done
  229. return
  230. # /stream/remove?path=/newton/prep
  231. # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
  232. @cherrypy.expose
  233. @cherrypy.tools.json_in()
  234. @cherrypy.tools.CORS_allow(methods = ["POST"])
  235. @chunked_response
  236. @response_type("application/x-json-stream")
  237. def remove(self, path, start = None, end = None):
  238. """
  239. Remove data from the backend database. Removes all data in
  240. the interval [start, end).
  241. Returns the number of data points removed. Since this is a potentially
  242. long-running operation, multiple numbers may be returned as the
  243. data gets removed from the backend database. The total number of
  244. points removed is the sum of all of these numbers.
  245. """
  246. (start, end) = self._get_times(start, end)
  247. if len(self.db.stream_list(path = path)) != 1:
  248. raise cherrypy.HTTPError("404", "No such stream: " + path)
  249. @workaround_cp_bug_1200
  250. def content(start, end):
  251. # Note: disable chunked responses to see tracebacks from here.
  252. while True:
  253. (removed, restart) = self.db.stream_remove(path, start, end)
  254. yield json.dumps(removed) + "\r\n"
  255. if restart is None:
  256. break
  257. start = restart
  258. return content(start, end)
  259. # /stream/intervals?path=/newton/prep
  260. # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
  261. # /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
  262. @cherrypy.expose
  263. @chunked_response
  264. @response_type("application/x-json-stream")
  265. def intervals(self, path, start = None, end = None, diffpath = None):
  266. """
  267. Get intervals from backend database. Streams the resulting
  268. intervals as JSON strings separated by CR LF pairs. This may
  269. make multiple requests to the nilmdb backend to avoid causing
  270. it to block for too long.
  271. Returns intervals between 'start' and 'end' belonging to
  272. 'path'. If 'diff' is provided, the set-difference between
  273. intervals in 'path' and intervals in 'diffpath' are
  274. returned instead.
  275. Note that the response type is the non-standard
  276. 'application/x-json-stream' for lack of a better option.
  277. """
  278. (start, end) = self._get_times(start, end)
  279. if len(self.db.stream_list(path = path)) != 1:
  280. raise cherrypy.HTTPError("404", "No such stream: " + path)
  281. if diffpath and len(self.db.stream_list(path = diffpath)) != 1:
  282. raise cherrypy.HTTPError("404", "No such stream: " + diffpath)
  283. @workaround_cp_bug_1200
  284. def content(start, end):
  285. # Note: disable chunked responses to see tracebacks from here.
  286. while True:
  287. (ints, restart) = self.db.stream_intervals(path, start, end,
  288. diffpath)
  289. response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
  290. yield response
  291. if restart is None:
  292. break
  293. start = restart
  294. return content(start, end)
  295. # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
  296. @cherrypy.expose
  297. @chunked_response
  298. def extract(self, path, start = None, end = None,
  299. count = False, markup = False, binary = False):
  300. """
  301. Extract data from backend database. Streams the resulting
  302. entries as ASCII text lines separated by newlines. This may
  303. make multiple requests to the nilmdb backend to avoid causing
  304. it to block for too long.
  305. If 'count' is True, returns a count rather than actual data.
  306. If 'markup' is True, adds comments to the stream denoting each
  307. interval's start and end timestamp.
  308. If 'binary' is True, return raw binary data, rather than lines
  309. of ASCII-formatted data. Raw binary data is always
  310. little-endian and matches the database types (including an
  311. int64 timestamp).
  312. """
  313. binary = bool_param(binary)
  314. markup = bool_param(markup)
  315. count = bool_param(count)
  316. (start, end) = self._get_times(start, end)
  317. # Check path and get layout
  318. if len(self.db.stream_list(path = path)) != 1:
  319. raise cherrypy.HTTPError("404", "No such stream: " + path)
  320. if binary:
  321. content_type = "application/octet-stream"
  322. if markup or count:
  323. raise cherrypy.HTTPError("400", "can't mix binary and "
  324. "markup or count modes")
  325. else:
  326. content_type = "text/plain"
  327. cherrypy.response.headers['Content-Type'] = content_type
  328. @workaround_cp_bug_1200
  329. def content(start, end):
  330. # Note: disable chunked responses to see tracebacks from here.
  331. if count:
  332. matched = self.db.stream_extract(path, start, end,
  333. count = True)
  334. yield sprintf("%d\n", matched)
  335. return
  336. while True:
  337. (data, restart) = self.db.stream_extract(
  338. path, start, end, count = False,
  339. markup = markup, binary = binary)
  340. yield data
  341. if restart is None:
  342. return
  343. start = restart
  344. return content(start, end)
  345. class Exiter(object):
  346. """App that exits the server, for testing"""
  347. @cherrypy.expose
  348. def index(self):
  349. cherrypy.response.headers['Content-Type'] = 'text/plain'
  350. def content():
  351. yield 'Exiting by request'
  352. raise SystemExit
  353. return content()
  354. index._cp_config = { 'response.stream': True }
  355. class Server(object):
  356. def __init__(self, db, host = '127.0.0.1', port = 8080,
  357. stoppable = False, # whether /exit URL exists
  358. embedded = True, # hide diagnostics and output, etc
  359. fast_shutdown = False, # don't wait for clients to disconn.
  360. force_traceback = False, # include traceback in all errors
  361. basepath = '', # base URL path for cherrypy.tree
  362. ):
  363. # Save server version, just for verification during tests
  364. self.version = nilmdb.__version__
  365. self.embedded = embedded
  366. self.db = db
  367. if not getattr(db, "_thread_safe", None):
  368. raise KeyError("Database object " + str(db) + " doesn't claim "
  369. "to be thread safe. You should pass "
  370. "nilmdb.utils.serializer_proxy(NilmDB)(args) "
  371. "rather than NilmDB(args).")
  372. # Build up global server configuration
  373. cherrypy.config.update({
  374. 'server.socket_host': host,
  375. 'server.socket_port': port,
  376. 'engine.autoreload.on': False,
  377. 'server.max_request_body_size': 8*1024*1024,
  378. })
  379. if self.embedded:
  380. cherrypy.config.update({ 'environment': 'embedded' })
  381. # Build up application specific configuration
  382. app_config = {}
  383. app_config.update({
  384. 'error_page.default': self.json_error_page,
  385. })
  386. # Some default headers to just help identify that things are working
  387. app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
  388. # Set up Cross-Origin Resource Sharing (CORS) handler so we
  389. # can correctly respond to browsers' CORS preflight requests.
  390. # This also limits verbs to GET and HEAD by default.
  391. app_config.update({ 'tools.CORS_allow.on': True,
  392. 'tools.CORS_allow.methods': ['GET', 'HEAD'] })
  393. # Configure the 'json_in' tool to also allow other content-types
  394. # (like x-www-form-urlencoded), and to treat JSON as a dict that
  395. # fills requests.param.
  396. app_config.update({ 'tools.json_in.force': False,
  397. 'tools.json_in.processor': json_to_request_params })
  398. # Convert Unicode strings to raw bytes on output
  399. app_config.update({ 'tools.encode.text_only': True,
  400. 'tools.encode.on': True,
  401. 'tools.encode.encoding': 'utf-8' })
  402. # Send tracebacks in error responses. They're hidden by the
  403. # error_page function for client errors (code 400-499).
  404. app_config.update({ 'request.show_tracebacks' : True })
  405. self.force_traceback = force_traceback
  406. # Patch CherryPy error handler to never pad out error messages.
  407. # This isn't necessary, but then again, neither is padding the
  408. # error messages.
  409. cherrypy._cperror._ie_friendly_error_sizes = {}
  410. # Build up the application and mount it
  411. root = Root(self.db)
  412. root.stream = Stream(self.db)
  413. if stoppable:
  414. root.exit = Exiter()
  415. cherrypy.tree.apps = {}
  416. cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
  417. # Shutdowns normally wait for clients to disconnect. To speed
  418. # up tests, set fast_shutdown = True
  419. if fast_shutdown:
  420. # Setting timeout to 0 triggers os._exit(70) at shutdown, grr...
  421. cherrypy.server.shutdown_timeout = 0.01
  422. else:
  423. cherrypy.server.shutdown_timeout = 5
  424. # Set up the WSGI application pointer for external programs
  425. self.wsgi_application = cherrypy.tree
  426. def json_error_page(self, status, message, traceback, version):
  427. """Return a custom error page in JSON so the client can parse it"""
  428. return json_error_page(status, message, traceback, version,
  429. self.force_traceback)
  430. def start(self, blocking = False, event = None):
  431. cherrypy_start(blocking, event, self.embedded)
  432. def stop(self):
  433. cherrypy_stop()
  434. # Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
  435. # instance since the database can only be opened once. For this to
  436. # work, the web server must use only a single process and single
  437. # Python interpreter. Multiple threads are OK.
  438. _wsgi_server = None
  439. def wsgi_application(dbpath, basepath): # pragma: no cover
  440. """Return a WSGI application object with a database at the
  441. specified path.
  442. 'dbpath' is a filesystem location, e.g. /home/nilm/db
  443. 'basepath' is the URL path of the application base, which
  444. is the same as the first argument to Apache's WSGIScriptAlias
  445. directive.
  446. """
  447. def application(environ, start_response):
  448. global _wsgi_server
  449. if _wsgi_server is None:
  450. # Try to start the server
  451. try:
  452. db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
  453. _wsgi_server = nilmdb.server.Server(
  454. db, embedded = True,
  455. basepath = basepath.rstrip('/'))
  456. except Exception:
  457. # Build an error message on failure
  458. import pprint
  459. err = sprintf("Initializing database at path '%s' failed:\n\n",
  460. dbpath)
  461. err += traceback.format_exc()
  462. try:
  463. import pwd
  464. import grp
  465. err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
  466. "on host %s, pid %d\n",
  467. os.getuid(), pwd.getpwuid(os.getuid())[0],
  468. os.getgid(), grp.getgrgid(os.getgid())[0],
  469. socket.gethostname(), os.getpid())
  470. except ImportError:
  471. pass
  472. err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
  473. if _wsgi_server is None:
  474. # Serve up the error with our own mini WSGI app.
  475. headers = [ ('Content-type', 'text/plain'),
  476. ('Content-length', str(len(err))) ]
  477. start_response("500 Internal Server Error", headers)
  478. return [err]
  479. # Call the normal application
  480. return _wsgi_server.wsgi_application(environ, start_response)
  481. return application