You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

380 lines
14 KiB

  1. # -*- coding: utf-8 -*-
  2. """Class for performing HTTP client requests via libcurl"""
  3. import nilmdb
  4. import nilmdb.utils
  5. import nilmdb.client.httpclient
  6. import time
  7. import simplejson as json
  8. import contextlib
  9. def float_to_string(f):
  10. """Use repr to maintain full precision in the string output."""
  11. return repr(float(f))
  12. def extract_timestamp(line):
  13. """Extract just the timestamp from a line of data text"""
  14. return float(line.split()[0])
  15. class Client(object):
  16. """Main client interface to the Nilm database."""
  17. def __init__(self, url):
  18. self.http = nilmdb.client.httpclient.HTTPClient(url)
  19. # __enter__/__exit__ allow this class to be a context manager
  20. def __enter__(self):
  21. return self
  22. def __exit__(self, exc_type, exc_value, traceback):
  23. self.close()
  24. def _json_param(self, data):
  25. """Return compact json-encoded version of parameter"""
  26. return json.dumps(data, separators=(',',':'))
  27. def close(self):
  28. """Close the connection; safe to call multiple times"""
  29. self.http.close()
  30. def geturl(self):
  31. """Return the URL we're using"""
  32. return self.http.baseurl
  33. def version(self):
  34. """Return server version"""
  35. return self.http.get("version")
  36. def dbinfo(self):
  37. """Return server database info (path, size, free space)
  38. as a dictionary."""
  39. return self.http.get("dbinfo")
  40. def stream_list(self, path = None, layout = None):
  41. params = {}
  42. if path is not None:
  43. params["path"] = path
  44. if layout is not None:
  45. params["layout"] = layout
  46. return self.http.get("stream/list", params)
  47. def stream_get_metadata(self, path, keys = None):
  48. params = { "path": path }
  49. if keys is not None:
  50. params["key"] = keys
  51. return self.http.get("stream/get_metadata", params)
  52. def stream_set_metadata(self, path, data):
  53. """Set stream metadata from a dictionary, replacing all existing
  54. metadata."""
  55. params = {
  56. "path": path,
  57. "data": self._json_param(data)
  58. }
  59. return self.http.get("stream/set_metadata", params)
  60. def stream_update_metadata(self, path, data):
  61. """Update stream metadata from a dictionary"""
  62. params = {
  63. "path": path,
  64. "data": self._json_param(data)
  65. }
  66. return self.http.get("stream/update_metadata", params)
  67. def stream_create(self, path, layout):
  68. """Create a new stream"""
  69. params = { "path": path,
  70. "layout" : layout }
  71. return self.http.get("stream/create", params)
  72. def stream_destroy(self, path):
  73. """Delete stream and its contents"""
  74. params = { "path": path }
  75. return self.http.get("stream/destroy", params)
  76. def stream_remove(self, path, start = None, end = None):
  77. """Remove data from the specified time range"""
  78. params = {
  79. "path": path
  80. }
  81. if start is not None:
  82. params["start"] = float_to_string(start)
  83. if end is not None:
  84. params["end"] = float_to_string(end)
  85. return self.http.get("stream/remove", params)
  86. @contextlib.contextmanager
  87. def stream_insert_context(self, path, start = None, end = None):
  88. """Return a context manager that allows data to be efficiently
  89. inserted into a stream in a piecewise manner. Data is be provided
  90. as single lines, and is aggregated and sent to the server in larger
  91. chunks as necessary. Data lines must match the database layout for
  92. the given path, and end with a newline.
  93. Example:
  94. with client.stream_insert_context('/path', start, end) as ctx:
  95. ctx.insert_line('1234567890.0 1 2 3 4\\n')
  96. ctx.insert_line('1234567891.0 1 2 3 4\\n')
  97. For more details, see help for nilmdb.client.client.StreamInserter
  98. This may make multiple requests to the server, if the data is
  99. large enough or enough time has passed between insertions.
  100. """
  101. ctx = StreamInserter(self, path, start, end)
  102. yield ctx
  103. ctx.finalize()
  104. def stream_insert(self, path, data, start = None, end = None):
  105. """Insert rows of data into a stream. data should be an
  106. iterable object that provides ASCII data that matches the
  107. database layout for path. See stream_insert_context for
  108. details on the 'start' and 'end' parameters."""
  109. with self.stream_insert_context(path, start, end) as ctx:
  110. ctx.insert_iter(data)
  111. return ctx.last_response
  112. def stream_insert_block(self, path, block, start, end):
  113. """Insert an entire block of data into a stream. Like
  114. stream_insert, except 'block' contains multiple lines of ASCII
  115. text and is sent in one single chunk."""
  116. params = { "path": path,
  117. "start": float_to_string(start),
  118. "end": float_to_string(end) }
  119. return self.http.put("stream/insert", block, params)
  120. def stream_intervals(self, path, start = None, end = None):
  121. """
  122. Return a generator that yields each stream interval.
  123. """
  124. params = {
  125. "path": path
  126. }
  127. if start is not None:
  128. params["start"] = float_to_string(start)
  129. if end is not None:
  130. params["end"] = float_to_string(end)
  131. return self.http.get_gen("stream/intervals", params, retjson = True)
  132. def stream_extract(self, path, start = None, end = None, count = False):
  133. """
  134. Extract data from a stream. Returns a generator that yields
  135. lines of ASCII-formatted data that matches the database
  136. layout for the given path.
  137. Specify count = True to return a count of matching data points
  138. rather than the actual data. The output format is unchanged.
  139. """
  140. params = {
  141. "path": path,
  142. }
  143. if start is not None:
  144. params["start"] = float_to_string(start)
  145. if end is not None:
  146. params["end"] = float_to_string(end)
  147. if count:
  148. params["count"] = 1
  149. return self.http.get_gen("stream/extract", params, retjson = False)
  150. def stream_count(self, path, start = None, end = None):
  151. """
  152. Return the number of rows of data in the stream that satisfy
  153. the given timestamps.
  154. """
  155. counts = list(self.stream_extract(path, start, end, count = True))
  156. return int(counts[0])
  157. class StreamInserter(object):
  158. """Object returned by stream_insert_context() that manages
  159. the insertion of rows of data into a particular path.
  160. The basic data flow is that we are filling a contiguous interval
  161. on the server, with no gaps, that extends from timestamp 'start'
  162. to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
  163. Data is provided by the user one line at a time with
  164. .insert_line() or .insert_iter().
  165. 1. The first inserted line begins a new interval that starts at
  166. 'start'. If 'start' is not given, it is deduced from the first
  167. line's timestamp.
  168. 2. Subsequent lines go into the same contiguous interval. As lines
  169. are inserted, this routine may make multiple insertion requests to
  170. the server, but will structure the timestamps to leave no gaps.
  171. 3. The current contiguous interval can be completed by manually
  172. calling .finalize(), which the context manager will also do
  173. automatically. This will send any remaining data to the server,
  174. using the 'end' timestamp to end the interval.
  175. After a .finalize(), inserting new data goes back to step 1.
  176. .update_start() can be called before step 1 to change the start
  177. time for the interval. .update_end() can be called before step 3
  178. to change the end time for the interval.
  179. """
  180. # See design.md for a discussion of how much data to send.
  181. # These are soft limits -- actual data might be rounded up.
  182. # We send when we have a certain amount of data queued, or
  183. # when a certain amount of time has passed since the last send.
  184. _max_data = 1048576
  185. _max_time = 30
  186. # Delta to add to the final timestamp, if "end" wasn't given
  187. _end_epsilon = 1e-6
  188. def __init__(self, client, path, start = None, end = None):
  189. """'http' is the httpclient object. 'path' is the database
  190. path to insert to. 'start' and 'end' are used for the first
  191. contiguous interval."""
  192. self.last_response = None
  193. self._client = client
  194. self._path = path
  195. # Start and end for the overall contiguous interval we're
  196. # filling
  197. self._interval_start = start
  198. self._interval_end = end
  199. # Data for the specific block we're building up to send
  200. self._block_data = []
  201. self._block_len = 0
  202. self._block_start = None
  203. # Time of last request
  204. self._last_time = time.time()
  205. # We keep a buffer of the two most recently inserted lines.
  206. # Only the older one actually gets processed; the newer one
  207. # is used to "look-ahead" to the next timestamp if we need
  208. # to internally split an insertion into two requests.
  209. self._line_old = None
  210. self._line_new = None
  211. def insert_iter(self, iter):
  212. """Insert all lines of ASCII formatted data from the given
  213. iterable. Lines must be terminated with '\\n'."""
  214. for line in iter:
  215. self.insert_line(line)
  216. def insert_line(self, line, allow_intermediate = True):
  217. """Insert a single line of ASCII formatted data. Line
  218. must be terminated with '\\n'."""
  219. if line and (len(line) < 1 or line[-1] != '\n'):
  220. raise ValueError("lines must end in with a newline character")
  221. # Store this new line, but process the previous (old) one.
  222. # This lets us "look ahead" to the next line.
  223. self._line_old = self._line_new
  224. self._line_new = line
  225. if self._line_old is None:
  226. return
  227. # If starting a new block, pull out the timestamp if needed.
  228. if self._block_start is None:
  229. if self._interval_start is not None:
  230. # User provided a start timestamp. Use it once, then
  231. # clear it for the next block.
  232. self._block_start = self._interval_start
  233. self._interval_start = None
  234. else:
  235. # Extract timestamp from the first row
  236. self._block_start = extract_timestamp(self._line_old)
  237. # Save the line
  238. self._block_data.append(self._line_old)
  239. self._block_len += len(self._line_old)
  240. if allow_intermediate:
  241. # Send an intermediate block to the server if needed.
  242. elapsed = time.time() - self._last_time
  243. if (self._block_len > self._max_data) or (elapsed > self._max_time):
  244. self._send_block_intermediate()
  245. def update_start(self, start):
  246. """Update the start time for the next contiguous interval.
  247. Call this before starting to insert data for a new interval,
  248. for example, after .finalize()"""
  249. self._interval_start = start
  250. def update_end(self, end):
  251. """Update the end time for the current contiguous interval.
  252. Call this before .finalize()"""
  253. self._interval_end = end
  254. def finalize(self):
  255. """Stop filling the current contiguous interval.
  256. All outstanding data will be sent, and the interval end
  257. time of the interval will be taken from the 'end' argument
  258. used when initializing this class, or the most recent
  259. value passed to update_end(), or the last timestamp plus
  260. a small epsilon value if no other endpoint was provided.
  261. If more data is inserted after a finalize(), it will become
  262. part of a new interval and there may be a gap left in-between."""
  263. # Special marker tells insert_line that this is the end
  264. self.insert_line(None, allow_intermediate = False)
  265. if self._block_len > 0:
  266. # We have data pending, so send the final block
  267. self._send_block_final()
  268. elif None not in (self._interval_start, self._interval_end):
  269. # We have no data, but enough information to create an
  270. # empty interval.
  271. self._block_start = self._interval_start
  272. self._interval_start = None
  273. self._send_block_final()
  274. else:
  275. # No data, and no timestamps to use to create an empty
  276. # interval.
  277. pass
  278. # Make sure both timestamps are emptied for future intervals.
  279. self._interval_start = None
  280. self._interval_end = None
  281. def _send_block_intermediate(self):
  282. """Send data, when we still have more data to send.
  283. Use the timestamp from the next line, so that the blocks
  284. are contiguous."""
  285. block_end = extract_timestamp(self._line_new)
  286. if self._interval_end is not None and block_end > self._interval_end:
  287. # Something's fishy -- the timestamp we found is after
  288. # the user's specified end. Limit it here, and the
  289. # server will return an error.
  290. block_end = self._interval_end
  291. self._send_block(block_end)
  292. def _send_block_final(self):
  293. """Send data, when this is the last block for the interval.
  294. There is no next line, so figure out the actual interval end
  295. using interval_end or end_epsilon."""
  296. if self._interval_end is not None:
  297. # Use the user's specified end timestamp
  298. block_end = self._interval_end
  299. # Clear it in case we send more intervals in the future.
  300. self._interval_end = None
  301. else:
  302. # Add an epsilon to the last timestamp we saw
  303. block_end = extract_timestamp(self._line_old) + self._end_epsilon
  304. self._send_block(block_end)
  305. def _send_block(self, block_end):
  306. """Send current block to the server"""
  307. self.last_response = self._client.stream_insert_block(
  308. self._path, "".join(self._block_data),
  309. self._block_start, block_end)
  310. # Clear out the block
  311. self._block_data = []
  312. self._block_len = 0
  313. self._block_start = None
  314. # Note when we sent it
  315. self._last_time = time.time()