You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

551 lines
21 KiB

  1. """CherryPy-based server for accessing NILM database via HTTP"""
  2. # Need absolute_import so that "import nilmdb" won't pull in
  3. # nilmdb.py, but will pull the nilmdb module instead.
  4. import nilmdb.server
  5. from nilmdb.utils.printf import *
  6. from nilmdb.server.errors import NilmDBError
  7. from nilmdb.utils.time import string_to_timestamp
  8. import cherrypy
  9. import sys
  10. import os
  11. import socket
  12. import simplejson as json
  13. import decorator
  14. import psutil
  15. import traceback
  16. from nilmdb.server.serverutil import (
  17. chunked_response,
  18. response_type,
  19. exception_to_httperror,
  20. CORS_allow,
  21. json_to_request_params,
  22. json_error_page,
  23. cherrypy_start,
  24. cherrypy_stop,
  25. bool_param,
  26. )
# Add CORS_allow tool: register the imported CORS_allow function as a
# CherryPy tool on the 'on_start_resource' hook point.  Handlers below use
# it as the @cherrypy.tools.CORS_allow(...) decorator, and the server
# configuration refers to it through the 'tools.CORS_allow.*' keys.
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
class NilmApp(object):
    """Common base for the CherryPy application classes in this module.

    It only stores the database object so that request handlers can
    reach it as self.db.
    """

    def __init__(self, db):
        # Database handle used by the handler methods of subclasses.
        self.db = db
  32. # CherryPy apps
  33. class Root(NilmApp):
  34. """Root application for NILM database"""
  35. def __init__(self, db):
  36. super(Root, self).__init__(db)
  37. # /
  38. @cherrypy.expose
  39. def index(self):
  40. cherrypy.response.headers['Content-Type'] = 'text/plain'
  41. msg = sprintf("This is NilmDB version %s, running on host %s.\n",
  42. nilmdb.__version__, socket.getfqdn())
  43. return msg
  44. # /favicon.ico
  45. @cherrypy.expose
  46. def favicon_ico(self):
  47. raise cherrypy.NotFound()
  48. # /version
  49. @cherrypy.expose
  50. @cherrypy.tools.json_out()
  51. def version(self):
  52. return nilmdb.__version__
  53. # /dbinfo
  54. @cherrypy.expose
  55. @cherrypy.tools.json_out()
  56. def dbinfo(self):
  57. """Return a dictionary with the database path,
  58. size of the database in bytes, and free disk space in bytes"""
  59. path = self.db.get_basepath()
  60. usage = psutil.disk_usage(path)
  61. dbsize = nilmdb.utils.du(path)
  62. return { "path": path,
  63. "size": dbsize,
  64. "other": max(usage.used - dbsize, 0),
  65. "reserved": max(usage.total - usage.used - usage.free, 0),
  66. "free": usage.free }
  67. class Stream(NilmApp):
  68. """Stream-specific operations"""
  69. # Helpers
  70. def _get_times(self, start_param, end_param):
  71. (start, end) = (None, None)
  72. try:
  73. if start_param is not None:
  74. start = string_to_timestamp(start_param)
  75. except Exception:
  76. raise cherrypy.HTTPError("400 Bad Request", sprintf(
  77. "invalid start (%s): must be a numeric timestamp", start_param))
  78. try:
  79. if end_param is not None:
  80. end = string_to_timestamp(end_param)
  81. except Exception:
  82. raise cherrypy.HTTPError("400 Bad Request", sprintf(
  83. "invalid end (%s): must be a numeric timestamp", end_param))
  84. if start is not None and end is not None:
  85. if start >= end:
  86. raise cherrypy.HTTPError(
  87. "400 Bad Request",
  88. sprintf("start must precede end (%s >= %s)",
  89. start_param, end_param))
  90. return (start, end)
  91. # /stream/list
  92. # /stream/list?layout=float32_8
  93. # /stream/list?path=/newton/prep&extended=1
  94. @cherrypy.expose
  95. @cherrypy.tools.json_out()
  96. def list(self, path = None, layout = None, extended = None):
  97. """List all streams in the database. With optional path or
  98. layout parameter, just list streams that match the given path
  99. or layout.
  100. If extended is missing or zero, returns a list of lists
  101. containing the path and layout: [ path, layout ]
  102. If extended is true, returns a list of lists containing
  103. extended info: [ path, layout, extent_min, extent_max,
  104. total_rows, total_seconds ]. More data may be added.
  105. """
  106. return self.db.stream_list(path, layout, bool(extended))
  107. # /stream/create?path=/newton/prep&layout=float32_8
  108. @cherrypy.expose
  109. @cherrypy.tools.json_in()
  110. @cherrypy.tools.json_out()
  111. @exception_to_httperror(NilmDBError, ValueError)
  112. @cherrypy.tools.CORS_allow(methods = ["POST"])
  113. def create(self, path, layout):
  114. """Create a new stream in the database. Provide path
  115. and one of the nilmdb.layout.layouts keys.
  116. """
  117. return self.db.stream_create(path, layout)
  118. # /stream/destroy?path=/newton/prep
  119. @cherrypy.expose
  120. @cherrypy.tools.json_in()
  121. @cherrypy.tools.json_out()
  122. @exception_to_httperror(NilmDBError)
  123. @cherrypy.tools.CORS_allow(methods = ["POST"])
  124. def destroy(self, path):
  125. """Delete a stream. Fails if any data is still present."""
  126. return self.db.stream_destroy(path)
  127. # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
  128. @cherrypy.expose
  129. @cherrypy.tools.json_in()
  130. @cherrypy.tools.json_out()
  131. @exception_to_httperror(NilmDBError, ValueError)
  132. @cherrypy.tools.CORS_allow(methods = ["POST"])
  133. def rename(self, oldpath, newpath):
  134. """Rename a stream."""
  135. return self.db.stream_rename(oldpath, newpath)
  136. # /stream/get_metadata?path=/newton/prep
  137. # /stream/get_metadata?path=/newton/prep&key=foo&key=bar
  138. @cherrypy.expose
  139. @cherrypy.tools.json_out()
  140. def get_metadata(self, path, key=None):
  141. """Get metadata for the named stream. If optional
  142. key parameters are specified, only return metadata
  143. matching the given keys."""
  144. try:
  145. data = self.db.stream_get_metadata(path)
  146. except nilmdb.server.nilmdb.StreamError as e:
  147. raise cherrypy.HTTPError("404 Not Found", str(e))
  148. if key is None: # If no keys specified, return them all
  149. key = list(data.keys())
  150. elif not isinstance(key, list):
  151. key = [ key ]
  152. result = {}
  153. for k in key:
  154. if k in data:
  155. result[k] = data[k]
  156. else: # Return "None" for keys with no matching value
  157. result[k] = None
  158. return result
  159. # Helper for set_metadata and get_metadata
  160. def _metadata_helper(self, function, path, data):
  161. if not isinstance(data, dict):
  162. try:
  163. data = dict(json.loads(data))
  164. except TypeError as e:
  165. raise NilmDBError("can't parse 'data' parameter: " + str(e))
  166. for key in data:
  167. if not (isinstance(data[key], str) or
  168. isinstance(data[key], float) or
  169. isinstance(data[key], int)):
  170. raise NilmDBError("metadata values must be a string or number")
  171. function(path, data)
  172. # /stream/set_metadata?path=/newton/prep&data=<json>
  173. @cherrypy.expose
  174. @cherrypy.tools.json_in()
  175. @cherrypy.tools.json_out()
  176. @exception_to_httperror(NilmDBError, LookupError)
  177. @cherrypy.tools.CORS_allow(methods = ["POST"])
  178. def set_metadata(self, path, data):
  179. """Set metadata for the named stream, replacing any existing
  180. metadata. Data can be json-encoded or a plain dictionary."""
  181. self._metadata_helper(self.db.stream_set_metadata, path, data)
  182. # /stream/update_metadata?path=/newton/prep&data=<json>
  183. @cherrypy.expose
  184. @cherrypy.tools.json_in()
  185. @cherrypy.tools.json_out()
  186. @exception_to_httperror(NilmDBError, LookupError, ValueError)
  187. @cherrypy.tools.CORS_allow(methods = ["POST"])
  188. def update_metadata(self, path, data):
  189. """Set metadata for the named stream, replacing any existing
  190. metadata. Data can be json-encoded or a plain dictionary."""
  191. self._metadata_helper(self.db.stream_update_metadata, path, data)
  192. # /stream/insert?path=/newton/prep
  193. @cherrypy.expose
  194. @cherrypy.tools.json_out()
  195. @exception_to_httperror(NilmDBError, ValueError)
  196. @cherrypy.tools.CORS_allow(methods = ["PUT"])
  197. def insert(self, path, start, end, binary = False):
  198. """
  199. Insert new data into the database. Provide textual data
  200. (matching the path's layout) as a HTTP PUT.
  201. If 'binary' is True, expect raw binary data, rather than lines
  202. of ASCII-formatted data. Raw binary data is always
  203. little-endian and matches the database types (including an
  204. int64 timestamp).
  205. """
  206. binary = bool_param(binary)
  207. # Important that we always read the input before throwing any
  208. # errors, to keep lengths happy for persistent connections.
  209. # Note that CherryPy 3.2.2 has a bug where this fails for GET
  210. # requests, if we ever want to handle those (issue #1134)
  211. body = cherrypy.request.body.read()
  212. # Verify content type for binary data
  213. content_type = cherrypy.request.headers.get('content-type')
  214. if binary and content_type:
  215. if content_type != "application/octet-stream":
  216. raise cherrypy.HTTPError("400", "Content type must be "
  217. "application/octet-stream for "
  218. "binary data, not " + content_type)
  219. # Check path and get layout
  220. if len(self.db.stream_list(path = path)) != 1:
  221. raise cherrypy.HTTPError("404", "No such stream: " + path)
  222. # Check limits
  223. (start, end) = self._get_times(start, end)
  224. # Pass the data directly to nilmdb, which will parse it and
  225. # raise a ValueError if there are any problems.
  226. self.db.stream_insert(path, start, end, body, binary)
  227. # Done
  228. return
  229. # /stream/remove?path=/newton/prep
  230. # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
  231. @cherrypy.expose
  232. @cherrypy.tools.json_in()
  233. @cherrypy.tools.CORS_allow(methods = ["POST"])
  234. @chunked_response
  235. @response_type("application/x-json-stream")
  236. def remove(self, path, start = None, end = None):
  237. """
  238. Remove data from the backend database. Removes all data in
  239. the interval [start, end).
  240. Returns the number of data points removed. Since this is a potentially
  241. long-running operation, multiple numbers may be returned as the
  242. data gets removed from the backend database. The total number of
  243. points removed is the sum of all of these numbers.
  244. """
  245. (start, end) = self._get_times(start, end)
  246. if len(self.db.stream_list(path = path)) != 1:
  247. raise cherrypy.HTTPError("404", "No such stream: " + path)
  248. def content(start, end):
  249. # Note: disable chunked responses to see tracebacks from here.
  250. while True:
  251. (removed, restart) = self.db.stream_remove(path, start, end)
  252. yield json.dumps(removed) + "\r\n"
  253. if restart is None:
  254. break
  255. start = restart
  256. return content(start, end)
  257. # /stream/intervals?path=/newton/prep
  258. # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
  259. # /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
  260. @cherrypy.expose
  261. @chunked_response
  262. @response_type("application/x-json-stream")
  263. def intervals(self, path, start = None, end = None, diffpath = None):
  264. """
  265. Get intervals from backend database. Streams the resulting
  266. intervals as JSON strings separated by CR LF pairs. This may
  267. make multiple requests to the nilmdb backend to avoid causing
  268. it to block for too long.
  269. Returns intervals between 'start' and 'end' belonging to
  270. 'path'. If 'diff' is provided, the set-difference between
  271. intervals in 'path' and intervals in 'diffpath' are
  272. returned instead.
  273. Note that the response type is the non-standard
  274. 'application/x-json-stream' for lack of a better option.
  275. """
  276. (start, end) = self._get_times(start, end)
  277. if len(self.db.stream_list(path = path)) != 1:
  278. raise cherrypy.HTTPError("404", "No such stream: " + path)
  279. if diffpath and len(self.db.stream_list(path = diffpath)) != 1:
  280. raise cherrypy.HTTPError("404", "No such stream: " + diffpath)
  281. def content(start, end):
  282. # Note: disable chunked responses to see tracebacks from here.
  283. while True:
  284. (ints, restart) = self.db.stream_intervals(path, start, end,
  285. diffpath)
  286. response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
  287. yield response
  288. if restart is None:
  289. break
  290. start = restart
  291. return content(start, end)
  292. # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
  293. @cherrypy.expose
  294. @chunked_response
  295. def extract(self, path, start = None, end = None,
  296. count = False, markup = False, binary = False):
  297. """
  298. Extract data from backend database. Streams the resulting
  299. entries as ASCII text lines separated by newlines. This may
  300. make multiple requests to the nilmdb backend to avoid causing
  301. it to block for too long.
  302. If 'count' is True, returns a count rather than actual data.
  303. If 'markup' is True, adds comments to the stream denoting each
  304. interval's start and end timestamp.
  305. If 'binary' is True, return raw binary data, rather than lines
  306. of ASCII-formatted data. Raw binary data is always
  307. little-endian and matches the database types (including an
  308. int64 timestamp).
  309. """
  310. binary = bool_param(binary)
  311. markup = bool_param(markup)
  312. count = bool_param(count)
  313. (start, end) = self._get_times(start, end)
  314. # Check path and get layout
  315. if len(self.db.stream_list(path = path)) != 1:
  316. raise cherrypy.HTTPError("404", "No such stream: " + path)
  317. if binary:
  318. content_type = "application/octet-stream"
  319. if markup or count:
  320. raise cherrypy.HTTPError("400", "can't mix binary and "
  321. "markup or count modes")
  322. else:
  323. content_type = "text/plain"
  324. cherrypy.response.headers['Content-Type'] = content_type
  325. def content(start, end):
  326. # Note: disable chunked responses to see tracebacks from here.
  327. if count:
  328. matched = self.db.stream_extract(path, start, end,
  329. count = True)
  330. yield sprintf("%d\n", matched)
  331. return
  332. while True:
  333. (data, restart) = self.db.stream_extract(
  334. path, start, end, count = False,
  335. markup = markup, binary = binary)
  336. yield data
  337. if restart is None:
  338. return
  339. start = restart
  340. return content(start, end)
  341. class Exiter(object):
  342. """App that exits the server, for testing"""
  343. @cherrypy.expose
  344. def index(self):
  345. cherrypy.response.headers['Content-Type'] = 'text/plain'
  346. def content():
  347. yield 'Exiting by request'
  348. raise SystemExit
  349. return content()
  350. index._cp_config = { 'response.stream': True }
  351. class Server(object):
  352. def __init__(self, db, host = '127.0.0.1', port = 8080,
  353. stoppable = False, # whether /exit URL exists
  354. embedded = True, # hide diagnostics and output, etc
  355. fast_shutdown = False, # don't wait for clients to disconn.
  356. force_traceback = False, # include traceback in all errors
  357. basepath = '', # base URL path for cherrypy.tree
  358. ):
  359. # Save server version, just for verification during tests
  360. self.version = nilmdb.__version__
  361. self.embedded = embedded
  362. self.db = db
  363. if not getattr(db, "_thread_safe", None):
  364. raise KeyError("Database object " + str(db) + " doesn't claim "
  365. "to be thread safe. You should pass "
  366. "nilmdb.utils.serializer_proxy(NilmDB)(args) "
  367. "rather than NilmDB(args).")
  368. # Build up global server configuration
  369. cherrypy.config.update({
  370. 'server.socket_host': host,
  371. 'server.socket_port': port,
  372. 'engine.autoreload.on': False,
  373. 'server.max_request_body_size': 8*1024*1024,
  374. })
  375. if self.embedded:
  376. cherrypy.config.update({ 'environment': 'embedded' })
  377. # Build up application specific configuration
  378. app_config = {}
  379. app_config.update({
  380. 'error_page.default': self.json_error_page,
  381. })
  382. # Some default headers to just help identify that things are working
  383. app_config.update({ 'response.headers.X-Jim-Is-Awesome': 'yeah' })
  384. # Set up Cross-Origin Resource Sharing (CORS) handler so we
  385. # can correctly respond to browsers' CORS preflight requests.
  386. # This also limits verbs to GET and HEAD by default.
  387. app_config.update({ 'tools.CORS_allow.on': True,
  388. 'tools.CORS_allow.methods': ['GET', 'HEAD'] })
  389. # Configure the 'json_in' tool to also allow other content-types
  390. # (like x-www-form-urlencoded), and to treat JSON as a dict that
  391. # fills requests.param.
  392. app_config.update({ 'tools.json_in.force': False,
  393. 'tools.json_in.processor': json_to_request_params })
  394. # Convert Unicode strings to raw bytes on output
  395. app_config.update({ 'tools.encode.text_only': True,
  396. 'tools.encode.on': True,
  397. 'tools.encode.encoding': 'utf-8' })
  398. # Send tracebacks in error responses. They're hidden by the
  399. # error_page function for client errors (code 400-499).
  400. app_config.update({ 'request.show_tracebacks' : True })
  401. self.force_traceback = force_traceback
  402. # Patch CherryPy error handler to never pad out error messages.
  403. # This isn't necessary, but then again, neither is padding the
  404. # error messages.
  405. cherrypy._cperror._ie_friendly_error_sizes = {}
  406. # Build up the application and mount it
  407. root = Root(self.db)
  408. root.stream = Stream(self.db)
  409. if stoppable:
  410. root.exit = Exiter()
  411. cherrypy.tree.apps = {}
  412. cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
  413. # Shutdowns normally wait for clients to disconnect. To speed
  414. # up tests, set fast_shutdown = True
  415. if fast_shutdown:
  416. # Setting timeout to 0 triggers os._exit(70) at shutdown, grr...
  417. cherrypy.server.shutdown_timeout = 0.01
  418. else:
  419. cherrypy.server.shutdown_timeout = 5
  420. # Set up the WSGI application pointer for external programs
  421. self.wsgi_application = cherrypy.tree
  422. def json_error_page(self, status, message, traceback, version):
  423. """Return a custom error page in JSON so the client can parse it"""
  424. return json_error_page(status, message, traceback, version,
  425. self.force_traceback)
  426. def start(self, blocking = False, event = None):
  427. cherrypy_start(blocking, event, self.embedded)
  428. def stop(self):
  429. cherrypy_stop()
  430. # Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
  431. # instance since the database can only be opened once. For this to
  432. # work, the web server must use only a single process and single
  433. # Python interpreter. Multiple threads are OK.
  434. _wsgi_server = None
  435. def wsgi_application(dbpath, basepath): # pragma: no cover
  436. """Return a WSGI application object with a database at the
  437. specified path.
  438. 'dbpath' is a filesystem location, e.g. /home/nilm/db
  439. 'basepath' is the URL path of the application base, which
  440. is the same as the first argument to Apache's WSGIScriptAlias
  441. directive.
  442. """
  443. def application(environ, start_response):
  444. global _wsgi_server
  445. if _wsgi_server is None:
  446. # Try to start the server
  447. try:
  448. db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
  449. _wsgi_server = nilmdb.server.Server(
  450. db, embedded = True,
  451. basepath = basepath.rstrip('/'))
  452. except Exception:
  453. # Build an error message on failure
  454. import pprint
  455. err = sprintf("Initializing database at path '%s' failed:\n\n",
  456. dbpath)
  457. err += traceback.format_exc()
  458. try:
  459. import pwd
  460. import grp
  461. err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
  462. "on host %s, pid %d\n",
  463. os.getuid(), pwd.getpwuid(os.getuid())[0],
  464. os.getgid(), grp.getgrgid(os.getgid())[0],
  465. socket.gethostname(), os.getpid())
  466. except ImportError:
  467. pass
  468. err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
  469. if _wsgi_server is None:
  470. # Serve up the error with our own mini WSGI app.
  471. headers = [ ('Content-type', 'text/plain'),
  472. ('Content-length', str(len(err))) ]
  473. start_response("500 Internal Server Error", headers)
  474. return [err]
  475. # Call the normal application
  476. return _wsgi_server.wsgi_application(environ, start_response)
  477. return application