Compare commits
	
		
			15 Commits
		
	
	
		
			nilmdb-1.4
			...
			nilmdb-1.5
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| d36ece3767 | |||
| 231963538e | |||
| b4d6aad6de | |||
| e95142eabf | |||
| d21c3470bc | |||
| 7576883f49 | |||
| cc211542f8 | |||
| 8292dcf70b | |||
| b362fd37f6 | |||
| 41ec13ee17 | |||
| efa9aa9097 | |||
| d9afb48f45 | |||
| d1140e0f16 | |||
| 6091e44561 | |||
| e233ba790f | 
| @@ -10,6 +10,9 @@ Prerequisites: | |||||||
|   sudo apt-get install python-cherrypy3 python-decorator python-simplejson |   sudo apt-get install python-cherrypy3 python-decorator python-simplejson | ||||||
|   sudo apt-get install python-requests python-dateutil python-tz python-psutil |   sudo apt-get install python-requests python-dateutil python-tz python-psutil | ||||||
|  |  | ||||||
|  |   # Other dependencies (required by some modules) | ||||||
|  |   sudo apt-get install python-numpy | ||||||
|  |  | ||||||
|   # Tools for running tests |   # Tools for running tests | ||||||
|   sudo apt-get install python-nose python-coverage |   sudo apt-get install python-nose python-coverage | ||||||
|  |  | ||||||
|   | |||||||
| @@ -389,3 +389,35 @@ Possible solutions: | |||||||
|     are always printed as int64 values, and a new format |     are always printed as int64 values, and a new format | ||||||
|     "@1234567890123456" is added to the parser for specifying them |     "@1234567890123456" is added to the parser for specifying them | ||||||
|     exactly. |     exactly. | ||||||
|  |  | ||||||
|  | Binary interface | ||||||
|  | ---------------- | ||||||
|  |  | ||||||
|  | The ASCII interface is too slow for high-bandwidth processing, like | ||||||
|  | sinefits, prep, etc.  A binary interface was added so that you can | ||||||
|  | extract the raw binary out of the bulkdata storage.  This binary is | ||||||
|  | a little-endian format, e.g. in C a uint16_6 stream would be: | ||||||
|  |  | ||||||
|  |     #include <endian.h> | ||||||
|  |     #include <stdint.h> | ||||||
|  |     struct { | ||||||
|  |         int64_t timestamp_le; | ||||||
|  |         uint16_t data_le[6]; | ||||||
|  |     } __attribute__((packed)); | ||||||
|  |  | ||||||
|  | Remember to byteswap (with e.g. `letoh` in C)! | ||||||
|  |  | ||||||
|  | This interface is used by the new `nilmdb.client.numpyclient.NumpyClient` | ||||||
|  | class, which is a subclass of the normal `nilmcb.client.client.Client` | ||||||
|  | and has all of the same functions.  It adds three new functions: | ||||||
|  |  | ||||||
|  | - `stream_extract_numpy` to extract data as a Numpy array | ||||||
|  |  | ||||||
|  | - `stream_insert_numpy` to insert data as a Numpy array | ||||||
|  |  | ||||||
|  | - `stream_insert_numpy_context` is the context manager for | ||||||
|  |   incrementally inserting data | ||||||
|  |  | ||||||
|  | It is significantly faster!  It is about 20 times faster to decimate a | ||||||
|  | stream with `nilm-decimate` when the filter code is using the new | ||||||
|  | binary/numpy interface. | ||||||
|   | |||||||
| @@ -127,10 +127,11 @@ class Client(object): | |||||||
|     @contextlib.contextmanager |     @contextlib.contextmanager | ||||||
|     def stream_insert_context(self, path, start = None, end = None): |     def stream_insert_context(self, path, start = None, end = None): | ||||||
|         """Return a context manager that allows data to be efficiently |         """Return a context manager that allows data to be efficiently | ||||||
|         inserted into a stream in a piecewise manner.  Data is be provided |         inserted into a stream in a piecewise manner.  Data is | ||||||
|         as single lines, and is aggregated and sent to the server in larger |         provided as ASCII lines, and is aggregated and sent to the | ||||||
|         chunks as necessary.  Data lines must match the database layout for |         server in larger or smaller chunks as necessary.  Data lines | ||||||
|         the given path, and end with a newline. |         must match the database layout for the given path, and end | ||||||
|  |         with a newline. | ||||||
|  |  | ||||||
|         Example: |         Example: | ||||||
|           with client.stream_insert_context('/path', start, end) as ctx: |           with client.stream_insert_context('/path', start, end) as ctx: | ||||||
| @@ -142,15 +143,16 @@ class Client(object): | |||||||
|         This may make multiple requests to the server, if the data is |         This may make multiple requests to the server, if the data is | ||||||
|         large enough or enough time has passed between insertions. |         large enough or enough time has passed between insertions. | ||||||
|         """ |         """ | ||||||
|         ctx = StreamInserter(self.http, path, start, end) |         ctx = StreamInserter(self, path, start, end) | ||||||
|         yield ctx |         yield ctx | ||||||
|         ctx.finalize() |         ctx.finalize() | ||||||
|  |  | ||||||
|     def stream_insert(self, path, data, start = None, end = None): |     def stream_insert(self, path, data, start = None, end = None): | ||||||
|         """Insert rows of data into a stream.  data should be a string |         """Insert rows of data into a stream.  data should be a string | ||||||
|         or iterable that provides ASCII data that matches the database |         or iterable that provides ASCII data that matches the database | ||||||
|         layout for path.  See stream_insert_context for details on the |         layout for path.  Data is passed through stream_insert_context, | ||||||
|         'start' and 'end' parameters.""" |         so it will be broken into reasonably-sized chunks and | ||||||
|  |         start/end will be deduced if missing.""" | ||||||
|         with self.stream_insert_context(path, start, end) as ctx: |         with self.stream_insert_context(path, start, end) as ctx: | ||||||
|             if isinstance(data, basestring): |             if isinstance(data, basestring): | ||||||
|                 ctx.insert(data) |                 ctx.insert(data) | ||||||
| @@ -159,11 +161,28 @@ class Client(object): | |||||||
|                     ctx.insert(chunk) |                     ctx.insert(chunk) | ||||||
|         return ctx.last_response |         return ctx.last_response | ||||||
|  |  | ||||||
|  |     def stream_insert_block(self, path, data, start, end, binary = False): | ||||||
|  |         """Insert a single fixed block of data into the stream.  It is | ||||||
|  |         sent directly to the server in one block with no further | ||||||
|  |         processing. | ||||||
|  |  | ||||||
|  |         If 'binary' is True, provide raw binary data in little-endian | ||||||
|  |         format matching the path layout, including an int64 timestamp. | ||||||
|  |         Otherwise, provide ASCII data matching the layout.""" | ||||||
|  |         params = { | ||||||
|  |             "path": path, | ||||||
|  |             "start": timestamp_to_string(start), | ||||||
|  |             "end": timestamp_to_string(end), | ||||||
|  |         } | ||||||
|  |         if binary: | ||||||
|  |             params["binary"] = 1 | ||||||
|  |         return self.http.put("stream/insert", data, params, binary = binary) | ||||||
|  |  | ||||||
|     def stream_intervals(self, path, start = None, end = None, diffpath = None): |     def stream_intervals(self, path, start = None, end = None, diffpath = None): | ||||||
|         """ |         """ | ||||||
|         Return a generator that yields each stream interval. |         Return a generator that yields each stream interval. | ||||||
|  |  | ||||||
|         If diffpath is not None, yields only interval ranges that are |         If 'diffpath' is not None, yields only interval ranges that are | ||||||
|         present in 'path' but not in 'diffpath'. |         present in 'path' but not in 'diffpath'. | ||||||
|         """ |         """ | ||||||
|         params = { |         params = { | ||||||
| @@ -184,16 +203,16 @@ class Client(object): | |||||||
|         lines of ASCII-formatted data that matches the database |         lines of ASCII-formatted data that matches the database | ||||||
|         layout for the given path. |         layout for the given path. | ||||||
|  |  | ||||||
|         Specify count = True to return a count of matching data points |         If 'count' is True, return a count of matching data points | ||||||
|         rather than the actual data.  The output format is unchanged. |         rather than the actual data.  The output format is unchanged. | ||||||
|  |  | ||||||
|         Specify markup = True to include comments in the returned data |         If 'markup' is True, include comments in the returned data | ||||||
|         that indicate interval starts and ends. |         that indicate interval starts and ends. | ||||||
|  |  | ||||||
|         Specify binary = True to return chunks of raw binary data, |         If 'binary' is True, return chunks of raw binary data, rather | ||||||
|         rather than lines of ASCII-formatted data.  Raw binary data |         than lines of ASCII-formatted data.  Raw binary data is | ||||||
|         is always little-endian and matches the database types |         little-endian and matches the database types (including an | ||||||
|         (including a uint64 timestamp). |         int64 timestamp). | ||||||
|         """ |         """ | ||||||
|         params = { |         params = { | ||||||
|             "path": path, |             "path": path, | ||||||
| @@ -257,13 +276,13 @@ class StreamInserter(object): | |||||||
|     _max_data = 2 * 1024 * 1024 |     _max_data = 2 * 1024 * 1024 | ||||||
|     _max_data_after_send = 64 * 1024 |     _max_data_after_send = 64 * 1024 | ||||||
|  |  | ||||||
|     def __init__(self, http, path, start = None, end = None): |     def __init__(self, client, path, start, end): | ||||||
|         """'http' is the httpclient object.  'path' is the database |         """'client' is the client object.  'path' is the database | ||||||
|         path to insert to.  'start' and 'end' are used for the first |         path to insert to.  'start' and 'end' are used for the first | ||||||
|         contiguous interval.""" |         contiguous interval and may be None.""" | ||||||
|         self.last_response = None |         self.last_response = None | ||||||
|  |  | ||||||
|         self._http = http |         self._client = client | ||||||
|         self._path = path |         self._path = path | ||||||
|  |  | ||||||
|         # Start and end for the overall contiguous interval we're |         # Start and end for the overall contiguous interval we're | ||||||
| @@ -431,9 +450,7 @@ class StreamInserter(object): | |||||||
|             raise ClientError("have data to send, but no start/end times") |             raise ClientError("have data to send, but no start/end times") | ||||||
|  |  | ||||||
|         # Send it |         # Send it | ||||||
|         params = { "path": self._path, |         self.last_response = self._client.stream_insert_block( | ||||||
|                    "start": timestamp_to_string(start_ts), |             self._path, block, start_ts, end_ts, binary = False) | ||||||
|                    "end": timestamp_to_string(end_ts) } |  | ||||||
|         self.last_response = self._http.put("stream/insert", block, params) |  | ||||||
|  |  | ||||||
|         return |         return | ||||||
|   | |||||||
| @@ -105,9 +105,13 @@ class HTTPClient(object): | |||||||
|         else: |         else: | ||||||
|             return self._req("POST", url, None, params) |             return self._req("POST", url, None, params) | ||||||
|  |  | ||||||
|     def put(self, url, data, params = None): |     def put(self, url, data, params = None, binary = False): | ||||||
|         """Simple PUT (parameters in URL, data in body)""" |         """Simple PUT (parameters in URL, data in body)""" | ||||||
|         return self._req("PUT", url, params, data) |         if binary: | ||||||
|  |             h = { 'Content-type': 'application/octet-stream' } | ||||||
|  |         else: | ||||||
|  |             h = { 'Content-type': 'text/plain; charset=utf-8' } | ||||||
|  |         return self._req("PUT", url, query = params, body = data, headers = h) | ||||||
|  |  | ||||||
|     # Generator versions that return data one line at a time. |     # Generator versions that return data one line at a time. | ||||||
|     def _req_gen(self, method, url, query = None, body = None, |     def _req_gen(self, method, url, query = None, body = None, | ||||||
|   | |||||||
| @@ -31,6 +31,14 @@ class NumpyClient(nilmdb.client.client.Client): | |||||||
|     """Subclass of nilmdb.client.Client that adds additional methods for |     """Subclass of nilmdb.client.Client that adds additional methods for | ||||||
|     extracting and inserting data via Numpy arrays.""" |     extracting and inserting data via Numpy arrays.""" | ||||||
|  |  | ||||||
|  |     def _get_dtype(self, path, layout): | ||||||
|  |         if layout is None: | ||||||
|  |             streams = self.stream_list(path) | ||||||
|  |             if len(streams) != 1: | ||||||
|  |                 raise ClientError("can't get layout for path: " + path) | ||||||
|  |             layout = streams[0][1] | ||||||
|  |         return layout_to_dtype(layout) | ||||||
|  |  | ||||||
|     def stream_extract_numpy(self, path, start = None, end = None, |     def stream_extract_numpy(self, path, start = None, end = None, | ||||||
|                              layout = None, maxrows = 100000, |                              layout = None, maxrows = 100000, | ||||||
|                              structured = False): |                              structured = False): | ||||||
| @@ -44,12 +52,7 @@ class NumpyClient(nilmdb.client.client.Client): | |||||||
|         and returned in a flat 2D array.  Otherwise, data is returned |         and returned in a flat 2D array.  Otherwise, data is returned | ||||||
|         as a structured dtype in a 1D array. |         as a structured dtype in a 1D array. | ||||||
|         """ |         """ | ||||||
|         if layout is None: |         dtype = self._get_dtype(path, layout) | ||||||
|             streams = self.stream_list(path) |  | ||||||
|             if len(streams) != 1: |  | ||||||
|                 raise ClientError("can't get layout for path: " + path) |  | ||||||
|             layout = streams[0][1] |  | ||||||
|         dtype = layout_to_dtype(layout) |  | ||||||
|  |  | ||||||
|         def to_numpy(data): |         def to_numpy(data): | ||||||
|             a = numpy.fromstring(data, dtype) |             a = numpy.fromstring(data, dtype) | ||||||
| @@ -75,3 +78,182 @@ class NumpyClient(nilmdb.client.client.Client): | |||||||
|  |  | ||||||
|         if total_len: |         if total_len: | ||||||
|             yield to_numpy("".join(chunks)) |             yield to_numpy("".join(chunks)) | ||||||
|  |  | ||||||
|  |     @contextlib.contextmanager | ||||||
|  |     def stream_insert_numpy_context(self, path, start = None, end = None, | ||||||
|  |                                     layout = None): | ||||||
|  |         """Return a context manager that allows data to be efficiently | ||||||
|  |         inserted into a stream in a piecewise manner.  Data is | ||||||
|  |         provided as Numpy arrays, and is aggregated and sent to the | ||||||
|  |         server in larger or smaller chunks as necessary.  Data format | ||||||
|  |         must match the database layout for the given path. | ||||||
|  |  | ||||||
|  |         For more details, see help for | ||||||
|  |         nilmdb.client.numpyclient.StreamInserterNumpy | ||||||
|  |  | ||||||
|  |         If 'layout' is not None, use it as the layout rather than | ||||||
|  |         querying the database. | ||||||
|  |         """ | ||||||
|  |         dtype = self._get_dtype(path, layout) | ||||||
|  |         ctx = StreamInserterNumpy(self, path, start, end, dtype) | ||||||
|  |         yield ctx | ||||||
|  |         ctx.finalize() | ||||||
|  |  | ||||||
|  |     def stream_insert_numpy(self, path, data, start = None, end = None, | ||||||
|  |                             layout = None): | ||||||
|  |         """Insert data into a stream.  data should be a Numpy array | ||||||
|  |         which will be passed through stream_insert_numpy_context to | ||||||
|  |         break it into chunks etc.  See the help for that function | ||||||
|  |         for details.""" | ||||||
|  |         with self.stream_insert_numpy_context(path, start, end, layout) as ctx: | ||||||
|  |             if isinstance(data, numpy.ndarray): | ||||||
|  |                 ctx.insert(data) | ||||||
|  |             else: | ||||||
|  |                 for chunk in data: | ||||||
|  |                     ctx.insert(chunk) | ||||||
|  |         return ctx.last_response | ||||||
|  |  | ||||||
|  | class StreamInserterNumpy(nilmdb.client.client.StreamInserter): | ||||||
|  |     """Object returned by stream_insert_numpy_context() that manages | ||||||
|  |     the insertion of rows of data into a particular path. | ||||||
|  |  | ||||||
|  |     See help for nilmdb.client.client.StreamInserter for details. | ||||||
|  |     The only difference is that, instead of ASCII formatted data, | ||||||
|  |     this context manager can take Numpy arrays, which are either | ||||||
|  |     structured (1D with complex dtype) or flat (2D with simple dtype). | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |     # Soft limit of how many bytes to send per HTTP request. | ||||||
|  |     _max_data = 2 * 1024 * 1024 | ||||||
|  |  | ||||||
|  |     def __init__(self, client, path, start, end, dtype): | ||||||
|  |         """ | ||||||
|  |         'client' is the client object.  'path' is the database path | ||||||
|  |         to insert to.  'start' and 'end' are used for the first | ||||||
|  |         contiguous interval and may be None.  'dtype' is the Numpy | ||||||
|  |         dtype for this stream. | ||||||
|  |         """ | ||||||
|  |         self.last_response = None | ||||||
|  |  | ||||||
|  |         self._dtype = dtype | ||||||
|  |         self._client = client | ||||||
|  |         self._path = path | ||||||
|  |  | ||||||
|  |         # Start and end for the overall contiguous interval we're | ||||||
|  |         # filling | ||||||
|  |         self._interval_start = start | ||||||
|  |         self._interval_end = end | ||||||
|  |  | ||||||
|  |         # Max rows to send at once | ||||||
|  |         self._max_rows = self._max_data // self._dtype.itemsize | ||||||
|  |  | ||||||
|  |         # List of the current arrays we're building up to send | ||||||
|  |         self._block_arrays = [] | ||||||
|  |         self._block_rows = 0 | ||||||
|  |  | ||||||
|  |     def insert(self, array): | ||||||
|  |         """Insert Numpy data, which must match the layout type.""" | ||||||
|  |         if type(array) != numpy.ndarray: | ||||||
|  |             array = numpy.array(array) | ||||||
|  |         if array.ndim == 1: | ||||||
|  |             # Already a structured array; just verify the type | ||||||
|  |             if array.dtype != self._dtype: | ||||||
|  |                 raise ValueError("wrong dtype for 1D (structured) array") | ||||||
|  |         elif array.ndim == 2: | ||||||
|  |             # Convert to structured array | ||||||
|  |             sarray = numpy.zeros(array.shape[0], dtype=self._dtype) | ||||||
|  |             sarray['timestamp'] = array[:,0] | ||||||
|  |             # Need the squeeze in case sarray['data'] is 1 dimensional | ||||||
|  |             sarray['data'] = numpy.squeeze(array[:,1:]) | ||||||
|  |             array = sarray | ||||||
|  |         else: | ||||||
|  |             raise ValueError("wrong number of dimensions in array") | ||||||
|  |  | ||||||
|  |         length = len(array) | ||||||
|  |         maxrows = self._max_rows | ||||||
|  |  | ||||||
|  |         if length == 0: | ||||||
|  |             return | ||||||
|  |         if length > maxrows: | ||||||
|  |             # This is more than twice what we wanted to send, so split | ||||||
|  |             # it up.  This is a bit inefficient, but the user really | ||||||
|  |             # shouldn't be providing this much data at once. | ||||||
|  |             for cut in range(0, length, maxrows): | ||||||
|  |                 self.insert(array[cut:(cut + maxrows)]) | ||||||
|  |             return | ||||||
|  |  | ||||||
|  |         # Add this array to our list | ||||||
|  |         self._block_arrays.append(array) | ||||||
|  |         self._block_rows += length | ||||||
|  |  | ||||||
|  |         # Send if it's too long | ||||||
|  |         if self._block_rows >= maxrows: | ||||||
|  |             self._send_block(final = False) | ||||||
|  |  | ||||||
|  |     def _send_block(self, final = False): | ||||||
|  |         """Send the data current stored up.  One row might be left | ||||||
|  |         over if we need its timestamp saved.""" | ||||||
|  |  | ||||||
|  |         # Build the full array to send | ||||||
|  |         if self._block_rows == 0: | ||||||
|  |             array = numpy.zeros(0, dtype = self._dtype) | ||||||
|  |         else: | ||||||
|  |             array = numpy.hstack(self._block_arrays) | ||||||
|  |  | ||||||
|  |         # Get starting timestamp | ||||||
|  |         start_ts = self._interval_start | ||||||
|  |         if start_ts is None: | ||||||
|  |             # Pull start from the first row | ||||||
|  |             try: | ||||||
|  |                 start_ts = array['timestamp'][0] | ||||||
|  |             except IndexError: | ||||||
|  |                 pass # no timestamp is OK, if we have no data | ||||||
|  |  | ||||||
|  |         # Get ending timestamp | ||||||
|  |         if final: | ||||||
|  |             # For a final block, the timestamp is either the | ||||||
|  |             # user-provided end, or the timestamp of the last line | ||||||
|  |             # plus epsilon. | ||||||
|  |             end_ts = self._interval_end | ||||||
|  |             if end_ts is None: | ||||||
|  |                 try: | ||||||
|  |                     end_ts = array['timestamp'][-1] | ||||||
|  |                     end_ts += nilmdb.utils.time.epsilon | ||||||
|  |                 except IndexError: | ||||||
|  |                     pass # no timestamp is OK, if we have no data | ||||||
|  |             self._block_arrays = [] | ||||||
|  |             self._block_rows = 0 | ||||||
|  |  | ||||||
|  |             # Next block is completely fresh | ||||||
|  |             self._interval_start = None | ||||||
|  |             self._interval_end = None | ||||||
|  |         else: | ||||||
|  |             # An intermediate block.  We need to save the last row | ||||||
|  |             # for the next block, and use its timestamp as the ending | ||||||
|  |             # timestamp for this one. | ||||||
|  |             if len(array) < 2: | ||||||
|  |                 # Not enough data to send an intermediate block | ||||||
|  |                 return | ||||||
|  |             end_ts = array['timestamp'][-1] | ||||||
|  |             if self._interval_end is not None and end_ts > self._interval_end: | ||||||
|  |                 # User gave us bad endpoints; send it anyway, and let | ||||||
|  |                 # the server complain so that the error is the same | ||||||
|  |                 # as if we hadn't done this chunking. | ||||||
|  |                 end_ts = self._interval_end | ||||||
|  |             self._block_arrays = [ array[-1:] ] | ||||||
|  |             self._block_rows = 1 | ||||||
|  |             array = array[:-1] | ||||||
|  |  | ||||||
|  |             # Next block continues where this one ended | ||||||
|  |             self._interval_start = end_ts | ||||||
|  |  | ||||||
|  |         # If we have no endpoints, it's because we had no data to send. | ||||||
|  |         if start_ts is None or end_ts is None: | ||||||
|  |             return | ||||||
|  |  | ||||||
|  |         # Send it | ||||||
|  |         data = array.tostring() | ||||||
|  |         self.last_response = self._client.stream_insert_block( | ||||||
|  |             self._path, data, start_ts, end_ts, binary = True) | ||||||
|  |  | ||||||
|  |         return | ||||||
|   | |||||||
| @@ -413,12 +413,16 @@ class Table(object): | |||||||
|         return rocket.Rocket(self.layout, |         return rocket.Rocket(self.layout, | ||||||
|                              os.path.join(self.root, subdir, filename)) |                              os.path.join(self.root, subdir, filename)) | ||||||
|  |  | ||||||
|     def append_string(self, data, start, end): |     def append_data(self, data, start, end, binary = False): | ||||||
|         """Parse the formatted string in 'data', according to the |         """Parse the formatted string in 'data', according to the | ||||||
|         current layout, and append it to the table.  If any timestamps |         current layout, and append it to the table.  If any timestamps | ||||||
|         are non-monotonic, or don't fall between 'start' and 'end', |         are non-monotonic, or don't fall between 'start' and 'end', | ||||||
|         a ValueError is raised. |         a ValueError is raised. | ||||||
|  |  | ||||||
|  |         If 'binary' is True, the data should be in raw binary format | ||||||
|  |         instead: little-endian, matching the current table's layout, | ||||||
|  |         including the int64 timestamp. | ||||||
|  |  | ||||||
|         If this function succeeds, it returns normally.  Otherwise, |         If this function succeeds, it returns normally.  Otherwise, | ||||||
|         the table is reverted back to its original state by truncating |         the table is reverted back to its original state by truncating | ||||||
|         or deleting files as necessary.""" |         or deleting files as necessary.""" | ||||||
| @@ -437,17 +441,26 @@ class Table(object): | |||||||
|                 # Ask the rocket object to parse and append up to "count" |                 # Ask the rocket object to parse and append up to "count" | ||||||
|                 # rows of data, verifying things along the way. |                 # rows of data, verifying things along the way. | ||||||
|                 try: |                 try: | ||||||
|  |                     if binary: | ||||||
|  |                         appender = f.append_binary | ||||||
|  |                     else: | ||||||
|  |                         appender = f.append_string | ||||||
|                     (added_rows, data_offset, last_timestamp, linenum |                     (added_rows, data_offset, last_timestamp, linenum | ||||||
|                      ) = f.append_string(count, data, data_offset, linenum, |                      ) = appender(count, data, data_offset, linenum, | ||||||
|                                          start, end, last_timestamp) |                                   start, end, last_timestamp) | ||||||
|                 except rocket.ParseError as e: |                 except rocket.ParseError as e: | ||||||
|                     (linenum, colnum, errtype, obj) = e.args |                     (linenum, colnum, errtype, obj) = e.args | ||||||
|                     where = "line %d, column %d: " % (linenum, colnum) |                     if binary: | ||||||
|  |                         where = "byte %d: " % (linenum) | ||||||
|  |                     else: | ||||||
|  |                         where = "line %d, column %d: " % (linenum, colnum) | ||||||
|                     # Extract out the error line, add column marker |                     # Extract out the error line, add column marker | ||||||
|                     try: |                     try: | ||||||
|  |                         if binary: | ||||||
|  |                             raise IndexError | ||||||
|                         bad = data.splitlines()[linenum-1] |                         bad = data.splitlines()[linenum-1] | ||||||
|                         badptr = ' ' * (colnum - 1) + '^' |                         bad += '\n' + ' ' * (colnum - 1) + '^' | ||||||
|                     except IndexError: # pragma: no cover |                     except IndexError: | ||||||
|                         bad = "" |                         bad = "" | ||||||
|                     if errtype == rocket.ERR_NON_MONOTONIC: |                     if errtype == rocket.ERR_NON_MONOTONIC: | ||||||
|                         err = "timestamp is not monotonically increasing" |                         err = "timestamp is not monotonically increasing" | ||||||
| @@ -463,7 +476,7 @@ class Table(object): | |||||||
|                     else: |                     else: | ||||||
|                         err = str(obj) |                         err = str(obj) | ||||||
|                     raise ValueError("error parsing input data: " + |                     raise ValueError("error parsing input data: " + | ||||||
|                                      where + err + "\n" + bad + "\n" + badptr) |                                      where + err + "\n" + bad) | ||||||
|                 tot_rows += added_rows |                 tot_rows += added_rows | ||||||
|         except Exception: |         except Exception: | ||||||
|             # Some failure, so try to roll things back by truncating or |             # Some failure, so try to roll things back by truncating or | ||||||
|   | |||||||
| @@ -475,12 +475,16 @@ class NilmDB(object): | |||||||
|             con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) |             con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) | ||||||
|             con.execute("DELETE FROM streams WHERE id=?", (stream_id,)) |             con.execute("DELETE FROM streams WHERE id=?", (stream_id,)) | ||||||
|  |  | ||||||
|     def stream_insert(self, path, start, end, data): |     def stream_insert(self, path, start, end, data, binary = False): | ||||||
|         """Insert new data into the database. |         """Insert new data into the database. | ||||||
|            path: Path at which to add the data |            path: Path at which to add the data | ||||||
|            start: Starting timestamp |            start: Starting timestamp | ||||||
|            end: Ending timestamp |            end: Ending timestamp | ||||||
|            data: Textual data, formatted according to the layout of path |            data: Textual data, formatted according to the layout of path | ||||||
|  |  | ||||||
|  |            'binary', if True, means that 'data' is raw binary: | ||||||
|  |            little-endian, matching the current table's layout, | ||||||
|  |            including the int64 timestamp. | ||||||
|            """ |            """ | ||||||
|         # First check for basic overlap using timestamp info given. |         # First check for basic overlap using timestamp info given. | ||||||
|         stream_id = self._stream_id(path) |         stream_id = self._stream_id(path) | ||||||
| @@ -494,7 +498,7 @@ class NilmDB(object): | |||||||
|         # there are any parse errors. |         # there are any parse errors. | ||||||
|         table = self.data.getnode(path) |         table = self.data.getnode(path) | ||||||
|         row_start = table.nrows |         row_start = table.nrows | ||||||
|         table.append_string(data, start, end) |         table.append_data(data, start, end, binary) | ||||||
|         row_end = table.nrows |         row_end = table.nrows | ||||||
|  |  | ||||||
|         # Insert the record into the sql database. |         # Insert the record into the sql database. | ||||||
|   | |||||||
| @@ -419,6 +419,68 @@ extra_data_on_line: | |||||||
| 			 ERR_OTHER, "extra data on line"); | 			 ERR_OTHER, "extra data on line"); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /**** | ||||||
|  |  * Append from binary data | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | /* .append_binary(count, data, offset, linenum, start, end, last_timestamp) */ | ||||||
|  | static PyObject *Rocket_append_binary(Rocket *self, PyObject *args) | ||||||
|  | { | ||||||
|  |         int count; | ||||||
|  | 	const uint8_t *data; | ||||||
|  |         int data_len; | ||||||
|  |         int linenum; | ||||||
|  | 	int offset; | ||||||
|  | 	timestamp_t start; | ||||||
|  | 	timestamp_t end; | ||||||
|  | 	timestamp_t last_timestamp; | ||||||
|  |  | ||||||
|  | 	if (!PyArg_ParseTuple(args, "it#iilll:append_binary", | ||||||
|  |                               &count, &data, &data_len, &offset, | ||||||
|  |                               &linenum, &start, &end, &last_timestamp)) | ||||||
|  | 		return NULL; | ||||||
|  |  | ||||||
|  |         /* Advance to offset */ | ||||||
|  |         if (offset > data_len) | ||||||
|  |                 return raise_str(0, 0, ERR_OTHER, "bad offset"); | ||||||
|  |         data += offset; | ||||||
|  |         data_len -= offset; | ||||||
|  |  | ||||||
|  |         /* Figure out max number of rows to insert */ | ||||||
|  |         int rows = data_len / self->binary_size; | ||||||
|  |         if (rows > count) | ||||||
|  |                 rows = count; | ||||||
|  |  | ||||||
|  |         /* Check timestamps */ | ||||||
|  |         timestamp_t ts; | ||||||
|  | 	int i; | ||||||
|  |         for (i = 0; i < rows; i++) { | ||||||
|  |                 /* Read raw timestamp, byteswap if needed */ | ||||||
|  |                 memcpy(&ts, &data[i * self->binary_size], 8); | ||||||
|  |                 ts = le64toh(ts); | ||||||
|  |  | ||||||
|  |                 /* Check limits */ | ||||||
|  |                 if (ts <= last_timestamp) | ||||||
|  |                         return raise_int(i, 0, ERR_NON_MONOTONIC, ts); | ||||||
|  |                 last_timestamp = ts; | ||||||
|  |                 if (ts < start || ts >= end) | ||||||
|  |                         return raise_int(i, 0, ERR_OUT_OF_INTERVAL, ts); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         /* Write binary data */ | ||||||
|  |         if (fwrite(data, data_len, 1, self->file) != 1) { | ||||||
|  |                 PyErr_SetFromErrno(PyExc_OSError); | ||||||
|  |                 return NULL; | ||||||
|  |         } | ||||||
|  | 	fflush(self->file); | ||||||
|  |  | ||||||
|  | 	/* Build return value and return */ | ||||||
|  | 	PyObject *o; | ||||||
|  | 	o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size, | ||||||
|  |                           last_timestamp, linenum); | ||||||
|  | 	return o; | ||||||
|  | } | ||||||
|  |  | ||||||
| /**** | /**** | ||||||
|  * Extract to string |  * Extract to string | ||||||
|  */ |  */ | ||||||
| @@ -484,7 +546,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args) | |||||||
| 			/* read and format in a loop */			\ | 			/* read and format in a loop */			\ | ||||||
| 			for (i = 0; i < self->layout_count; i++) {	\ | 			for (i = 0; i < self->layout_count; i++) {	\ | ||||||
| 				if (fread(&disktype, bytes,		\ | 				if (fread(&disktype, bytes,		\ | ||||||
| 					  1, self->file) < 0)		\ | 					  1, self->file) != 1)		\ | ||||||
| 					goto err;			\ | 					goto err;			\ | ||||||
| 				disktype = letoh(disktype);		\ | 				disktype = letoh(disktype);		\ | ||||||
| 				ret = sprintf(&str[len], " " fmt,	\ | 				ret = sprintf(&str[len], " " fmt,	\ | ||||||
| @@ -611,11 +673,13 @@ static PyMemberDef Rocket_members[] = { | |||||||
| }; | }; | ||||||
|  |  | ||||||
| static PyMethodDef Rocket_methods[] = { | static PyMethodDef Rocket_methods[] = { | ||||||
| 	{ "close", (PyCFunction)Rocket_close, METH_NOARGS, | 	{ "close", | ||||||
|  |           (PyCFunction)Rocket_close, METH_NOARGS, | ||||||
| 	  "close(self)\n\n" | 	  "close(self)\n\n" | ||||||
| 	  "Close file handle" }, | 	  "Close file handle" }, | ||||||
|  |  | ||||||
| 	{ "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS, | 	{ "append_string", | ||||||
|  |           (PyCFunction)Rocket_append_string, METH_VARARGS, | ||||||
| 	  "append_string(self, count, data, offset, line, start, end, ts)\n\n" | 	  "append_string(self, count, data, offset, line, start, end, ts)\n\n" | ||||||
|           "Parse string and append data.\n" |           "Parse string and append data.\n" | ||||||
| 	  "\n" | 	  "\n" | ||||||
| @@ -630,12 +694,36 @@ static PyMethodDef Rocket_methods[] = { | |||||||
| 	  "Raises ParseError if timestamps are non-monotonic, outside\n" | 	  "Raises ParseError if timestamps are non-monotonic, outside\n" | ||||||
| 	  "the start/end interval etc.\n" | 	  "the start/end interval etc.\n" | ||||||
| 	  "\n" | 	  "\n" | ||||||
|           "On success, return a tuple with three values:\n" |           "On success, return a tuple:\n" | ||||||
|           "  added_rows: how many rows were added from the file\n" |           "  added_rows: how many rows were added from the file\n" | ||||||
|           "  data_offset: current offset into the data string\n" |           "  data_offset: current offset into the data string\n" | ||||||
|           "  last_timestamp: last timestamp we parsed" }, |           "  last_timestamp: last timestamp we parsed\n" | ||||||
|  |           "  linenum: current line number" }, | ||||||
|  |  | ||||||
| 	{ "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS, | 	{ "append_binary", | ||||||
|  | 	  (PyCFunction)Rocket_append_binary, METH_VARARGS, | ||||||
|  | 	  "append_binary(self, count, data, offset, line, start, end, ts)\n\n" | ||||||
|  |           "Append binary data, which must match the data layout.\n" | ||||||
|  | 	  "\n" | ||||||
|  | 	  "  count: maximum number of rows to add\n" | ||||||
|  |           "  data: binary data\n" | ||||||
|  |           "  offset: byte offset into data to start adding\n" | ||||||
|  |           "  line: current line number (unused)\n" | ||||||
|  |           "  start: starting timestamp for interval\n" | ||||||
|  |           "  end: end timestamp for interval\n" | ||||||
|  |           "  ts: last timestamp that was previously parsed\n" | ||||||
|  | 	  "\n" | ||||||
|  | 	  "Raises ParseError if timestamps are non-monotonic, outside\n" | ||||||
|  | 	  "the start/end interval etc.\n" | ||||||
|  | 	  "\n" | ||||||
|  |           "On success, return a tuple:\n" | ||||||
|  |           "  added_rows: how many rows were added from the file\n" | ||||||
|  |           "  data_offset: current offset into the data string\n" | ||||||
|  |           "  last_timestamp: last timestamp we parsed\n" | ||||||
|  |           "  linenum: current line number (copied from argument)" }, | ||||||
|  |  | ||||||
|  | 	{ "extract_string", | ||||||
|  |           (PyCFunction)Rocket_extract_string, METH_VARARGS, | ||||||
| 	  "extract_string(self, offset, count)\n\n" | 	  "extract_string(self, offset, count)\n\n" | ||||||
| 	  "Extract count rows of data from the file at offset offset.\n" | 	  "Extract count rows of data from the file at offset offset.\n" | ||||||
| 	  "Return an ascii formatted string according to the layout" }, | 	  "Return an ascii formatted string according to the layout" }, | ||||||
|   | |||||||
| @@ -305,10 +305,15 @@ class Stream(NilmApp): | |||||||
|     @cherrypy.tools.json_out() |     @cherrypy.tools.json_out() | ||||||
|     @exception_to_httperror(NilmDBError, ValueError) |     @exception_to_httperror(NilmDBError, ValueError) | ||||||
|     @cherrypy.tools.CORS_allow(methods = ["PUT"]) |     @cherrypy.tools.CORS_allow(methods = ["PUT"]) | ||||||
|     def insert(self, path, start, end): |     def insert(self, path, start, end, binary = False): | ||||||
|         """ |         """ | ||||||
|         Insert new data into the database.  Provide textual data |         Insert new data into the database.  Provide textual data | ||||||
|         (matching the path's layout) as a HTTP PUT. |         (matching the path's layout) as a HTTP PUT. | ||||||
|  |  | ||||||
|  |         If 'binary' is True, expect raw binary data, rather than lines | ||||||
|  |         of ASCII-formatted data.  Raw binary data is always | ||||||
|  |         little-endian and matches the database types (including an | ||||||
|  |         int64 timestamp). | ||||||
|         """ |         """ | ||||||
|         # Important that we always read the input before throwing any |         # Important that we always read the input before throwing any | ||||||
|         # errors, to keep lengths happy for persistent connections. |         # errors, to keep lengths happy for persistent connections. | ||||||
| @@ -316,6 +321,14 @@ class Stream(NilmApp): | |||||||
|         # requests, if we ever want to handle those (issue #1134) |         # requests, if we ever want to handle those (issue #1134) | ||||||
|         body = cherrypy.request.body.read() |         body = cherrypy.request.body.read() | ||||||
|  |  | ||||||
|  |         # Verify content type for binary data | ||||||
|  |         content_type = cherrypy.request.headers.get('content-type') | ||||||
|  |         if binary and content_type: | ||||||
|  |             if content_type != "application/octet-stream": | ||||||
|  |                 raise cherrypy.HTTPError("400", "Content type must be " | ||||||
|  |                                          "application/octet-stream for " | ||||||
|  |                                          "binary data, not " + content_type) | ||||||
|  |  | ||||||
|         # Check path and get layout |         # Check path and get layout | ||||||
|         if len(self.db.stream_list(path = path)) != 1: |         if len(self.db.stream_list(path = path)) != 1: | ||||||
|             raise cherrypy.HTTPError("404", "No such stream: " + path) |             raise cherrypy.HTTPError("404", "No such stream: " + path) | ||||||
| @@ -325,7 +338,7 @@ class Stream(NilmApp): | |||||||
|  |  | ||||||
|         # Pass the data directly to nilmdb, which will parse it and |         # Pass the data directly to nilmdb, which will parse it and | ||||||
|         # raise a ValueError if there are any problems. |         # raise a ValueError if there are any problems. | ||||||
|         self.db.stream_insert(path, start, end, body) |         self.db.stream_insert(path, start, end, body, binary) | ||||||
|  |  | ||||||
|         # Done |         # Done | ||||||
|         return |         return | ||||||
| @@ -398,7 +411,6 @@ class Stream(NilmApp): | |||||||
|     # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0 |     # /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0 | ||||||
|     @cherrypy.expose |     @cherrypy.expose | ||||||
|     @chunked_response |     @chunked_response | ||||||
|     @response_type("text/plain") |  | ||||||
|     def extract(self, path, start = None, end = None, |     def extract(self, path, start = None, end = None, | ||||||
|                 count = False, markup = False, binary = False): |                 count = False, markup = False, binary = False): | ||||||
|         """ |         """ | ||||||
| @@ -414,8 +426,8 @@ class Stream(NilmApp): | |||||||
|  |  | ||||||
|         If 'binary' is True, return raw binary data, rather than lines |         If 'binary' is True, return raw binary data, rather than lines | ||||||
|         of ASCII-formatted data.  Raw binary data is always |         of ASCII-formatted data.  Raw binary data is always | ||||||
|         little-endian and matches the database types (including a |         little-endian and matches the database types (including an | ||||||
|         uint64 timestamp). |         int64 timestamp). | ||||||
|         """ |         """ | ||||||
|         (start, end) = self._get_times(start, end) |         (start, end) = self._get_times(start, end) | ||||||
|  |  | ||||||
| @@ -424,11 +436,13 @@ class Stream(NilmApp): | |||||||
|             raise cherrypy.HTTPError("404", "No such stream: " + path) |             raise cherrypy.HTTPError("404", "No such stream: " + path) | ||||||
|  |  | ||||||
|         if binary: |         if binary: | ||||||
|             cherrypy.response.headers['Content-Type'] = ( |             content_type = "application/octet-stream" | ||||||
|                 "application/octet-stream") |  | ||||||
|             if markup or count: |             if markup or count: | ||||||
|                 raise cherrypy.HTTPError("400", "can't mix binary and " |                 raise cherrypy.HTTPError("400", "can't mix binary and " | ||||||
|                                          "markup or count modes") |                                          "markup or count modes") | ||||||
|  |         else: | ||||||
|  |             content_type = "text/plain" | ||||||
|  |         cherrypy.response.headers['Content-Type'] = content_type | ||||||
|  |  | ||||||
|         @workaround_cp_bug_1200 |         @workaround_cp_bug_1200 | ||||||
|         def content(start, end): |         def content(start, end): | ||||||
|   | |||||||
							
								
								
									
										1
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								setup.py
									
									
									
									
									
								
							| @@ -114,7 +114,6 @@ setup(name='nilmdb', | |||||||
|       install_requires = [ 'decorator', |       install_requires = [ 'decorator', | ||||||
|                            'cherrypy >= 3.2', |                            'cherrypy >= 3.2', | ||||||
|                            'simplejson', |                            'simplejson', | ||||||
|                            'pycurl', |  | ||||||
|                            'python-dateutil', |                            'python-dateutil', | ||||||
|                            'pytz', |                            'pytz', | ||||||
|                            'psutil >= 0.3.0', |                            'psutil >= 0.3.0', | ||||||
|   | |||||||
| @@ -69,9 +69,9 @@ class TestBulkData(object): | |||||||
|         raw = [] |         raw = [] | ||||||
|         for i in range(1000): |         for i in range(1000): | ||||||
|             raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i)) |             raw.append("%d 1 2 3 4 5 6 7 8\n" % (10000 + i)) | ||||||
|         node.append_string("".join(raw[0:1]), 0, 50000) |         node.append_data("".join(raw[0:1]), 0, 50000) | ||||||
|         node.append_string("".join(raw[1:100]), 0, 50000) |         node.append_data("".join(raw[1:100]), 0, 50000) | ||||||
|         node.append_string("".join(raw[100:]), 0, 50000) |         node.append_data("".join(raw[100:]), 0, 50000) | ||||||
|  |  | ||||||
|         misc_slices = [ 0, 100, slice(None), slice(0), slice(10), |         misc_slices = [ 0, 100, slice(None), slice(0), slice(10), | ||||||
|                         slice(5,10), slice(3,None), slice(3,-3), |                         slice(5,10), slice(3,None), slice(3,-3), | ||||||
| @@ -85,7 +85,7 @@ class TestBulkData(object): | |||||||
|         # Extract misc slices while appending, to make sure the |         # Extract misc slices while appending, to make sure the | ||||||
|         # data isn't being added in the middle of the file |         # data isn't being added in the middle of the file | ||||||
|         for s in [2, slice(1,5), 2, slice(1,5)]: |         for s in [2, slice(1,5), 2, slice(1,5)]: | ||||||
|             node.append_string("0 0 0 0 0 0 0 0 0\n", 0, 50000) |             node.append_data("0 0 0 0 0 0 0 0 0\n", 0, 50000) | ||||||
|             raw.append("0 0 0 0 0 0 0 0 0\n") |             raw.append("0 0 0 0 0 0 0 0 0\n") | ||||||
|             eq_(get_node_slice(s), raw[s]) |             eq_(get_node_slice(s), raw[s]) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -239,6 +239,22 @@ class TestClient(object): | |||||||
|         in_("400 Bad Request", str(e.exception)) |         in_("400 Bad Request", str(e.exception)) | ||||||
|         in_("start must precede end", str(e.exception)) |         in_("start must precede end", str(e.exception)) | ||||||
|  |  | ||||||
|  |         # Good content type | ||||||
|  |         with assert_raises(ClientError) as e: | ||||||
|  |             client.http.put("stream/insert", "", | ||||||
|  |                             { "path": "xxxx", "start": 0, "end": 1, | ||||||
|  |                               "binary": 1 }, | ||||||
|  |                             binary = True) | ||||||
|  |         in_("No such stream", str(e.exception)) | ||||||
|  |  | ||||||
|  |         # Bad content type | ||||||
|  |         with assert_raises(ClientError) as e: | ||||||
|  |             client.http.put("stream/insert", "", | ||||||
|  |                             { "path": "xxxx", "start": 0, "end": 1, | ||||||
|  |                               "binary": 1 }, | ||||||
|  |                             binary = False) | ||||||
|  |         in_("Content type must be application/octet-stream", str(e.exception)) | ||||||
|  |  | ||||||
|         # Specify start/end (starts too late) |         # Specify start/end (starts too late) | ||||||
|         data = timestamper.TimestamperRate(testfile, start, 120) |         data = timestamper.TimestamperRate(testfile, start, 120) | ||||||
|         with assert_raises(ClientError) as e: |         with assert_raises(ClientError) as e: | ||||||
| @@ -383,6 +399,17 @@ class TestClient(object): | |||||||
|             raise AssertionError("/stream/extract is not text/plain:\n" + |             raise AssertionError("/stream/extract is not text/plain:\n" + | ||||||
|                                  headers()) |                                  headers()) | ||||||
|  |  | ||||||
|  |         x = http.get("stream/extract", | ||||||
|  |                             { "path": "/newton/prep", | ||||||
|  |                               "start": "123", | ||||||
|  |                               "end": "124", | ||||||
|  |                               "binary": "1" }) | ||||||
|  |         if "transfer-encoding: chunked" not in headers(): | ||||||
|  |             warnings.warn("Non-chunked HTTP response for /stream/extract") | ||||||
|  |         if "content-type: application/octet-stream" not in headers(): | ||||||
|  |             raise AssertionError("/stream/extract is not binary:\n" + | ||||||
|  |                                  headers()) | ||||||
|  |  | ||||||
|         client.close() |         client.close() | ||||||
|  |  | ||||||
|     def test_client_08_unicode(self): |     def test_client_08_unicode(self): | ||||||
| @@ -459,72 +486,75 @@ class TestClient(object): | |||||||
|             # override _max_data to trigger frequent server updates |             # override _max_data to trigger frequent server updates | ||||||
|             ctx._max_data = 15 |             ctx._max_data = 15 | ||||||
|  |  | ||||||
|             ctx.insert("100 1\n") |             ctx.insert("1000 1\n") | ||||||
|  |  | ||||||
|             ctx.insert("101 ") |             ctx.insert("1010 ") | ||||||
|             ctx.insert("1\n102 1") |             ctx.insert("1\n1020 1") | ||||||
|             ctx.insert("") |             ctx.insert("") | ||||||
|             ctx.insert("\n103 1\n") |             ctx.insert("\n1030 1\n") | ||||||
|  |  | ||||||
|             ctx.insert("104 1\n") |             ctx.insert("1040 1\n") | ||||||
|             ctx.insert("# hello\n") |             ctx.insert("# hello\n") | ||||||
|             ctx.insert("   # hello\n") |             ctx.insert("   # hello\n") | ||||||
|             ctx.insert("  105 1\n") |             ctx.insert("  1050 1\n") | ||||||
|             ctx.finalize() |             ctx.finalize() | ||||||
|  |  | ||||||
|             ctx.insert("107 1\n") |             ctx.insert("1070 1\n") | ||||||
|             ctx.update_end(108) |             ctx.update_end(1080) | ||||||
|             ctx.finalize() |             ctx.finalize() | ||||||
|             ctx.update_start(109) |             ctx.update_start(1090) | ||||||
|             ctx.insert("110 1\n") |             ctx.insert("1100 1\n") | ||||||
|             ctx.insert("111 1\n") |             ctx.insert("1110 1\n") | ||||||
|             ctx.send() |             ctx.send() | ||||||
|             ctx.insert("112 1\n") |             ctx.insert("1120 1\n") | ||||||
|             ctx.insert("113 1\n") |             ctx.insert("1130 1\n") | ||||||
|             ctx.insert("114 1\n") |             ctx.insert("1140 1\n") | ||||||
|             ctx.update_end(116) |             ctx.update_end(1160) | ||||||
|             ctx.insert("115 1\n") |             ctx.insert("1150 1\n") | ||||||
|             ctx.update_end(117) |             ctx.update_end(1170) | ||||||
|             ctx.insert("116 1\n") |             ctx.insert("1160 1\n") | ||||||
|             ctx.update_end(118) |             ctx.update_end(1180) | ||||||
|             ctx.insert("117 1" + |             ctx.insert("1170 1" + | ||||||
|                        " # this is super long" * 100 + |                        " # this is super long" * 100 + | ||||||
|                        "\n") |                        "\n") | ||||||
|             ctx.finalize() |             ctx.finalize() | ||||||
|             ctx.insert("# this is super long" * 100) |             ctx.insert("# this is super long" * 100) | ||||||
|  |  | ||||||
|         with assert_raises(ClientError): |         with assert_raises(ClientError): | ||||||
|             with client.stream_insert_context("/context/test", 100, 200) as ctx: |             with client.stream_insert_context("/context/test", | ||||||
|                 ctx.insert("118 1\n") |                                               1000, 2000) as ctx: | ||||||
|  |                 ctx.insert("1180 1\n") | ||||||
|  |  | ||||||
|         with assert_raises(ClientError): |         with assert_raises(ClientError): | ||||||
|             with client.stream_insert_context("/context/test", 200, 300) as ctx: |             with client.stream_insert_context("/context/test", | ||||||
|                 ctx.insert("118 1\n") |                                               2000, 3000) as ctx: | ||||||
|  |                 ctx.insert("1180 1\n") | ||||||
|  |  | ||||||
|         with assert_raises(ClientError): |         with assert_raises(ClientError): | ||||||
|             with client.stream_insert_context("/context/test") as ctx: |             with client.stream_insert_context("/context/test") as ctx: | ||||||
|                 ctx.insert("bogus data\n") |                 ctx.insert("bogus data\n") | ||||||
|  |  | ||||||
|         with client.stream_insert_context("/context/test", 200, 300) as ctx: |         with client.stream_insert_context("/context/test", 2000, 3000) as ctx: | ||||||
|             # make sure our override wasn't permanent |             # make sure our override wasn't permanent | ||||||
|             ne_(ctx._max_data, 15) |             ne_(ctx._max_data, 15) | ||||||
|             ctx.insert("225 1\n") |             ctx.insert("2250 1\n") | ||||||
|             ctx.finalize() |             ctx.finalize() | ||||||
|  |  | ||||||
|         with assert_raises(ClientError): |         with assert_raises(ClientError): | ||||||
|             with client.stream_insert_context("/context/test", 300, 400) as ctx: |             with client.stream_insert_context("/context/test", | ||||||
|                 ctx.insert("301 1\n") |                                               3000, 4000) as ctx: | ||||||
|                 ctx.insert("302 2\n") |                 ctx.insert("3010 1\n") | ||||||
|                 ctx.insert("303 3\n") |                 ctx.insert("3020 2\n") | ||||||
|                 ctx.insert("304 4\n") |                 ctx.insert("3030 3\n") | ||||||
|                 ctx.insert("304 4\n") # non-monotonic after a few lines |                 ctx.insert("3040 4\n") | ||||||
|  |                 ctx.insert("3040 4\n") # non-monotonic after a few lines | ||||||
|                 ctx.finalize() |                 ctx.finalize() | ||||||
|  |  | ||||||
|         eq_(list(client.stream_intervals("/context/test")), |         eq_(list(client.stream_intervals("/context/test")), | ||||||
|             [ [ 100, 106 ], |             [ [ 1000, 1051 ], | ||||||
|               [ 107, 108 ], |               [ 1070, 1080 ], | ||||||
|               [ 109, 118 ], |               [ 1090, 1180 ], | ||||||
|               [ 200, 300 ] ]) |               [ 2000, 3000 ] ]) | ||||||
|  |  | ||||||
|         # destroy stream (try without removing data first) |         # destroy stream (try without removing data first) | ||||||
|         with assert_raises(ClientError): |         with assert_raises(ClientError): | ||||||
|   | |||||||
| @@ -106,3 +106,228 @@ class TestNumpyClient(object): | |||||||
|         assert(np.allclose(array, actual)) |         assert(np.allclose(array, actual)) | ||||||
|  |  | ||||||
|         client.close() |         client.close() | ||||||
|  |  | ||||||
|  |     def test_numpyclient_03_insert(self): | ||||||
|  |         client = nilmdb.client.numpyclient.NumpyClient(url = testurl) | ||||||
|  |  | ||||||
|  |         # Limit _max_data just to get better coverage | ||||||
|  |         old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data | ||||||
|  |         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000 | ||||||
|  |  | ||||||
|  |         client.stream_create("/test/1", "uint16_1") | ||||||
|  |         client.stream_insert_numpy("/test/1", | ||||||
|  |                                    np.array([[0, 1], | ||||||
|  |                                              [1, 2], | ||||||
|  |                                              [2, 3], | ||||||
|  |                                              [3, 4]])) | ||||||
|  |  | ||||||
|  |         # Wrong number of dimensions | ||||||
|  |         with assert_raises(ValueError) as e: | ||||||
|  |             client.stream_insert_numpy("/test/1", | ||||||
|  |                                        np.array([[[0, 1], | ||||||
|  |                                                   [1, 2]], | ||||||
|  |                                                  [[3, 4], | ||||||
|  |                                                   [4, 5]]])) | ||||||
|  |         in_("wrong number of dimensions", str(e.exception)) | ||||||
|  |  | ||||||
|  |         # Unstructured | ||||||
|  |         client.stream_create("/test/2", "float32_8") | ||||||
|  |         client.stream_insert_numpy( | ||||||
|  |             "/test/2", | ||||||
|  |             client.stream_extract_numpy( | ||||||
|  |                 "/newton/prep", structured = False, maxrows = 1000)) | ||||||
|  |  | ||||||
|  |         # Structured, and specifying layout | ||||||
|  |         client.stream_create("/test/3", "float32_8") | ||||||
|  |         client.stream_insert_numpy( | ||||||
|  |             path = "/test/3", layout = "float32_8", | ||||||
|  |             data = client.stream_extract_numpy( | ||||||
|  |                 "/newton/prep", structured = True, maxrows = 1000)) | ||||||
|  |  | ||||||
|  |         # Structured, specifying wrong layout | ||||||
|  |         client.stream_create("/test/4", "float32_8") | ||||||
|  |         with assert_raises(ValueError) as e: | ||||||
|  |             client.stream_insert_numpy( | ||||||
|  |                 "/test/4", layout = "uint16_1", | ||||||
|  |                 data = client.stream_extract_numpy( | ||||||
|  |                     "/newton/prep", structured = True, maxrows = 1000)) | ||||||
|  |         in_("wrong dtype", str(e.exception)) | ||||||
|  |  | ||||||
|  |         # Unstructured, and specifying wrong layout | ||||||
|  |         client.stream_create("/test/5", "float32_8") | ||||||
|  |         with assert_raises(ClientError) as e: | ||||||
|  |             client.stream_insert_numpy( | ||||||
|  |                 "/test/5", layout = "uint16_8", | ||||||
|  |                 data = client.stream_extract_numpy( | ||||||
|  |                     "/newton/prep", structured = False, maxrows = 1000)) | ||||||
|  |         # timestamps will be screwy here, because data will be parsed wrong | ||||||
|  |         in_("error parsing input data", str(e.exception)) | ||||||
|  |  | ||||||
|  |         # Make sure the /newton/prep copies are identical | ||||||
|  |         a = np.vstack(client.stream_extract_numpy("/newton/prep")) | ||||||
|  |         b = np.vstack(client.stream_extract_numpy("/test/2")) | ||||||
|  |         c = np.vstack(client.stream_extract_numpy("/test/3")) | ||||||
|  |         assert(np.array_equal(a,b)) | ||||||
|  |         assert(np.array_equal(a,c)) | ||||||
|  |  | ||||||
|  |         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data | ||||||
|  |         client.close() | ||||||
|  |  | ||||||
|  |     def test_numpyclient_04_context(self): | ||||||
|  |         # Like test_client_context, but with Numpy data | ||||||
|  |         client = nilmdb.client.numpyclient.NumpyClient(testurl) | ||||||
|  |  | ||||||
|  |         client.stream_create("/context/test", "uint16_1") | ||||||
|  |         with client.stream_insert_numpy_context("/context/test") as ctx: | ||||||
|  |             # override _max_rows to trigger frequent server updates | ||||||
|  |             ctx._max_rows = 2 | ||||||
|  |             ctx.insert([[1000, 1]]) | ||||||
|  |             ctx.insert([[1010, 1], [1020, 1], [1030, 1]]) | ||||||
|  |             ctx.insert([[1040, 1], [1050, 1]]) | ||||||
|  |             ctx.finalize() | ||||||
|  |             ctx.insert([[1070, 1]]) | ||||||
|  |             ctx.update_end(1080) | ||||||
|  |             ctx.finalize() | ||||||
|  |             ctx.update_start(1090) | ||||||
|  |             ctx.insert([[1100, 1]]) | ||||||
|  |             ctx.insert([[1110, 1]]) | ||||||
|  |             ctx.send() | ||||||
|  |             ctx.insert([[1120, 1], [1130, 1], [1140, 1]]) | ||||||
|  |             ctx.update_end(1160) | ||||||
|  |             ctx.insert([[1150, 1]]) | ||||||
|  |             ctx.update_end(1170) | ||||||
|  |             ctx.insert([[1160, 1]]) | ||||||
|  |             ctx.update_end(1180) | ||||||
|  |             ctx.insert([[1170, 123456789.0]]) | ||||||
|  |             ctx.finalize() | ||||||
|  |             ctx.insert(np.zeros((0,2))) | ||||||
|  |  | ||||||
|  |         with assert_raises(ClientError): | ||||||
|  |             with client.stream_insert_numpy_context("/context/test", | ||||||
|  |                                                     1000, 2000) as ctx: | ||||||
|  |                 ctx.insert([[1180, 1]]) | ||||||
|  |  | ||||||
|  |         with assert_raises(ClientError): | ||||||
|  |             with client.stream_insert_numpy_context("/context/test", | ||||||
|  |                                                     2000, 3000) as ctx: | ||||||
|  |                 ctx._max_rows = 2 | ||||||
|  |                 ctx.insert([[3180, 1]]) | ||||||
|  |                 ctx.insert([[3181, 1]]) | ||||||
|  |  | ||||||
|  |         with client.stream_insert_numpy_context("/context/test", | ||||||
|  |                                                 2000, 3000) as ctx: | ||||||
|  |             # make sure our override wasn't permanent | ||||||
|  |             ne_(ctx._max_rows, 2) | ||||||
|  |             ctx.insert([[2250, 1]]) | ||||||
|  |             ctx.finalize() | ||||||
|  |  | ||||||
|  |         with assert_raises(ClientError): | ||||||
|  |             with client.stream_insert_numpy_context("/context/test", | ||||||
|  |                                                     3000, 4000) as ctx: | ||||||
|  |                 ctx.insert([[3010, 1]]) | ||||||
|  |                 ctx.insert([[3020, 2]]) | ||||||
|  |                 ctx.insert([[3030, 3]]) | ||||||
|  |                 ctx.insert([[3040, 4]]) | ||||||
|  |                 ctx.insert([[3040, 4]]) # non-monotonic after a few lines | ||||||
|  |                 ctx.finalize() | ||||||
|  |  | ||||||
|  |         eq_(list(client.stream_intervals("/context/test")), | ||||||
|  |             [ [ 1000, 1051 ], | ||||||
|  |               [ 1070, 1080 ], | ||||||
|  |               [ 1090, 1180 ], | ||||||
|  |               [ 2000, 3000 ] ]) | ||||||
|  |  | ||||||
|  |         client.stream_remove("/context/test") | ||||||
|  |         client.stream_destroy("/context/test") | ||||||
|  |         client.close() | ||||||
|  |  | ||||||
|  |     def test_numpyclient_05_emptyintervals(self): | ||||||
|  |         # Like test_client_emptyintervals, with insert_numpy_context | ||||||
|  |         client = nilmdb.client.numpyclient.NumpyClient(testurl) | ||||||
|  |         client.stream_create("/empty/test", "uint16_1") | ||||||
|  |         def info(): | ||||||
|  |             result = [] | ||||||
|  |             for interval in list(client.stream_intervals("/empty/test")): | ||||||
|  |                 result.append((client.stream_count("/empty/test", *interval), | ||||||
|  |                                interval)) | ||||||
|  |             return result | ||||||
|  |         eq_(info(), []) | ||||||
|  |  | ||||||
|  |         # Insert a region with just a few points | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test") as ctx: | ||||||
|  |             ctx.update_start(100) | ||||||
|  |             ctx.insert([[140, 1]]) | ||||||
|  |             ctx.insert([[150, 1]]) | ||||||
|  |             ctx.insert([[160, 1]]) | ||||||
|  |             ctx.update_end(200) | ||||||
|  |             ctx.finalize() | ||||||
|  |         eq_(info(), [(3, [100, 200])]) | ||||||
|  |  | ||||||
|  |         # Delete chunk, which will leave one data point and two intervals | ||||||
|  |         client.stream_remove("/empty/test", 145, 175) | ||||||
|  |         eq_(info(), [(1, [100, 145]), | ||||||
|  |                      (0, [175, 200])]) | ||||||
|  |  | ||||||
|  |         # Try also creating a completely empty interval from scratch, | ||||||
|  |         # in a few different ways. | ||||||
|  |         client.stream_insert("/empty/test", "", 300, 350) | ||||||
|  |         client.stream_insert("/empty/test", [], 400, 450) | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test", 500, 550): | ||||||
|  |             pass | ||||||
|  |  | ||||||
|  |         # If enough timestamps aren't provided, empty streams won't be created. | ||||||
|  |         client.stream_insert("/empty/test", []) | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test"): | ||||||
|  |             pass | ||||||
|  |         client.stream_insert("/empty/test", [], start = 600) | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test", start = 700): | ||||||
|  |             pass | ||||||
|  |         client.stream_insert("/empty/test", [], end = 850) | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test", end = 950): | ||||||
|  |             pass | ||||||
|  |  | ||||||
|  |         # Try various things that might cause problems | ||||||
|  |         with client.stream_insert_numpy_context("/empty/test", 1000, 1050): | ||||||
|  |             ctx.finalize() # inserts [1000, 1050] | ||||||
|  |             ctx.finalize() # nothing | ||||||
|  |             ctx.finalize() # nothing | ||||||
|  |             ctx.insert([[1100, 1]]) | ||||||
|  |             ctx.finalize() # inserts [1100, 1101] | ||||||
|  |             ctx.update_start(1199) | ||||||
|  |             ctx.insert([[1200, 1]]) | ||||||
|  |             ctx.update_end(1250) | ||||||
|  |             ctx.finalize() # inserts [1199, 1250] | ||||||
|  |             ctx.update_start(1299) | ||||||
|  |             ctx.finalize() # nothing | ||||||
|  |             ctx.update_end(1350) | ||||||
|  |             ctx.finalize() # nothing | ||||||
|  |             ctx.update_start(1400) | ||||||
|  |             ctx.insert(np.zeros((0,2))) | ||||||
|  |             ctx.update_end(1450) | ||||||
|  |             ctx.finalize() | ||||||
|  |             ctx.update_start(1500) | ||||||
|  |             ctx.insert(np.zeros((0,2))) | ||||||
|  |             ctx.update_end(1550) | ||||||
|  |             ctx.finalize() | ||||||
|  |             ctx.insert(np.zeros((0,2))) | ||||||
|  |             ctx.insert(np.zeros((0,2))) | ||||||
|  |             ctx.insert(np.zeros((0,2))) | ||||||
|  |             ctx.finalize() | ||||||
|  |  | ||||||
|  |         # Check everything | ||||||
|  |         eq_(info(), [(1, [100, 145]), | ||||||
|  |                      (0, [175, 200]), | ||||||
|  |                      (0, [300, 350]), | ||||||
|  |                      (0, [400, 450]), | ||||||
|  |                      (0, [500, 550]), | ||||||
|  |                      (0, [1000, 1050]), | ||||||
|  |                      (1, [1100, 1101]), | ||||||
|  |                      (1, [1199, 1250]), | ||||||
|  |                      (0, [1400, 1450]), | ||||||
|  |                      (0, [1500, 1550]), | ||||||
|  |                      ]) | ||||||
|  |  | ||||||
|  |         # Clean up | ||||||
|  |         client.stream_remove("/empty/test") | ||||||
|  |         client.stream_destroy("/empty/test") | ||||||
|  |         client.close() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user