You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

211 lines
6.9 KiB

  1. # -*- coding: utf-8 -*-
  2. """Class for performing HTTP client requests via libcurl"""
  3. import nilmdb
  4. import nilmdb.utils
  5. import nilmdb.client.httpclient
  6. import time
  7. import simplejson as json
  8. def float_to_string(f):
  9. """Use repr to maintain full precision in the string output."""
  10. return repr(float(f))
  11. @nilmdb.utils.must_close()
  12. class Client(object):
  13. """Main client interface to the Nilm database."""
  14. def __init__(self, url):
  15. self.http = nilmdb.client.httpclient.HTTPClient(url)
  16. # __enter__/__exit__ allow this class to be a context manager
  17. def __enter__(self):
  18. return self
  19. def __exit__(self, exc_type, exc_value, traceback):
  20. self.close()
  21. def _json_param(self, data):
  22. """Return compact json-encoded version of parameter"""
  23. return json.dumps(data, separators=(',',':'))
  24. def close(self):
  25. """Close the connection; safe to call multiple times"""
  26. self.http.close()
  27. def geturl(self):
  28. """Return the URL we're using"""
  29. return self.http.baseurl
  30. def version(self):
  31. """Return server version"""
  32. return self.http.get("version")
  33. def dbinfo(self):
  34. """Return server database info (path, size, free space)
  35. as a dictionary."""
  36. return self.http.get("dbinfo")
  37. def stream_list(self, path = None, layout = None):
  38. params = {}
  39. if path is not None:
  40. params["path"] = path
  41. if layout is not None:
  42. params["layout"] = layout
  43. return self.http.get("stream/list", params)
  44. def stream_get_metadata(self, path, keys = None):
  45. params = { "path": path }
  46. if keys is not None:
  47. params["key"] = keys
  48. return self.http.get("stream/get_metadata", params)
  49. def stream_set_metadata(self, path, data):
  50. """Set stream metadata from a dictionary, replacing all existing
  51. metadata."""
  52. params = {
  53. "path": path,
  54. "data": self._json_param(data)
  55. }
  56. return self.http.get("stream/set_metadata", params)
  57. def stream_update_metadata(self, path, data):
  58. """Update stream metadata from a dictionary"""
  59. params = {
  60. "path": path,
  61. "data": self._json_param(data)
  62. }
  63. return self.http.get("stream/update_metadata", params)
  64. def stream_create(self, path, layout):
  65. """Create a new stream"""
  66. params = { "path": path,
  67. "layout" : layout }
  68. return self.http.get("stream/create", params)
  69. def stream_destroy(self, path):
  70. """Delete stream and its contents"""
  71. params = { "path": path }
  72. return self.http.get("stream/destroy", params)
  73. def stream_remove(self, path, start = None, end = None):
  74. """Remove data from the specified time range"""
  75. params = {
  76. "path": path
  77. }
  78. if start is not None:
  79. params["start"] = float_to_string(start)
  80. if end is not None:
  81. params["end"] = float_to_string(end)
  82. return self.http.get("stream/remove", params)
  83. def stream_insert(self, path, data, start = None, end = None):
  84. """Insert data into a stream. data should be a file-like object
  85. that provides ASCII data that matches the database layout for path.
  86. start and end are the starting and ending timestamp of this
  87. stream; all timestamps t in the data must satisfy 'start <= t
  88. < end'. If left unspecified, 'start' is the timestamp of the
  89. first line of data, and 'end' is the timestamp on the last line
  90. of data, plus a small delta of 1μs.
  91. """
  92. params = { "path": path }
  93. # See design.md for a discussion of how much data to send.
  94. # These are soft limits -- actual data might be rounded up.
  95. max_data = 1048576
  96. max_time = 30
  97. end_epsilon = 1e-6
  98. def extract_timestamp(line):
  99. return float(line.split()[0])
  100. def sendit():
  101. # If we have more data after this, use the timestamp of
  102. # the next line as the end. Otherwise, use the given
  103. # overall end time, or add end_epsilon to the last data
  104. # point.
  105. if nextline:
  106. block_end = extract_timestamp(nextline)
  107. if end and block_end > end:
  108. # This is unexpected, but we'll defer to the server
  109. # to return an error in this case.
  110. block_end = end
  111. elif end:
  112. block_end = end
  113. else:
  114. block_end = extract_timestamp(line) + end_epsilon
  115. # Send it
  116. params["start"] = float_to_string(block_start)
  117. params["end"] = float_to_string(block_end)
  118. return self.http.put("stream/insert", block_data, params)
  119. clock_start = time.time()
  120. block_data = ""
  121. block_start = start
  122. result = None
  123. line = None
  124. nextline = None
  125. for (line, nextline) in nilmdb.utils.misc.pairwise(data):
  126. # If we don't have a starting time, extract it from the first line
  127. if block_start is None:
  128. block_start = extract_timestamp(line)
  129. clock_elapsed = time.time() - clock_start
  130. block_data += line
  131. # If we have enough data, or enough time has elapsed,
  132. # send this block to the server, and empty things out
  133. # for the next block.
  134. if (len(block_data) > max_data) or (clock_elapsed > max_time):
  135. result = sendit()
  136. block_start = None
  137. block_data = ""
  138. clock_start = time.time()
  139. # One last block?
  140. if len(block_data):
  141. result = sendit()
  142. # Return the most recent JSON result we got back, or None if
  143. # we didn't make any requests.
  144. return result
  145. def stream_intervals(self, path, start = None, end = None):
  146. """
  147. Return a generator that yields each stream interval.
  148. """
  149. params = {
  150. "path": path
  151. }
  152. if start is not None:
  153. params["start"] = float_to_string(start)
  154. if end is not None:
  155. params["end"] = float_to_string(end)
  156. return self.http.get_gen("stream/intervals", params, retjson = True)
  157. def stream_extract(self, path, start = None, end = None, count = False):
  158. """
  159. Extract data from a stream. Returns a generator that yields
  160. lines of ASCII-formatted data that matches the database
  161. layout for the given path.
  162. Specify count=True to just get a count of values rather than
  163. the actual data.
  164. """
  165. params = {
  166. "path": path,
  167. }
  168. if start is not None:
  169. params["start"] = float_to_string(start)
  170. if end is not None:
  171. params["end"] = float_to_string(end)
  172. if count:
  173. params["count"] = 1
  174. return self.http.get_gen("stream/extract", params, retjson = False)