Browse Source

Add basic binary support to client, and restructure a bit

tags/nilmdb-1.5.0
Jim Paris 11 years ago
parent
commit
7576883f49
1 changed files with 38 additions and 21 deletions
  1. +38
    -21
      nilmdb/client/client.py

+ 38
- 21
nilmdb/client/client.py View File

@@ -127,10 +127,11 @@ class Client(object):
@contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is be provided
as single lines, and is aggregated and sent to the server in larger
chunks as necessary. Data lines must match the database layout for
the given path, and end with a newline.
inserted into a stream in a piecewise manner. Data is be
provided as ASCII lines, and is aggregated and sent to the
server in larger chunks as necessary. Data lines must match
the database layout for the given path, and end with a
newline.

Example:
with client.stream_insert_context('/path', start, end) as ctx:
@@ -142,15 +143,16 @@ class Client(object):
This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
ctx = StreamInserter(self.http, path, start, end)
ctx = StreamInserter(self, path, start, end)
yield ctx
ctx.finalize()

def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be a string
or iterable that provides ASCII data that matches the database
layout for path. See stream_insert_context for details on the
'start' and 'end' parameters."""
layout for path. Data is passed through stream_insert_context,
so it will be broken into reasonably-sized chunks and
timestamps will be deduced if missing."""
with self.stream_insert_context(path, start, end) as ctx:
if isinstance(data, basestring):
ctx.insert(data)
@@ -159,11 +161,28 @@ class Client(object):
ctx.insert(chunk)
return ctx.last_response

def stream_insert_block(self, path, data, start, end, binary = False):
"""Insert a single fixed block of data into the stream. It is
sent directly to the server in one block with no further
processing.

If 'binary' is True, provide raw binary data in little-endian
format matching the path layout, including an int64 timestamp.
Otherwise, provide ASCII data matching the layout."""
params = {
"path": path,
"start": timestamp_to_string(start),
"end": timestamp_to_string(end),
}
if binary:
params["binary"] = 1
return self.http.put("stream/insert", data, params, binary = binary)

def stream_intervals(self, path, start = None, end = None, diffpath = None):
"""
Return a generator that yields each stream interval.

If diffpath is not None, yields only interval ranges that are
If 'diffpath' is not None, yields only interval ranges that are
present in 'path' but not in 'diffpath'.
"""
params = {
@@ -184,16 +203,16 @@ class Client(object):
lines of ASCII-formatted data that matches the database
layout for the given path.

Specify count = True to return a count of matching data points
If 'count' is True, return a count of matching data points
rather than the actual data. The output format is unchanged.

Specify markup = True to include comments in the returned data
If 'markup' is True, include comments in the returned data
that indicate interval starts and ends.

Specify binary = True to return chunks of raw binary data,
rather than lines of ASCII-formatted data. Raw binary data
is always little-endian and matches the database types
(including an int64 timestamp).
If 'binary' is True, return chunks of raw binary data, rather
than lines of ASCII-formatted data. Raw binary data is
little-endian and matches the database types (including an
int64 timestamp).
"""
params = {
"path": path,
@@ -257,13 +276,13 @@ class StreamInserter(object):
_max_data = 2 * 1024 * 1024
_max_data_after_send = 64 * 1024

def __init__(self, http, path, start = None, end = None):
"""'http' is the httpclient object. 'path' is the database
def __init__(self, client, path, start = None, end = None):
"""'client' is the client object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
contiguous interval."""
self.last_response = None

self._http = http
self._client = client
self._path = path

# Start and end for the overall contiguous interval we're
@@ -431,9 +450,7 @@ class StreamInserter(object):
raise ClientError("have data to send, but no start/end times")

# Send it
params = { "path": self._path,
"start": timestamp_to_string(start_ts),
"end": timestamp_to_string(end_ts) }
self.last_response = self._http.put("stream/insert", block, params)
self.last_response = self._client.stream_insert_block(
self._path, block, start_ts, end_ts, binary = False)

return

Loading…
Cancel
Save