You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

471 lines
18 KiB

  1. # -*- coding: utf-8 -*-
  2. """Class for performing HTTP client requests via libcurl"""
  3. import nilmdb.utils
  4. import nilmdb.client.httpclient
  5. from nilmdb.client.errors import ClientError
  6. import time
  7. import simplejson as json
  8. import contextlib
  9. from nilmdb.utils.time import timestamp_to_string, string_to_timestamp
  10. def extract_timestamp(line):
  11. """Extract just the timestamp from a line of data text"""
  12. return string_to_timestamp(line.split()[0])
  13. class Client(object):
  14. """Main client interface to the Nilm database."""
  15. def __init__(self, url, post_json = False):
  16. """Initialize client with given URL. If post_json is true,
  17. POST requests are sent with Content-Type 'application/json'
  18. instead of the default 'x-www-form-urlencoded'."""
  19. self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
  20. self.post_json = post_json
  21. # __enter__/__exit__ allow this class to be a context manager
  22. def __enter__(self):
  23. return self
  24. def __exit__(self, exc_type, exc_value, traceback):
  25. self.close()
  26. def _json_post_param(self, data):
  27. """Return compact json-encoded version of parameter"""
  28. if self.post_json:
  29. # If we're posting as JSON, we don't need to encode it further here
  30. return data
  31. return json.dumps(data, separators=(',',':'))
  32. def close(self):
  33. """Close the connection; safe to call multiple times"""
  34. self.http.close()
  35. def geturl(self):
  36. """Return the URL we're using"""
  37. return self.http.baseurl
  38. def version(self):
  39. """Return server version"""
  40. return self.http.get("version")
  41. def dbinfo(self):
  42. """Return server database info (path, size, free space)
  43. as a dictionary."""
  44. return self.http.get("dbinfo")
  45. def stream_list(self, path = None, layout = None, extended = False):
  46. """Return a sorted list of [path, layout] lists. If 'path' or
  47. 'layout' are specified, only return streams that match those
  48. exact values. If 'extended' is True, the returned lists have
  49. extended info, e.g.: [path, layout, extent_min, extent_max,
  50. total_rows, total_seconds."""
  51. params = {}
  52. if path is not None:
  53. params["path"] = path
  54. if layout is not None:
  55. params["layout"] = layout
  56. if extended:
  57. params["extended"] = 1
  58. streams = self.http.get("stream/list", params)
  59. return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
  60. def stream_get_metadata(self, path, keys = None):
  61. """Get stream metadata"""
  62. params = { "path": path }
  63. if keys is not None:
  64. params["key"] = keys
  65. return self.http.get("stream/get_metadata", params)
  66. def stream_set_metadata(self, path, data):
  67. """Set stream metadata from a dictionary, replacing all existing
  68. metadata."""
  69. params = {
  70. "path": path,
  71. "data": self._json_post_param(data)
  72. }
  73. return self.http.post("stream/set_metadata", params)
  74. def stream_update_metadata(self, path, data):
  75. """Update stream metadata from a dictionary"""
  76. params = {
  77. "path": path,
  78. "data": self._json_post_param(data)
  79. }
  80. return self.http.post("stream/update_metadata", params)
  81. def stream_create(self, path, layout):
  82. """Create a new stream"""
  83. params = { "path": path,
  84. "layout" : layout }
  85. return self.http.post("stream/create", params)
  86. def stream_destroy(self, path):
  87. """Delete stream. Fails if any data is still present."""
  88. params = { "path": path }
  89. return self.http.post("stream/destroy", params)
  90. def stream_rename(self, oldpath, newpath):
  91. """Rename a stream."""
  92. params = { "oldpath": oldpath,
  93. "newpath": newpath }
  94. return self.http.post("stream/rename", params)
  95. def stream_remove(self, path, start = None, end = None):
  96. """Remove data from the specified time range"""
  97. params = {
  98. "path": path
  99. }
  100. if start is not None:
  101. params["start"] = timestamp_to_string(start)
  102. if end is not None:
  103. params["end"] = timestamp_to_string(end)
  104. total = 0
  105. for count in self.http.post_gen("stream/remove", params):
  106. total += int(count)
  107. return total
  108. @contextlib.contextmanager
  109. def stream_insert_context(self, path, start = None, end = None):
  110. """Return a context manager that allows data to be efficiently
  111. inserted into a stream in a piecewise manner. Data is
  112. provided as ASCII lines, and is aggregated and sent to the
  113. server in larger or smaller chunks as necessary. Data lines
  114. must match the database layout for the given path, and end
  115. with a newline.
  116. Example:
  117. with client.stream_insert_context('/path', start, end) as ctx:
  118. ctx.insert('1234567890.0 1 2 3 4\\n')
  119. ctx.insert('1234567891.0 1 2 3 4\\n')
  120. For more details, see help for nilmdb.client.client.StreamInserter
  121. This may make multiple requests to the server, if the data is
  122. large enough or enough time has passed between insertions.
  123. """
  124. ctx = StreamInserter(self, path, start, end)
  125. yield ctx
  126. ctx.finalize()
  127. ctx.destroy()
  128. def stream_insert(self, path, data, start = None, end = None):
  129. """Insert rows of data into a stream. data should be a string
  130. or iterable that provides ASCII data that matches the database
  131. layout for path. Data is passed through stream_insert_context,
  132. so it will be broken into reasonably-sized chunks and
  133. start/end will be deduced if missing."""
  134. with self.stream_insert_context(path, start, end) as ctx:
  135. if isinstance(data, str):
  136. ctx.insert(data)
  137. else:
  138. for chunk in data:
  139. ctx.insert(chunk)
  140. return ctx.last_response
  141. def stream_insert_block(self, path, data, start, end, binary = False):
  142. """Insert a single fixed block of data into the stream. It is
  143. sent directly to the server in one block with no further
  144. processing.
  145. If 'binary' is True, provide raw binary data in little-endian
  146. format matching the path layout, including an int64 timestamp.
  147. Otherwise, provide ASCII data matching the layout."""
  148. params = {
  149. "path": path,
  150. "start": timestamp_to_string(start),
  151. "end": timestamp_to_string(end),
  152. }
  153. if binary:
  154. params["binary"] = 1
  155. return self.http.put("stream/insert", data, params, binary = binary)
  156. def stream_intervals(self, path, start = None, end = None, diffpath = None):
  157. """
  158. Return a generator that yields each stream interval.
  159. If 'diffpath' is not None, yields only interval ranges that are
  160. present in 'path' but not in 'diffpath'.
  161. """
  162. params = {
  163. "path": path
  164. }
  165. if diffpath is not None:
  166. params["diffpath"] = diffpath
  167. if start is not None:
  168. params["start"] = timestamp_to_string(start)
  169. if end is not None:
  170. params["end"] = timestamp_to_string(end)
  171. return self.http.get_gen("stream/intervals", params)
  172. def stream_extract(self, path, start = None, end = None,
  173. count = False, markup = False, binary = False):
  174. """
  175. Extract data from a stream. Returns a generator that yields
  176. lines of ASCII-formatted data that matches the database
  177. layout for the given path.
  178. If 'count' is True, return a count of matching data points
  179. rather than the actual data. The output format is unchanged.
  180. If 'markup' is True, include comments in the returned data
  181. that indicate interval starts and ends.
  182. If 'binary' is True, return chunks of raw binary data, rather
  183. than lines of ASCII-formatted data. Raw binary data is
  184. little-endian and matches the database types (including an
  185. int64 timestamp).
  186. """
  187. params = {
  188. "path": path,
  189. }
  190. if start is not None:
  191. params["start"] = timestamp_to_string(start)
  192. if end is not None:
  193. params["end"] = timestamp_to_string(end)
  194. if count:
  195. params["count"] = 1
  196. if markup:
  197. params["markup"] = 1
  198. if binary:
  199. params["binary"] = 1
  200. return self.http.get_gen("stream/extract", params, binary = binary)
  201. def stream_count(self, path, start = None, end = None):
  202. """
  203. Return the number of rows of data in the stream that satisfy
  204. the given timestamps.
  205. """
  206. counts = list(self.stream_extract(path, start, end, count = True))
  207. return int(counts[0])
class StreamInserter(object):
    """Object returned by stream_insert_context() that manages
    the insertion of rows of data into a particular path.

    The basic data flow is that we are filling a contiguous interval
    on the server, with no gaps, that extends from timestamp 'start'
    to timestamp 'end'.  Data timestamps satisfy 'start <= t < end'.

    Data is provided to .insert() as ASCII formatted data separated by
    newlines.  The chunks of data passed to .insert() do not need to
    match up with the newlines; less or more than one line can be passed.

    1. The first inserted line begins a new interval that starts at
    'start'.  If 'start' is not given, it is deduced from the first
    line's timestamp.

    2. Subsequent lines go into the same contiguous interval.  As lines
    are inserted, this routine may make multiple insertion requests to
    the server, but will structure the timestamps to leave no gaps.

    3. The current contiguous interval can be completed by manually
    calling .finalize(), which the context manager will also do
    automatically.  This will send any remaining data to the server,
    using the 'end' timestamp to end the interval.  If no 'end'
    was provided, it is deduced from the last timestamp seen,
    plus a small delta.

    After a .finalize(), inserting new data goes back to step 1.

    .update_start() can be called before step 1 to change the start
    time for the interval.  .update_end() can be called before step 3
    to change the end time for the interval.
    """

    # See design.md for a discussion of how much data to send.  This
    # is a soft limit -- we might send up to twice as much or so
    _max_data = 2 * 1024 * 1024
    # Hard ceiling on leftover buffered data after an intermediate
    # send; exceeding it means the input is likely malformed.
    _max_data_after_send = 64 * 1024

    def __init__(self, client, path, start, end):
        """'client' is the client object.  'path' is the database
        path to insert to.  'start' and 'end' are used for the first
        contiguous interval and may be None."""
        # Response from the most recent stream_insert_block call.
        self.last_response = None
        self._client = client
        self._path = path
        # Start and end for the overall contiguous interval we're
        # filling
        self._interval_start = start
        self._interval_end = end
        # Current data we're building up to send.  Each string
        # goes into the array, and gets joined all at once.
        self._block_data = []
        self._block_len = 0
        # NOTE(review): set here but never flipped within this class;
        # presumably read by external callers -- confirm.
        self.destroyed = False

    def destroy(self):
        """Ensure this object can't be used again without raising
        an error"""
        def error(*args, **kwargs):
            raise Exception("don't reuse this context object")
        # Replace all mutating entry points with the error stub;
        # plain attributes (e.g. last_response) remain readable.
        self._send_block = self.insert = self.finalize = self.send = error

    def insert(self, data):
        """Insert a chunk of ASCII formatted data in string form.  The
        overall data must consist of lines terminated by '\\n'."""
        length = len(data)
        maxdata = self._max_data
        if length > maxdata:
            # This could make our buffer more than twice what we
            # wanted to send, so split it up.  This is a bit
            # inefficient, but the user really shouldn't be providing
            # this much data at once.
            for cut in range(0, length, maxdata):
                self.insert(data[cut:(cut + maxdata)])
            return
        # Append this string to our list
        self._block_data.append(data)
        self._block_len += length
        # Send the block once we have enough data
        if self._block_len >= maxdata:
            self._send_block(final = False)
            # _send_block keeps any trailing partial line buffered; if
            # nearly everything was kept, the data has no usable newlines.
            if self._block_len >= self._max_data_after_send: # pragma: no cover
                raise ValueError("too much data left over after trying"
                                 " to send intermediate block; is it"
                                 " missing newlines or malformed?")

    def update_start(self, start):
        """Update the start time for the next contiguous interval.
        Call this before starting to insert data for a new interval,
        for example, after .finalize()"""
        self._interval_start = start

    def update_end(self, end):
        """Update the end time for the current contiguous interval.
        Call this before .finalize()"""
        self._interval_end = end

    def finalize(self):
        """Stop filling the current contiguous interval.
        All outstanding data will be sent, and the interval end
        time of the interval will be taken from the 'end' argument
        used when initializing this class, or the most recent
        value passed to update_end(), or the last timestamp plus
        a small epsilon value if no other endpoint was provided.

        If more data is inserted after a finalize(), it will become
        part of a new interval and there may be a gap left in-between."""
        self._send_block(final = True)

    def send(self):
        """Send any data that we might have buffered up.  Does not affect
        any other treatment of timestamps or endpoints."""
        self._send_block(final = False)

    def _get_first_noncomment(self, block):
        """Return the (start, end) indices of the first full line in
        block that isn't a comment, or raise IndexError if
        there isn't one.  The end index includes the trailing
        newline, so block[start:end] is a complete line."""
        start = 0
        while True:
            end = block.find('\n', start)
            if end < 0:
                # No complete line remains in the buffer.
                raise IndexError
            if block[start] != '#':
                return (start, (end + 1))
            start = end + 1

    def _get_last_noncomment(self, block):
        """Return the (start, end) indices of the last full line in
        block[:length] that isn't a comment, or raise IndexError if
        there isn't one.  The end index excludes the final newline."""
        # 'end' starts at the last newline; the line we report always
        # ends exactly there.
        end = block.rfind('\n')
        if end <= 0:
            raise IndexError
        while True:
            # rfind returns -1 when no earlier newline exists, so
            # start + 1 == 0 then addresses the first character.
            start = block.rfind('\n', 0, end)
            if block[start + 1] != '#':
                return ((start + 1), end)
            if start == -1:
                raise IndexError
            end = start

    def _send_block(self, final = False):
        """Send data currently in the block.  The data sent will
        consist of full lines only, so some might be left over."""
        # Build the full string to send
        block = "".join(self._block_data)
        start_ts = self._interval_start
        if start_ts is None:
            # Pull start from the first line
            try:
                (spos, epos) = self._get_first_noncomment(block)
                start_ts = extract_timestamp(block[spos:epos])
            except (ValueError, IndexError):
                pass # no timestamp is OK, if we have no data
        if final:
            # For a final block, it must end in a newline, and the
            # ending timestamp is either the user-provided end,
            # or the timestamp of the last line plus epsilon.
            end_ts = self._interval_end
            try:
                if block[-1] != '\n':
                    raise ValueError("final block didn't end with a newline")
                if end_ts is None:
                    (spos, epos) = self._get_last_noncomment(block)
                    end_ts = extract_timestamp(block[spos:epos])
                    end_ts += nilmdb.utils.time.epsilon
            except (ValueError, IndexError):
                pass # no timestamp is OK, if we have no data
            # The entire buffer is consumed by a final send.
            self._block_data = []
            self._block_len = 0
            # Next block is completely fresh
            self._interval_start = None
            self._interval_end = None
        else:
            # An intermediate block, e.g. "line1\nline2\nline3\nline4"
            # We need to save "line3\nline4" for the next block, and
            # use the timestamp from "line3" as the ending timestamp
            # for this one.
            try:
                (spos, epos) = self._get_last_noncomment(block)
                end_ts = extract_timestamp(block[spos:epos])
            except (ValueError, IndexError):
                # If we found no timestamp, give up; we could send this
                # block later when we have more data.
                return
            if spos == 0:
                # Not enough data to send an intermediate block
                return
            if self._interval_end is not None and end_ts > self._interval_end:
                # User gave us bad endpoints; send it anyway, and let
                # the server complain so that the error is the same
                # as if we hadn't done this chunking.
                end_ts = self._interval_end
            # Keep the last line (and anything after it) buffered for
            # the next send; ship everything before it now.
            self._block_data = [ block[spos:] ]
            self._block_len = (epos - spos)
            block = block[:spos]
            # Next block continues where this one ended
            self._interval_start = end_ts
        # Double check endpoints
        if (start_ts is None or end_ts is None) or (start_ts == end_ts):
            # If the block has no non-comment lines, it's OK
            try:
                self._get_first_noncomment(block)
            except IndexError:
                return
            raise ClientError("have data to send, but no start/end times")
        # Send it
        self.last_response = self._client.stream_insert_block(
            self._path, block, start_ts, end_ts, binary = False)
        return