You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

364 lines
13 KiB

  1. # -*- coding: utf-8 -*-
  2. """Class for performing HTTP client requests via libcurl"""
  3. import nilmdb
  4. import nilmdb.utils
  5. import nilmdb.client.httpclient
  6. import time
  7. import simplejson as json
  8. import contextlib
  9. def float_to_string(f):
  10. """Use repr to maintain full precision in the string output."""
  11. return repr(float(f))
  12. def extract_timestamp(line):
  13. """Extract just the timestamp from a line of data text"""
  14. return float(line.split()[0])
  15. class Client(object):
  16. """Main client interface to the Nilm database."""
  17. def __init__(self, url):
  18. self.http = nilmdb.client.httpclient.HTTPClient(url)
  19. # __enter__/__exit__ allow this class to be a context manager
  20. def __enter__(self):
  21. return self
  22. def __exit__(self, exc_type, exc_value, traceback):
  23. self.close()
  24. def _json_param(self, data):
  25. """Return compact json-encoded version of parameter"""
  26. return json.dumps(data, separators=(',',':'))
  27. def close(self):
  28. """Close the connection; safe to call multiple times"""
  29. self.http.close()
  30. def geturl(self):
  31. """Return the URL we're using"""
  32. return self.http.baseurl
  33. def version(self):
  34. """Return server version"""
  35. return self.http.get("version")
  36. def dbinfo(self):
  37. """Return server database info (path, size, free space)
  38. as a dictionary."""
  39. return self.http.get("dbinfo")
  40. def stream_list(self, path = None, layout = None):
  41. params = {}
  42. if path is not None:
  43. params["path"] = path
  44. if layout is not None:
  45. params["layout"] = layout
  46. return self.http.get("stream/list", params)
  47. def stream_get_metadata(self, path, keys = None):
  48. params = { "path": path }
  49. if keys is not None:
  50. params["key"] = keys
  51. return self.http.get("stream/get_metadata", params)
  52. def stream_set_metadata(self, path, data):
  53. """Set stream metadata from a dictionary, replacing all existing
  54. metadata."""
  55. params = {
  56. "path": path,
  57. "data": self._json_param(data)
  58. }
  59. return self.http.get("stream/set_metadata", params)
  60. def stream_update_metadata(self, path, data):
  61. """Update stream metadata from a dictionary"""
  62. params = {
  63. "path": path,
  64. "data": self._json_param(data)
  65. }
  66. return self.http.get("stream/update_metadata", params)
  67. def stream_create(self, path, layout):
  68. """Create a new stream"""
  69. params = { "path": path,
  70. "layout" : layout }
  71. return self.http.get("stream/create", params)
  72. def stream_destroy(self, path):
  73. """Delete stream and its contents"""
  74. params = { "path": path }
  75. return self.http.get("stream/destroy", params)
  76. def stream_remove(self, path, start = None, end = None):
  77. """Remove data from the specified time range"""
  78. params = {
  79. "path": path
  80. }
  81. if start is not None:
  82. params["start"] = float_to_string(start)
  83. if end is not None:
  84. params["end"] = float_to_string(end)
  85. return self.http.get("stream/remove", params)
  86. @contextlib.contextmanager
  87. def stream_insert_context(self, path, start = None, end = None):
  88. """Return a context manager that allows data to be efficiently
  89. inserted into a stream in a piecewise manner. Data is be provided
  90. as single lines, and is aggregated and sent to the server in larger
  91. chunks as necessary. Data lines must match the database layout for
  92. the given path, and end with a newline.
  93. Example:
  94. with client.stream_insert_context('/path', start, end) as ctx:
  95. ctx.insert_line('1234567890.0 1 2 3 4\\n')
  96. ctx.insert_line('1234567891.0 1 2 3 4\\n')
  97. For more details, see help for nilmdb.client.client.StreamInserter
  98. This may make multiple requests to the server, if the data is
  99. large enough or enough time has passed between insertions.
  100. """
  101. ctx = StreamInserter(self, path, start, end)
  102. yield ctx
  103. ctx.finalize()
  104. def stream_insert(self, path, data, start = None, end = None):
  105. """Insert rows of data into a stream. data should be an
  106. iterable object that provides ASCII data that matches the
  107. database layout for path. See stream_insert_context for
  108. details on the 'start' and 'end' parameters."""
  109. with self.stream_insert_context(path, start, end) as ctx:
  110. ctx.insert_iter(data)
  111. return ctx.last_response
  112. def stream_insert_block(self, path, block, start, end):
  113. """Insert an entire block of data into a stream. Like
  114. stream_insert, except 'block' contains multiple lines of ASCII
  115. text and is sent in one single chunk."""
  116. params = { "path": path,
  117. "start": float_to_string(start),
  118. "end": float_to_string(end) }
  119. return self.http.put("stream/insert", block, params)
  120. def stream_intervals(self, path, start = None, end = None):
  121. """
  122. Return a generator that yields each stream interval.
  123. """
  124. params = {
  125. "path": path
  126. }
  127. if start is not None:
  128. params["start"] = float_to_string(start)
  129. if end is not None:
  130. params["end"] = float_to_string(end)
  131. return self.http.get_gen("stream/intervals", params, retjson = True)
  132. def stream_extract(self, path, start = None, end = None, count = False):
  133. """
  134. Extract data from a stream. Returns a generator that yields
  135. lines of ASCII-formatted data that matches the database
  136. layout for the given path.
  137. Specify count = True to return a count of matching data points
  138. rather than the actual data. The output format is unchanged.
  139. """
  140. params = {
  141. "path": path,
  142. }
  143. if start is not None:
  144. params["start"] = float_to_string(start)
  145. if end is not None:
  146. params["end"] = float_to_string(end)
  147. if count:
  148. params["count"] = 1
  149. return self.http.get_gen("stream/extract", params, retjson = False)
  150. def stream_count(self, path, start = None, end = None):
  151. """
  152. Return the number of rows of data in the stream that satisfy
  153. the given timestamps.
  154. """
  155. return int(self.stream_extract(path, start, end, count = True)[0])
  156. class StreamInserter(object):
  157. """Object returned by stream_insert_context() that manages
  158. the insertion of rows of data into a particular path.
  159. The basic data flow is that we are filling a contiguous interval
  160. on the server, with no gaps, that extends from timestamp 'start'
  161. to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
  162. Data is provided by the user one line at a time with
  163. .insert_line() or .insert_iter().
  164. 1. The first inserted line begins a new interval that starts at
  165. 'start'. If 'start' is not given, it is deduced from the first
  166. line's timestamp.
  167. 2. Subsequent lines go into the same contiguous interval. As lines
  168. are inserted, this routine may make multiple insertion requests to
  169. the server, but will structure the timestamps to leave no gaps.
  170. 3. The current contiguous interval can be completed by manually
  171. calling .finalize(), which the context manager will also do
  172. automatically. This will send any remaining data to the server,
  173. using the 'end' timestamp to end the interval.
  174. After a .finalize(), inserting new data goes back to step 1.
  175. .update_start() can be called before step 1 to change the start
  176. time for the interval. .update_end() can be called before step 3
  177. to change the end time for the interval.
  178. """
  179. # See design.md for a discussion of how much data to send.
  180. # These are soft limits -- actual data might be rounded up.
  181. # We send when we have a certain amount of data queued, or
  182. # when a certain amount of time has passed since the last send.
  183. _max_data = 1048576
  184. _max_time = 30
  185. # Delta to add to the final timestamp, if "end" wasn't given
  186. _end_epsilon = 1e-6
  187. def __init__(self, client, path, start = None, end = None):
  188. """'http' is the httpclient object. 'path' is the database
  189. path to insert to. 'start' and 'end' are used for the first
  190. contiguous interval."""
  191. self.last_response = None
  192. self._client = client
  193. self._path = path
  194. # Start and end for the overall contiguous interval we're
  195. # filling
  196. self._interval_start = start
  197. self._interval_end = end
  198. # Data for the specific block we're building up to send
  199. self._block_data = []
  200. self._block_len = 0
  201. self._block_start = None
  202. # Time of last request
  203. self._last_time = time.time()
  204. # We keep a buffer of the two most recently inserted lines.
  205. # Only the older one actually gets processed; the newer one
  206. # is used to "look-ahead" to the next timestamp if we need
  207. # to internally split an insertion into two requests.
  208. self._line_old = None
  209. self._line_new = None
  210. def insert_iter(self, iter):
  211. """Insert all lines of ASCII formatted data from the given
  212. iterable. Lines must be terminated with '\\n'."""
  213. for line in iter:
  214. self.insert_line(line)
  215. def insert_line(self, line):
  216. """Insert a single line of ASCII formatted data. Line
  217. must be terminated with '\\n'."""
  218. if line and (len(line) < 1 or line[-1] != '\n'):
  219. raise ValueError("lines must end in with a newline character")
  220. # Store this new line, but process the previous (old) one.
  221. # This lets us "look ahead" to the next line.
  222. self._line_old = self._line_new
  223. self._line_new = line
  224. if self._line_old is None:
  225. return
  226. # If starting a new block, pull out the timestamp if needed.
  227. if self._block_start is None:
  228. if self._interval_start:
  229. # User provided a start timestamp. Use it once, then
  230. # clear it for the next block.
  231. self._block_start = self._interval_start
  232. self._interval_start = None
  233. else:
  234. # Extract timestamp from the first row
  235. self._block_start = extract_timestamp(self._line_old)
  236. # Save the line
  237. self._block_data.append(self._line_old)
  238. self._block_len += len(self._line_old)
  239. if self._line_new is None:
  240. # No next line, so send this as the final block.
  241. self._send_block_final()
  242. else:
  243. # Send an intermediate block to the server if needed.
  244. elapsed = time.time() - self._last_time
  245. if (self._block_len > self._max_data) or (elapsed > self._max_time):
  246. self._send_block_intermediate()
  247. def update_start(self, start):
  248. """Update the start time for the next contiguous interval.
  249. Call this before starting to insert data for a new interval,
  250. for example, after .finalize()"""
  251. self._interval_start = start
  252. def update_end(self, end):
  253. """Update the end time for the current contiguous interval.
  254. Call this before .finalize()"""
  255. self._interval_end = end
  256. def finalize(self):
  257. """Stop filling the current contiguous interval.
  258. All outstanding data will be sent, and the interval end
  259. time of the interval will be taken from the 'end' argument
  260. used when initializing this class, or the most recent
  261. value passed to update_end(), or the last timestamp plus
  262. a small epsilon value if no other endpoint was provided.
  263. If more data is inserted after a finalize(), it will become
  264. part of a new interval and there may be a gap left in-between."""
  265. # Special marker tells insert_line that this is the end
  266. self.insert_line(None)
  267. def _send_block_intermediate(self):
  268. """Send data, when we still have more data to send.
  269. Use the timestamp from the next line, so that the blocks
  270. are contiguous."""
  271. block_end = extract_timestamp(self._line_new)
  272. if self._interval_end and block_end > self._interval_end:
  273. # Something's fishy -- the timestamp we found is after
  274. # the user's specified end. Limit it here, and the
  275. # server will return an error.
  276. block_end = self._interval_end
  277. self._send_block(block_end)
  278. def _send_block_final(self):
  279. """Send data, when this is the last block for the interval.
  280. There is no next line, so figure out the actual interval end
  281. using interval_end or end_epsilon."""
  282. if self._interval_end:
  283. # Use the user's specified end timestamp
  284. block_end = self._interval_end
  285. # Clear it in case we send more intervals in the future.
  286. self._interval_end = None
  287. else:
  288. # Add an epsilon to the last timestamp we saw
  289. block_end = extract_timestamp(self._line_old) + self._end_epsilon
  290. self._send_block(block_end)
  291. def _send_block(self, block_end):
  292. """Send current block to the server"""
  293. self.last_response = self._client.stream_insert_block(
  294. self._path, "".join(self._block_data),
  295. self._block_start, block_end)
  296. # Clear out the block
  297. self._block_data = []
  298. self._block_len = 0
  299. self._block_start = None
  300. # Note when we sent it
  301. self._last_time = time.time()