diff --git a/nilmdb/client/client.py b/nilmdb/client/client.py index cb5c7d9..4e0d41a 100644 --- a/nilmdb/client/client.py +++ b/nilmdb/client/client.py @@ -127,10 +127,11 @@ class Client(object): @contextlib.contextmanager def stream_insert_context(self, path, start = None, end = None): """Return a context manager that allows data to be efficiently - inserted into a stream in a piecewise manner. Data is be provided - as single lines, and is aggregated and sent to the server in larger - chunks as necessary. Data lines must match the database layout for - the given path, and end with a newline. + inserted into a stream in a piecewise manner. Data is be + provided as ASCII lines, and is aggregated and sent to the + server in larger chunks as necessary. Data lines must match + the database layout for the given path, and end with a + newline. Example: with client.stream_insert_context('/path', start, end) as ctx: @@ -142,15 +143,16 @@ class Client(object): This may make multiple requests to the server, if the data is large enough or enough time has passed between insertions. """ - ctx = StreamInserter(self.http, path, start, end) + ctx = StreamInserter(self, path, start, end) yield ctx ctx.finalize() def stream_insert(self, path, data, start = None, end = None): """Insert rows of data into a stream. data should be a string or iterable that provides ASCII data that matches the database - layout for path. See stream_insert_context for details on the - 'start' and 'end' parameters.""" + layout for path. Data is passed through stream_insert_context, + so it will be broken into reasonably-sized chunks and + timestamps will be deduced if missing.""" with self.stream_insert_context(path, start, end) as ctx: if isinstance(data, basestring): ctx.insert(data) @@ -159,11 +161,28 @@ class Client(object): ctx.insert(chunk) return ctx.last_response + def stream_insert_block(self, path, data, start, end, binary = False): + """Insert a single fixed block of data into the stream. It is + sent directly to the server in one block with no further + processing. + + If 'binary' is True, provide raw binary data in little-endian + format matching the path layout, including an int64 timestamp. + Otherwise, provide ASCII data matching the layout.""" + params = { + "path": path, + "start": timestamp_to_string(start), + "end": timestamp_to_string(end), + } + if binary: + params["binary"] = 1 + return self.http.put("stream/insert", data, params, binary = binary) + def stream_intervals(self, path, start = None, end = None, diffpath = None): """ Return a generator that yields each stream interval. - If diffpath is not None, yields only interval ranges that are + If 'diffpath' is not None, yields only interval ranges that are present in 'path' but not in 'diffpath'. """ params = { @@ -184,16 +203,16 @@ class Client(object): lines of ASCII-formatted data that matches the database layout for the given path. - Specify count = True to return a count of matching data points + If 'count' is True, return a count of matching data points rather than the actual data. The output format is unchanged. - Specify markup = True to include comments in the returned data + If 'markup' is True, include comments in the returned data that indicate interval starts and ends. - Specify binary = True to return chunks of raw binary data, - rather than lines of ASCII-formatted data. Raw binary data - is always little-endian and matches the database types - (including an int64 timestamp). + If 'binary' is True, return chunks of raw binary data, rather + than lines of ASCII-formatted data. Raw binary data is + little-endian and matches the database types (including an + int64 timestamp). """ params = { "path": path, @@ -257,13 +276,13 @@ class StreamInserter(object): _max_data = 2 * 1024 * 1024 _max_data_after_send = 64 * 1024 - def __init__(self, http, path, start = None, end = None): - """'http' is the httpclient object. 'path' is the database + def __init__(self, client, path, start = None, end = None): + """'client' is the client object. 'path' is the database path to insert to. 'start' and 'end' are used for the first contiguous interval.""" self.last_response = None - self._http = http + self._client = client self._path = path # Start and end for the overall contiguous interval we're @@ -431,9 +450,7 @@ class StreamInserter(object): raise ClientError("have data to send, but no start/end times") # Send it - params = { "path": self._path, - "start": timestamp_to_string(start_ts), - "end": timestamp_to_string(end_ts) } - self.last_response = self._http.put("stream/insert", block, params) + self.last_response = self._client.stream_insert_block( + self._path, block, start_ts, end_ts, binary = False) return