# -*- coding: utf-8 -*-

"""Class for performing HTTP client requests via libcurl"""

import nilmdb.utils
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError

import time
import simplejson as json
import contextlib

from nilmdb.utils.time import timestamp_to_string, string_to_timestamp


def extract_timestamp(line):
    """Extract just the timestamp from a line of data text"""
    return string_to_timestamp(line.split()[0])


class Client(object):
    """Main client interface to the Nilm database."""

    def __init__(self, url, post_json = False):
        """Initialize client with given URL.  If post_json is true,
        POST requests are sent with Content-Type 'application/json'
        instead of the default 'x-www-form-urlencoded'."""
        self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
        self.post_json = post_json

    # __enter__/__exit__ allow this class to be a context manager
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
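
    # Illustrative sketch (not executed as part of this module): a Client
    # can be used as a context manager so the connection is closed
    # automatically.  The URL below is a placeholder, not a real server.
    #
    #     with Client("http://localhost:12380/") as client:
    #         print(client.version())
    #         print(client.dbinfo())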

    def _json_post_param(self, data):
        """Return compact json-encoded version of parameter"""
        if self.post_json:
            # If we're posting as JSON, we don't need to encode it further here
            return data
        return json.dumps(data, separators=(',',':'))

    def close(self):
        """Close the connection; safe to call multiple times"""
        self.http.close()

    def geturl(self):
        """Return the URL we're using"""
        return self.http.baseurl

    def version(self):
        """Return server version"""
        return self.http.get("version")

    def dbinfo(self):
        """Return server database info (path, size, free space)
        as a dictionary."""
        return self.http.get("dbinfo")

    def stream_list(self, path = None, layout = None, extended = False):
        """Return the list of streams on the server, optionally
        filtered by 'path' and/or 'layout'.  If 'extended' is true,
        ask the server to include extended stream information."""
        params = {}
        if path is not None:
            params["path"] = path
        if layout is not None:
            params["layout"] = layout
        if extended:
            params["extended"] = 1
        return self.http.get("stream/list", params)
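
    # Illustrative sketch: iterating over the stream list.  Each entry is
    # whatever JSON the server returns for "stream/list".
    #
    #     for entry in client.stream_list():
    #         print(entry)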

    def stream_get_metadata(self, path, keys = None):
        """Get stream metadata as a dictionary.  If 'keys' is given,
        return metadata for those keys only."""
        params = { "path": path }
        if keys is not None:
            params["key"] = keys
        return self.http.get("stream/get_metadata", params)

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, replacing all existing
        metadata."""
        params = {
            "path": path,
            "data": self._json_post_param(data)
            }
        return self.http.post("stream/set_metadata", params)

    def stream_update_metadata(self, path, data):
        """Update stream metadata from a dictionary"""
        params = {
            "path": path,
            "data": self._json_post_param(data)
            }
        return self.http.post("stream/update_metadata", params)

    def stream_create(self, path, layout):
        """Create a new stream"""
        params = { "path": path,
                   "layout" : layout }
        return self.http.post("stream/create", params)
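
    # Illustrative sketch: creating a stream and attaching metadata.  The
    # path and the 'float32_8' layout are placeholder values for this
    # example.
    #
    #     client.stream_create("/demo/sensor", "float32_8")
    #     client.stream_set_metadata("/demo/sensor", {"description": "demo"})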

    def stream_destroy(self, path):
        """Delete stream.  Fails if any data is still present."""
        params = { "path": path }
        return self.http.post("stream/destroy", params)

    def stream_rename(self, oldpath, newpath):
        """Rename a stream."""
        params = { "oldpath": oldpath,
                   "newpath": newpath }
        return self.http.post("stream/rename", params)

    def stream_remove(self, path, start = None, end = None):
        """Remove data from the specified time range"""
        params = {
            "path": path
            }
        if start is not None:
            params["start"] = timestamp_to_string(start)
        if end is not None:
            params["end"] = timestamp_to_string(end)
        return self.http.post("stream/remove", params)
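
    # Illustrative sketch: removing data in an assumed time range (path
    # and timestamps are placeholders).
    #
    #     client.stream_remove("/demo/sensor", start, end)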

    @contextlib.contextmanager
    def stream_insert_context(self, path, start = None, end = None):
        """Return a context manager that allows data to be efficiently
        inserted into a stream in a piecewise manner.  Data is provided
        as single lines, and is aggregated and sent to the server in
        larger chunks as necessary.  Data lines must match the database
        layout for the given path, and end with a newline.

        Example:
            with client.stream_insert_context('/path', start, end) as ctx:
                ctx.insert('1234567890.0 1 2 3 4\\n')
                ctx.insert('1234567891.0 1 2 3 4\\n')

        For more details, see help for nilmdb.client.client.StreamInserter.

        This may make multiple requests to the server, if the data is
        large enough or enough time has passed between insertions.
        """
        ctx = StreamInserter(self.http, path, start, end)
        yield ctx
        ctx.finalize()

    def stream_insert(self, path, data, start = None, end = None):
        """Insert rows of data into a stream.  data should be a string
        or iterable that provides ASCII data that matches the database
        layout for path.  See stream_insert_context for details on the
        'start' and 'end' parameters."""
        with self.stream_insert_context(path, start, end) as ctx:
            if isinstance(data, basestring):
                ctx.insert(data)
            else:
                for chunk in data:
                    ctx.insert(chunk)
        return ctx.last_response
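
    # Illustrative sketch: inserting two rows with one call.  The path,
    # timestamps, and values are placeholders and must match the stream's
    # layout in real use.
    #
    #     client.stream_insert("/demo/sensor",
    #                          "1234567890.0 1 2 3 4\n"
    #                          "1234567891.0 1 2 3 4\n")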

    def stream_intervals(self, path, start = None, end = None, diffpath = None):
        """
        Return a generator that yields each stream interval.

        If diffpath is not None, yields only interval ranges that are
        present in 'path' but not in 'diffpath'.
        """
        params = {
            "path": path
            }
        if diffpath is not None:
            params["diffpath"] = diffpath
        if start is not None:
            params["start"] = timestamp_to_string(start)
        if end is not None:
            params["end"] = timestamp_to_string(end)
        return self.http.get_gen("stream/intervals", params)
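
    # Illustrative sketch: walking the intervals of an assumed path.  Each
    # yielded item is the server's JSON-decoded interval entry.
    #
    #     for interval in client.stream_intervals("/demo/sensor"):
    #         print(interval)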

    def stream_extract(self, path, start = None, end = None,
                       count = False, markup = False):
        """
        Extract data from a stream.  Returns a generator that yields
        lines of ASCII-formatted data that matches the database
        layout for the given path.

        Specify count = True to return a count of matching data points
        rather than the actual data.  The output format is unchanged.

        Specify markup = True to include comments in the returned data
        that indicate interval starts and ends.
        """
        params = {
            "path": path,
            }
        if start is not None:
            params["start"] = timestamp_to_string(start)
        if end is not None:
            params["end"] = timestamp_to_string(end)
        if count:
            params["count"] = 1
        if markup:
            params["markup"] = 1
        return self.http.get_gen("stream/extract", params)
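
    # Illustrative sketch: printing extracted rows for an assumed path and
    # time range (start/end are placeholder timestamps).
    #
    #     for line in client.stream_extract("/demo/sensor", start, end):
    #         print(line)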

    def stream_count(self, path, start = None, end = None):
        """
        Return the number of rows of data in the stream that satisfy
        the given timestamps.
        """
        counts = list(self.stream_extract(path, start, end, count = True))
        return int(counts[0])


class StreamInserter(object):
    """Object returned by stream_insert_context() that manages
    the insertion of rows of data into a particular path.

    The basic data flow is that we are filling a contiguous interval
    on the server, with no gaps, that extends from timestamp 'start'
    to timestamp 'end'.  Data timestamps satisfy 'start <= t < end'.

    Data is provided to .insert() as ASCII formatted data separated by
    newlines.  The chunks of data passed to .insert() do not need to
    match up with the newlines; less or more than one line can be passed.

    1. The first inserted line begins a new interval that starts at
    'start'.  If 'start' is not given, it is deduced from the first
    line's timestamp.

    2. Subsequent lines go into the same contiguous interval.  As lines
    are inserted, this routine may make multiple insertion requests to
    the server, but will structure the timestamps to leave no gaps.

    3. The current contiguous interval can be completed by manually
    calling .finalize(), which the context manager will also do
    automatically.  This will send any remaining data to the server,
    using the 'end' timestamp to end the interval.  If no 'end'
    was provided, it is deduced from the last timestamp seen,
    plus a small delta.

    After a .finalize(), inserting new data goes back to step 1.

    .update_start() can be called before step 1 to change the start
    time for the interval.  .update_end() can be called before step 3
    to change the end time for the interval.
    """

    # See design.md for a discussion of how much data to send.  This
    # is a soft limit -- we might send up to twice as much or so
    _max_data = 2 * 1024 * 1024
    _max_data_after_send = 64 * 1024

    def __init__(self, http, path, start = None, end = None):
        """'http' is the httpclient object.  'path' is the database
        path to insert to.  'start' and 'end' are used for the first
        contiguous interval."""
        self.last_response = None

        self._http = http
        self._path = path

        # Start and end for the overall contiguous interval we're
        # filling
        self._interval_start = start
        self._interval_end = end

        # Current data we're building up to send.  Each string
        # goes into the array, and gets joined all at once.
        self._block_data = []
        self._block_len = 0

    def insert(self, data):
        """Insert a chunk of ASCII formatted data in string form.  The
        overall data must consist of lines terminated by '\\n'."""
        length = len(data)
        maxdata = self._max_data

        if length > maxdata:
            # This could make our buffer more than twice what we
            # wanted to send, so split it up.  This is a bit
            # inefficient, but the user really shouldn't be providing
            # this much data at once.
            for cut in range(0, length, maxdata):
                self.insert(data[cut:(cut + maxdata)])
            return

        # Append this string to our list
        self._block_data.append(data)
        self._block_len += length

        # Send the block once we have enough data
        if self._block_len >= maxdata:
            self._send_block(final = False)
            if self._block_len >= self._max_data_after_send: # pragma: no cover
                raise ValueError("too much data left over after trying"
                                 " to send intermediate block; is it"
                                 " missing newlines or malformed?")

    def update_start(self, start):
        """Update the start time for the next contiguous interval.
        Call this before starting to insert data for a new interval,
        for example, after .finalize()"""
        self._interval_start = start

    def update_end(self, end):
        """Update the end time for the current contiguous interval.
        Call this before .finalize()"""
        self._interval_end = end

    def finalize(self):
        """Stop filling the current contiguous interval.
        All outstanding data will be sent, and the end time of the
        interval will be taken from the 'end' argument used when
        initializing this class, or the most recent value passed to
        update_end(), or the last timestamp plus a small epsilon
        value if no other endpoint was provided.

        If more data is inserted after a finalize(), it will become
        part of a new interval and there may be a gap left in-between."""
        self._send_block(final = True)

    def _get_first_noncomment(self, block):
        """Return the (start, end) indices of the first full line in
        block that isn't a comment, or raise IndexError if
        there isn't one."""
        start = 0
        while True:
            end = block.find('\n', start)
            if end < 0:
                raise IndexError
            if block[start] != '#':
                return (start, (end + 1))
            start = end + 1

    def _get_last_noncomment(self, block):
        """Return the (start, end) indices of the last full line in
        block that isn't a comment, or raise IndexError if
        there isn't one."""
        end = block.rfind('\n')
        if end <= 0:
            raise IndexError
        while True:
            start = block.rfind('\n', 0, end)
            if block[start + 1] != '#':
                return ((start + 1), end)
            if start == -1:
                raise IndexError
            end = start

    def _send_block(self, final = False):
        """Send data currently in the block.  The data sent will
        consist of full lines only, so some might be left over."""
        # Build the full string to send
        block = "".join(self._block_data)

        start_ts = self._interval_start
        if start_ts is None:
            # Pull start from the first line
            try:
                (spos, epos) = self._get_first_noncomment(block)
                start_ts = extract_timestamp(block[spos:epos])
            except (ValueError, IndexError):
                pass # no timestamp is OK, if we have no data

        if final:
            # For a final block, it must end in a newline, and the
            # ending timestamp is either the user-provided end,
            # or the timestamp of the last line plus epsilon.
            end_ts = self._interval_end
            try:
                if block[-1] != '\n':
                    raise ValueError("final block didn't end with a newline")
                if end_ts is None:
                    (spos, epos) = self._get_last_noncomment(block)
                    end_ts = extract_timestamp(block[spos:epos])
                    end_ts += nilmdb.utils.time.epsilon
            except (ValueError, IndexError):
                pass # no timestamp is OK, if we have no data
            self._block_data = []
            self._block_len = 0

            # Next block is completely fresh
            self._interval_start = None
            self._interval_end = None
        else:
            # An intermediate block, e.g. "line1\nline2\nline3\nline4"
            # We need to save "line3\nline4" for the next block, and
            # use the timestamp from "line3" as the ending timestamp
            # for this one.
            try:
                (spos, epos) = self._get_last_noncomment(block)
                end_ts = extract_timestamp(block[spos:epos])
            except (ValueError, IndexError):
                # If we found no timestamp, give up; we could send this
                # block later when we have more data.
                return
            if spos == 0:
                # Not enough data to send an intermediate block
                return
            if self._interval_end is not None and end_ts > self._interval_end:
                # User gave us bad endpoints; send it anyway, and let
                # the server complain so that the error is the same
                # as if we hadn't done this chunking.
                end_ts = self._interval_end
            self._block_data = [ block[spos:] ]
            self._block_len = (epos - spos)
            block = block[:spos]

            # Next block continues where this one ended
            self._interval_start = end_ts

        # Double check endpoints
        if start_ts is None or end_ts is None:
            # If the block has no non-comment lines, it's OK
            try:
                self._get_first_noncomment(block)
            except IndexError:
                return
            raise ClientError("have data to send, but no start/end times")

        # Send it
        params = { "path": self._path,
                   "start": timestamp_to_string(start_ts),
                   "end": timestamp_to_string(end_ts) }
        self.last_response = self._http.put("stream/insert", block, params)
        return