You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

263 lines
9.8 KiB

  1. # -*- coding: utf-8 -*-
  2. """Provide a NumpyClient class that is based on normal Client, but has
  3. additional methods for extracting and inserting data via Numpy arrays."""
  4. import nilmdb.utils
  5. import nilmdb.client.client
  6. import nilmdb.client.httpclient
  7. from nilmdb.client.errors import ClientError
  8. import contextlib
  9. from nilmdb.utils.time import timestamp_to_string, string_to_timestamp
  10. import numpy
  11. import io
  12. def layout_to_dtype(layout):
  13. ltype = layout.split('_')[0]
  14. lcount = int(layout.split('_')[1])
  15. if ltype.startswith('int'):
  16. atype = '<i' + str(int(ltype[3:]) // 8)
  17. elif ltype.startswith('uint'):
  18. atype = '<u' + str(int(ltype[4:]) // 8)
  19. elif ltype.startswith('float'):
  20. atype = '<f' + str(int(ltype[5:]) // 8)
  21. else:
  22. raise ValueError("bad layout")
  23. if lcount == 1:
  24. dtype = [('timestamp', '<i8'), ('data', atype)]
  25. else:
  26. dtype = [('timestamp', '<i8'), ('data', atype, lcount)]
  27. return numpy.dtype(dtype)
  28. class NumpyClient(nilmdb.client.client.Client):
  29. """Subclass of nilmdb.client.Client that adds additional methods for
  30. extracting and inserting data via Numpy arrays."""
  31. def _get_dtype(self, path, layout):
  32. if layout is None:
  33. streams = self.stream_list(path)
  34. if len(streams) != 1:
  35. raise ClientError("can't get layout for path: " + path)
  36. layout = streams[0][1]
  37. return layout_to_dtype(layout)
  38. def stream_extract_numpy(self, path, start = None, end = None,
  39. layout = None, maxrows = 100000,
  40. structured = False):
  41. """
  42. Extract data from a stream. Returns a generator that yields
  43. Numpy arrays of up to 'maxrows' of data each.
  44. If 'layout' is None, it is read using stream_info.
  45. If 'structured' is False, all data is converted to float64
  46. and returned in a flat 2D array. Otherwise, data is returned
  47. as a structured dtype in a 1D array.
  48. """
  49. dtype = self._get_dtype(path, layout)
  50. def to_numpy(data):
  51. a = numpy.fromstring(data, dtype)
  52. if structured:
  53. return a
  54. return numpy.c_[a['timestamp'], a['data']]
  55. chunks = []
  56. total_len = 0
  57. maxsize = dtype.itemsize * maxrows
  58. for data in self.stream_extract(path, start, end, binary = True):
  59. # Add this block of binary data
  60. chunks.append(data)
  61. total_len += len(data)
  62. # See if we have enough to make the requested Numpy array
  63. while total_len >= maxsize:
  64. assembled = b"".join(chunks)
  65. total_len -= maxsize
  66. chunks = [ assembled[maxsize:] ]
  67. block = assembled[:maxsize]
  68. yield to_numpy(block)
  69. if total_len:
  70. yield to_numpy(b"".join(chunks))
  71. @contextlib.contextmanager
  72. def stream_insert_numpy_context(self, path, start = None, end = None,
  73. layout = None):
  74. """Return a context manager that allows data to be efficiently
  75. inserted into a stream in a piecewise manner. Data is
  76. provided as Numpy arrays, and is aggregated and sent to the
  77. server in larger or smaller chunks as necessary. Data format
  78. must match the database layout for the given path.
  79. For more details, see help for
  80. nilmdb.client.numpyclient.StreamInserterNumpy
  81. If 'layout' is not None, use it as the layout rather than
  82. querying the database.
  83. """
  84. dtype = self._get_dtype(path, layout)
  85. ctx = StreamInserterNumpy(self, path, start, end, dtype)
  86. yield ctx
  87. ctx.finalize()
  88. ctx.destroy()
  89. def stream_insert_numpy(self, path, data, start = None, end = None,
  90. layout = None):
  91. """Insert data into a stream. data should be a Numpy array
  92. which will be passed through stream_insert_numpy_context to
  93. break it into chunks etc. See the help for that function
  94. for details."""
  95. with self.stream_insert_numpy_context(path, start, end, layout) as ctx:
  96. if isinstance(data, numpy.ndarray):
  97. ctx.insert(data)
  98. else:
  99. for chunk in data:
  100. ctx.insert(chunk)
  101. return ctx.last_response
  102. class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
  103. """Object returned by stream_insert_numpy_context() that manages
  104. the insertion of rows of data into a particular path.
  105. See help for nilmdb.client.client.StreamInserter for details.
  106. The only difference is that, instead of ASCII formatted data,
  107. this context manager can take Numpy arrays, which are either
  108. structured (1D with complex dtype) or flat (2D with simple dtype).
  109. """
  110. # Soft limit of how many bytes to send per HTTP request.
  111. _max_data = 2 * 1024 * 1024
  112. def __init__(self, client, path, start, end, dtype):
  113. """
  114. 'client' is the client object. 'path' is the database path
  115. to insert to. 'start' and 'end' are used for the first
  116. contiguous interval and may be None. 'dtype' is the Numpy
  117. dtype for this stream.
  118. """
  119. super(StreamInserterNumpy, self).__init__(client, path, start, end)
  120. self._dtype = dtype
  121. # Max rows to send at once
  122. self._max_rows = self._max_data // self._dtype.itemsize
  123. # List of the current arrays we're building up to send
  124. self._block_arrays = []
  125. self._block_rows = 0
  126. def insert(self, array):
  127. """Insert Numpy data, which must match the layout type."""
  128. if type(array) != numpy.ndarray:
  129. array = numpy.array(array)
  130. if array.ndim == 1:
  131. # Already a structured array; just verify the type
  132. if array.dtype != self._dtype:
  133. raise ValueError("wrong dtype for 1D (structured) array")
  134. elif array.ndim == 2:
  135. # Convert to structured array
  136. sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
  137. try:
  138. sarray['timestamp'] = array[:,0]
  139. # Need the squeeze in case sarray['data'] is 1 dimensional
  140. sarray['data'] = numpy.squeeze(array[:,1:])
  141. except (IndexError, ValueError):
  142. raise ValueError("wrong number of fields for this data type")
  143. array = sarray
  144. else:
  145. raise ValueError("wrong number of dimensions in array")
  146. length = len(array)
  147. maxrows = self._max_rows
  148. if length == 0:
  149. return
  150. if length > maxrows:
  151. # This is more than twice what we wanted to send, so split
  152. # it up. This is a bit inefficient, but the user really
  153. # shouldn't be providing this much data at once.
  154. for cut in range(0, length, maxrows):
  155. self.insert(array[cut:(cut + maxrows)])
  156. return
  157. # Add this array to our list
  158. self._block_arrays.append(array)
  159. self._block_rows += length
  160. # Send if it's too long
  161. if self._block_rows >= maxrows:
  162. self._send_block(final = False)
  163. def _send_block(self, final = False):
  164. """Send the data current stored up. One row might be left
  165. over if we need its timestamp saved."""
  166. # Build the full array to send
  167. if self._block_rows == 0:
  168. array = numpy.zeros(0, dtype = self._dtype)
  169. else:
  170. array = numpy.hstack(self._block_arrays)
  171. # Get starting timestamp
  172. start_ts = self._interval_start
  173. if start_ts is None:
  174. # Pull start from the first row
  175. try:
  176. start_ts = array['timestamp'][0]
  177. except IndexError:
  178. pass # no timestamp is OK, if we have no data
  179. # Get ending timestamp
  180. if final:
  181. # For a final block, the timestamp is either the
  182. # user-provided end, or the timestamp of the last line
  183. # plus epsilon.
  184. end_ts = self._interval_end
  185. if end_ts is None:
  186. try:
  187. end_ts = array['timestamp'][-1]
  188. end_ts += nilmdb.utils.time.epsilon
  189. except IndexError:
  190. pass # no timestamp is OK, if we have no data
  191. self._block_arrays = []
  192. self._block_rows = 0
  193. # Next block is completely fresh
  194. self._interval_start = None
  195. self._interval_end = None
  196. else:
  197. # An intermediate block. We need to save the last row
  198. # for the next block, and use its timestamp as the ending
  199. # timestamp for this one.
  200. if len(array) < 2:
  201. # Not enough data to send an intermediate block
  202. return
  203. end_ts = array['timestamp'][-1]
  204. if self._interval_end is not None and end_ts > self._interval_end:
  205. # User gave us bad endpoints; send it anyway, and let
  206. # the server complain so that the error is the same
  207. # as if we hadn't done this chunking.
  208. end_ts = self._interval_end
  209. self._block_arrays = [ array[-1:] ]
  210. self._block_rows = 1
  211. array = array[:-1]
  212. # Next block continues where this one ended
  213. self._interval_start = end_ts
  214. # If we have no endpoints, or equal endpoints, it's OK as long
  215. # as there's no data to send
  216. if (start_ts is None or end_ts is None) or (start_ts == end_ts):
  217. if len(array) == 0:
  218. return
  219. raise ClientError("have data to send, but invalid start/end times")
  220. # Send it
  221. data = array.tostring()
  222. self.last_response = self._client.stream_insert_block(
  223. self._path, data, start_ts, end_ts, binary = True)
  224. return