You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

276 lines
8.9 KiB

  1. """Client utilities for accessing NILM database via HTTP"""
  2. from __future__ import absolute_import
  3. from nilmdb.printf import *
  4. import time
  5. import sys
  6. import re
  7. import os
  8. import simplejson as json
  9. import urlparse
  10. import urllib
  11. import pycurl
  12. import cStringIO
# Version of this client library (exposed to callers as Client.client_version).
version = "1.0"
  14. class Error(Exception):
  15. """Base exception for both ClientError and ServerError responses"""
  16. def __init__(self,
  17. status = "Unspecified error",
  18. message = None,
  19. url = None,
  20. traceback = None):
  21. Exception.__init__(self, status)
  22. self.status = status # e.g. "400 Bad Request"
  23. self.message = message # textual message from the server
  24. self.url = url # URL we were requesting
  25. self.traceback = traceback # server traceback, if available
  26. def __str__(self):
  27. s = sprintf("[%s]", self.status)
  28. if self.message:
  29. s += sprintf(" %s", self.message)
  30. if self.url:
  31. s += sprintf(" (%s)", self.url)
  32. if self.traceback: # pragma: no cover
  33. s += sprintf("\nServer traceback:\n%s", self.traceback)
  34. return s
class ClientError(Error):
    """Raised for HTTP 4xx (client error) responses."""
    pass
class ServerError(Error):
    """Raised for HTTP 5xx responses, and for transport-level failures
    (reported with a synthetic "502 Error" status)."""
    pass
class MyCurl(object):
    """Curl wrapper for HTTP client requests"""
    def __init__(self, baseurl = ""):
        """If baseurl is supplied, all other functions that take
        a URL can be given a relative URL instead."""
        # Verify / clean up URL
        reparsed = urlparse.urlparse(baseurl).geturl()
        if '://' not in reparsed:
            # No scheme supplied; assume plain HTTP.
            reparsed = urlparse.urlparse("http://" + baseurl).geturl()
        self.baseurl = reparsed
        # One pycurl handle is reused for every request made by this object.
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.SSL_VERIFYHOST, 2)   # verify hostname on HTTPS
        self.curl.setopt(pycurl.FOLLOWLOCATION, 1)   # follow redirects...
        self.curl.setopt(pycurl.MAXREDIRS, 5)        # ...but only up to 5 deep
        self._setup_url()
    def _setup_url(self, url = "", params = ""):
        """Resolve 'url' against baseurl, append urlencoded query
        params, and point the curl handle at the result."""
        url = urlparse.urljoin(self.baseurl, url)
        if params:
            # Second argument True is urlencode's doseq flag: sequence
            # values become repeated query keys.
            url = urlparse.urljoin(url, "?" + urllib.urlencode(params, True))
        self.curl.setopt(pycurl.URL, url)
        self.url = url  # remembered for error reporting
    def _check_error(self, body = None):
        """Raise ClientError/ServerError/Error unless the last
        response was HTTP 200."""
        code = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if code == 200:
            return
        # Default variables for exception
        args = { "url" : self.url,
                 "status" : str(code),
                 "message" : None,
                 "traceback" : None }
        try:
            # Fill with server-provided data if we can
            jsonerror = json.loads(body)
            args["status"] = jsonerror["status"]
            args["message"] = jsonerror["message"]
            args["traceback"] = jsonerror["traceback"]
        except Exception: # pragma: no cover
            # Body was not the expected JSON error document; keep defaults.
            pass
        if code >= 400 and code <= 499:
            raise ClientError(**args)
        else: # pragma: no cover
            if code >= 500 and code <= 599:
                raise ServerError(**args)
            else:
                raise Error(**args)
    def _req(self, url, params):
        """GET or POST that returns raw data"""
        self._setup_url(url, params)
        body = cStringIO.StringIO()
        self.curl.setopt(pycurl.WRITEFUNCTION, body.write)
        try:
            self.curl.perform()
        except pycurl.error as e:
            # e[1] is curl's human-readable error string (Python 2 allows
            # indexing exception args directly).
            raise ServerError(status = "502 Error",
                              url = self.url,
                              message = e[1])
        body_str = body.getvalue()
        self._check_error(body_str)
        return body_str
    def close(self):
        # Release the underlying curl handle.
        self.curl.close()
    def getjson(self, url, params = None):
        """Simple GET; returns the decoded JSON response."""
        return json.loads(self.get(url, params))
    def putjson(self, url, postdata, params = None):
        """Simple PUT; returns the decoded JSON response."""
        return json.loads(self.put(url, postdata, params))
    def get(self, url, params = None):
        """Simple GET"""
        self.curl.setopt(pycurl.UPLOAD, 0)
        return self._req(url, params)
    def put(self, url, postdata, params = None):
        """Simple PUT"""
        # NOTE(review): _req() calls _setup_url() again with the same
        # arguments, so this call is redundant (but harmless).
        self._setup_url(url, params)
        data = cStringIO.StringIO(postdata)
        self.curl.setopt(pycurl.UPLOAD, 1)
        self.curl.setopt(pycurl.READFUNCTION, data.read)
        return self._req(url, params)
  117. class Client(object):
  118. """Main client interface to the Nilm database."""
  119. client_version = version
def __init__(self, url):
    """Create a client that talks to the NILM server at the given URL."""
    self.curl = MyCurl(url)
  122. def _json_param(self, data):
  123. """Return compact json-encoded version of parameter"""
  124. return json.dumps(data, separators=(',',':'))
def close(self):
    """Close the underlying curl handle."""
    self.curl.close()
def geturl(self):
    """Return the base URL we're using"""
    return self.curl.baseurl
def version(self):
    """Return server version (decoded JSON from the 'version' endpoint)"""
    return self.curl.getjson("version")
def dbpath(self):
    """Return server database path (decoded JSON from the 'dbpath' endpoint)"""
    return self.curl.getjson("dbpath")
  136. def stream_list(self, path = None, layout = None):
  137. params = {}
  138. if path is not None:
  139. params["path"] = path
  140. if layout is not None:
  141. params["layout"] = layout
  142. return self.curl.getjson("stream/list", params)
  143. def stream_get_metadata(self, path, keys = None):
  144. params = { "path": path }
  145. if keys is not None:
  146. params["key"] = keys
  147. return self.curl.getjson("stream/get_metadata", params)
  148. def stream_set_metadata(self, path, data):
  149. """Set stream metadata from a dictionary, replacing all existing
  150. metadata."""
  151. params = {
  152. "path": path,
  153. "data": self._json_param(data)
  154. }
  155. return self.curl.getjson("stream/set_metadata", params)
  156. def stream_update_metadata(self, path, data):
  157. """Update stream metadata from a dictionary"""
  158. params = {
  159. "path": path,
  160. "data": self._json_param(data)
  161. }
  162. return self.curl.getjson("stream/update_metadata", params)
  163. def stream_create(self, path, layout):
  164. """Create a new stream"""
  165. params = { "path": path,
  166. "layout" : layout }
  167. return self.curl.getjson("stream/create", params)
  168. def stream_insert(self, path, data):
  169. """Insert data into a stream. data should be a file-like object
  170. that provides ASCII data that matches the database layout for path."""
  171. params = { "path": path }
  172. # See design.md for a discussion of how much data to send.
  173. # These are soft limits -- actual data might be rounded up.
  174. max_data = 1048576
  175. max_time = 30
  176. def sendit():
  177. return self.curl.putjson("stream/insert", send_data, params)
  178. result = None
  179. start = time.time()
  180. send_data = ""
  181. for line in data:
  182. elapsed = time.time() - start
  183. send_data += line
  184. if (len(send_data) > max_data) or (elapsed > max_time):
  185. result = sendit()
  186. send_data = ""
  187. start = time.time()
  188. if len(send_data):
  189. result = sendit()
  190. # Return the most recent JSON result we got back, or None if
  191. # we didn't make any requests.
  192. return result
  193. def stream_intervals(self, path, start = None, end = None):
  194. """
  195. Return a generator that yields each stream interval.
  196. Multiple requests are made to the server if the results
  197. get truncated.
  198. """
  199. params = {
  200. "path": path
  201. }
  202. if start is not None:
  203. params["start"] = repr(start) # use repr to keep precision
  204. if end is not None:
  205. params["end"] = repr(end)
  206. while True:
  207. (intervals, restart) = self.curl.getjson("stream/intervals", params)
  208. for interval in intervals:
  209. yield interval
  210. if restart:
  211. # Restart where we left off
  212. params["start"] = repr(restart)
  213. else:
  214. break
def stream_extract(self, path, start = None, end = None, bare = False):
    """
    Extract data from a stream.  Returns a generator that yields
    chunks of ASCII-formatted data that matches the database
    layout for the given path.  Multiple requests are made to the
    server if shorter requests get truncated.
    """
    params = {
        "path": path
    }
    if start is not None:
        params["start"] = repr(start) # use repr to keep precision
    if end is not None:
        params["end"] = repr(end)
    # Sent through urlencode, so the server sees "True"/"False".
    params["bare"] = bare
    more = True
    while more:
        # NOTE(review): stray attribute access -- this evaluates the bound
        # method without calling it and has no effect; looks like dead code.
        self.curl.get
        # NOTE(review): this queries "stream/intervals" although the
        # docstring promises extracted data -- confirm the endpoint.
        (intervals, more) = self.curl.getjson("stream/intervals", params)
        for interval in intervals:
            yield interval
        if more:
            # Restart where we left off
            params["start"] = repr(intervals[-1][1])