You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

384 lines
14 KiB

  1. # -*- coding: utf-8 -*-
  2. """Class for performing HTTP client requests via libcurl"""
  3. import nilmdb
  4. import nilmdb.utils
  5. import nilmdb.client.httpclient
  6. import time
  7. import simplejson as json
  8. import contextlib
  9. def float_to_string(f):
  10. """Use repr to maintain full precision in the string output."""
  11. return repr(float(f))
  12. def extract_timestamp(line):
  13. """Extract just the timestamp from a line of data text"""
  14. return float(line.split()[0])
  15. class Client(object):
  16. """Main client interface to the Nilm database."""
  17. def __init__(self, url, post_json = False):
  18. """Initialize client with given URL. If post_json is true,
  19. POST requests are sent with Content-Type 'application/json'
  20. instead of the default 'x-www-form-urlencoded'."""
  21. self.http = nilmdb.client.httpclient.HTTPClient(url, post_json)
  22. # __enter__/__exit__ allow this class to be a context manager
  23. def __enter__(self):
  24. return self
  25. def __exit__(self, exc_type, exc_value, traceback):
  26. self.close()
  27. def _json_param(self, data):
  28. """Return compact json-encoded version of parameter"""
  29. return json.dumps(data, separators=(',',':'))
  30. def close(self):
  31. """Close the connection; safe to call multiple times"""
  32. self.http.close()
  33. def geturl(self):
  34. """Return the URL we're using"""
  35. return self.http.baseurl
  36. def version(self):
  37. """Return server version"""
  38. return self.http.get("version")
  39. def dbinfo(self):
  40. """Return server database info (path, size, free space)
  41. as a dictionary."""
  42. return self.http.get("dbinfo")
  43. def stream_list(self, path = None, layout = None, extent = False):
  44. params = {}
  45. if path is not None:
  46. params["path"] = path
  47. if layout is not None:
  48. params["layout"] = layout
  49. if extent:
  50. params["extent"] = 1
  51. return self.http.get("stream/list", params)
  52. def stream_get_metadata(self, path, keys = None):
  53. params = { "path": path }
  54. if keys is not None:
  55. params["key"] = keys
  56. return self.http.get("stream/get_metadata", params)
  57. def stream_set_metadata(self, path, data):
  58. """Set stream metadata from a dictionary, replacing all existing
  59. metadata."""
  60. params = {
  61. "path": path,
  62. "data": self._json_param(data)
  63. }
  64. return self.http.post("stream/set_metadata", params)
  65. def stream_update_metadata(self, path, data):
  66. """Update stream metadata from a dictionary"""
  67. params = {
  68. "path": path,
  69. "data": self._json_param(data)
  70. }
  71. return self.http.post("stream/update_metadata", params)
  72. def stream_create(self, path, layout):
  73. """Create a new stream"""
  74. params = { "path": path,
  75. "layout" : layout }
  76. return self.http.post("stream/create", params)
  77. def stream_destroy(self, path):
  78. """Delete stream and its contents"""
  79. params = { "path": path }
  80. return self.http.post("stream/destroy", params)
  81. def stream_remove(self, path, start = None, end = None):
  82. """Remove data from the specified time range"""
  83. params = {
  84. "path": path
  85. }
  86. if start is not None:
  87. params["start"] = float_to_string(start)
  88. if end is not None:
  89. params["end"] = float_to_string(end)
  90. return self.http.post("stream/remove", params)
  91. @contextlib.contextmanager
  92. def stream_insert_context(self, path, start = None, end = None):
  93. """Return a context manager that allows data to be efficiently
  94. inserted into a stream in a piecewise manner. Data is be provided
  95. as single lines, and is aggregated and sent to the server in larger
  96. chunks as necessary. Data lines must match the database layout for
  97. the given path, and end with a newline.
  98. Example:
  99. with client.stream_insert_context('/path', start, end) as ctx:
  100. ctx.insert_line('1234567890.0 1 2 3 4\\n')
  101. ctx.insert_line('1234567891.0 1 2 3 4\\n')
  102. For more details, see help for nilmdb.client.client.StreamInserter
  103. This may make multiple requests to the server, if the data is
  104. large enough or enough time has passed between insertions.
  105. """
  106. ctx = StreamInserter(self, path, start, end)
  107. yield ctx
  108. ctx.finalize()
  109. def stream_insert(self, path, data, start = None, end = None):
  110. """Insert rows of data into a stream. data should be an
  111. iterable object that provides ASCII data that matches the
  112. database layout for path. See stream_insert_context for
  113. details on the 'start' and 'end' parameters."""
  114. with self.stream_insert_context(path, start, end) as ctx:
  115. ctx.insert_iter(data)
  116. return ctx.last_response
  117. def stream_insert_block(self, path, block, start, end):
  118. """Insert an entire block of data into a stream. Like
  119. stream_insert, except 'block' contains multiple lines of ASCII
  120. text and is sent in one single chunk."""
  121. params = { "path": path,
  122. "start": float_to_string(start),
  123. "end": float_to_string(end) }
  124. return self.http.put("stream/insert", block, params)
  125. def stream_intervals(self, path, start = None, end = None):
  126. """
  127. Return a generator that yields each stream interval.
  128. """
  129. params = {
  130. "path": path
  131. }
  132. if start is not None:
  133. params["start"] = float_to_string(start)
  134. if end is not None:
  135. params["end"] = float_to_string(end)
  136. return self.http.get_gen("stream/intervals", params)
  137. def stream_extract(self, path, start = None, end = None, count = False):
  138. """
  139. Extract data from a stream. Returns a generator that yields
  140. lines of ASCII-formatted data that matches the database
  141. layout for the given path.
  142. Specify count = True to return a count of matching data points
  143. rather than the actual data. The output format is unchanged.
  144. """
  145. params = {
  146. "path": path,
  147. }
  148. if start is not None:
  149. params["start"] = float_to_string(start)
  150. if end is not None:
  151. params["end"] = float_to_string(end)
  152. if count:
  153. params["count"] = 1
  154. return self.http.get_gen("stream/extract", params)
  155. def stream_count(self, path, start = None, end = None):
  156. """
  157. Return the number of rows of data in the stream that satisfy
  158. the given timestamps.
  159. """
  160. counts = list(self.stream_extract(path, start, end, count = True))
  161. return int(counts[0])
  162. class StreamInserter(object):
  163. """Object returned by stream_insert_context() that manages
  164. the insertion of rows of data into a particular path.
  165. The basic data flow is that we are filling a contiguous interval
  166. on the server, with no gaps, that extends from timestamp 'start'
  167. to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
  168. Data is provided by the user one line at a time with
  169. .insert_line() or .insert_iter().
  170. 1. The first inserted line begins a new interval that starts at
  171. 'start'. If 'start' is not given, it is deduced from the first
  172. line's timestamp.
  173. 2. Subsequent lines go into the same contiguous interval. As lines
  174. are inserted, this routine may make multiple insertion requests to
  175. the server, but will structure the timestamps to leave no gaps.
  176. 3. The current contiguous interval can be completed by manually
  177. calling .finalize(), which the context manager will also do
  178. automatically. This will send any remaining data to the server,
  179. using the 'end' timestamp to end the interval.
  180. After a .finalize(), inserting new data goes back to step 1.
  181. .update_start() can be called before step 1 to change the start
  182. time for the interval. .update_end() can be called before step 3
  183. to change the end time for the interval.
  184. """
  185. # See design.md for a discussion of how much data to send.
  186. # These are soft limits -- actual data might be rounded up.
  187. # We send when we have a certain amount of data queued, or
  188. # when a certain amount of time has passed since the last send.
  189. _max_data = 2 * 1024 * 1024
  190. _max_time = 30
  191. # Delta to add to the final timestamp, if "end" wasn't given
  192. _end_epsilon = 1e-6
  193. def __init__(self, client, path, start = None, end = None):
  194. """'http' is the httpclient object. 'path' is the database
  195. path to insert to. 'start' and 'end' are used for the first
  196. contiguous interval."""
  197. self.last_response = None
  198. self._client = client
  199. self._path = path
  200. # Start and end for the overall contiguous interval we're
  201. # filling
  202. self._interval_start = start
  203. self._interval_end = end
  204. # Data for the specific block we're building up to send
  205. self._block_data = []
  206. self._block_len = 0
  207. self._block_start = None
  208. # Time of last request
  209. self._last_time = time.time()
  210. # We keep a buffer of the two most recently inserted lines.
  211. # Only the older one actually gets processed; the newer one
  212. # is used to "look-ahead" to the next timestamp if we need
  213. # to internally split an insertion into two requests.
  214. self._line_old = None
  215. self._line_new = None
  216. def insert_iter(self, iter):
  217. """Insert all lines of ASCII formatted data from the given
  218. iterable. Lines must be terminated with '\\n'."""
  219. for line in iter:
  220. self.insert_line(line)
  221. def insert_line(self, line, allow_intermediate = True):
  222. """Insert a single line of ASCII formatted data. Line
  223. must be terminated with '\\n'."""
  224. if line and (len(line) < 1 or line[-1] != '\n'):
  225. raise ValueError("lines must end in with a newline character")
  226. # Store this new line, but process the previous (old) one.
  227. # This lets us "look ahead" to the next line.
  228. self._line_old = self._line_new
  229. self._line_new = line
  230. if self._line_old is None:
  231. return
  232. # If starting a new block, pull out the timestamp if needed.
  233. if self._block_start is None:
  234. if self._interval_start is not None:
  235. # User provided a start timestamp. Use it once, then
  236. # clear it for the next block.
  237. self._block_start = self._interval_start
  238. self._interval_start = None
  239. else:
  240. # Extract timestamp from the first row
  241. self._block_start = extract_timestamp(self._line_old)
  242. # Save the line
  243. self._block_data.append(self._line_old)
  244. self._block_len += len(self._line_old)
  245. if allow_intermediate:
  246. # Send an intermediate block to the server if needed.
  247. elapsed = time.time() - self._last_time
  248. if (self._block_len > self._max_data) or (elapsed > self._max_time):
  249. self._send_block_intermediate()
  250. def update_start(self, start):
  251. """Update the start time for the next contiguous interval.
  252. Call this before starting to insert data for a new interval,
  253. for example, after .finalize()"""
  254. self._interval_start = start
  255. def update_end(self, end):
  256. """Update the end time for the current contiguous interval.
  257. Call this before .finalize()"""
  258. self._interval_end = end
  259. def finalize(self):
  260. """Stop filling the current contiguous interval.
  261. All outstanding data will be sent, and the interval end
  262. time of the interval will be taken from the 'end' argument
  263. used when initializing this class, or the most recent
  264. value passed to update_end(), or the last timestamp plus
  265. a small epsilon value if no other endpoint was provided.
  266. If more data is inserted after a finalize(), it will become
  267. part of a new interval and there may be a gap left in-between."""
  268. # Special marker tells insert_line that this is the end
  269. self.insert_line(None, allow_intermediate = False)
  270. if self._block_len > 0:
  271. # We have data pending, so send the final block
  272. self._send_block_final()
  273. elif None not in (self._interval_start, self._interval_end):
  274. # We have no data, but enough information to create an
  275. # empty interval.
  276. self._block_start = self._interval_start
  277. self._interval_start = None
  278. self._send_block_final()
  279. else:
  280. # No data, and no timestamps to use to create an empty
  281. # interval.
  282. pass
  283. # Make sure both timestamps are emptied for future intervals.
  284. self._interval_start = None
  285. self._interval_end = None
  286. def _send_block_intermediate(self):
  287. """Send data, when we still have more data to send.
  288. Use the timestamp from the next line, so that the blocks
  289. are contiguous."""
  290. block_end = extract_timestamp(self._line_new)
  291. if self._interval_end is not None and block_end > self._interval_end:
  292. # Something's fishy -- the timestamp we found is after
  293. # the user's specified end. Limit it here, and the
  294. # server will return an error.
  295. block_end = self._interval_end
  296. self._send_block(block_end)
  297. def _send_block_final(self):
  298. """Send data, when this is the last block for the interval.
  299. There is no next line, so figure out the actual interval end
  300. using interval_end or end_epsilon."""
  301. if self._interval_end is not None:
  302. # Use the user's specified end timestamp
  303. block_end = self._interval_end
  304. # Clear it in case we send more intervals in the future.
  305. self._interval_end = None
  306. else:
  307. # Add an epsilon to the last timestamp we saw
  308. block_end = extract_timestamp(self._line_old) + self._end_epsilon
  309. self._send_block(block_end)
  310. def _send_block(self, block_end):
  311. """Send current block to the server"""
  312. self.last_response = self._client.stream_insert_block(
  313. self._path, "".join(self._block_data),
  314. self._block_start, block_end)
  315. # Clear out the block
  316. self._block_data = []
  317. self._block_len = 0
  318. self._block_start = None
  319. # Note when we sent it
  320. self._last_time = time.time()