Browse Source

Big rework of client stream_insert_context

Now supports these operations:
  ctx.insert_line()
  ctx.insert_iter()
  ctx.finalize() (end the current contiguous interval, so a new one
                  can be started with a gap)
  ctx.update_end() (update ending timestamp before finalizing interval)
  ctx.update_start() (update starting timestamp for new interval)
tags/nilmdb-1.1^2
Jim Paris 11 years ago
parent
commit
a61fbbcf45
1 changed files with 186 additions and 90 deletions
  1. +186
    -90
      nilmdb/client/client.py

+ 186
- 90
nilmdb/client/client.py View File

@@ -14,6 +14,10 @@ def float_to_string(f):
"""Use repr to maintain full precision in the string output."""
return repr(float(f))

def extract_timestamp(line):
"""Extract just the timestamp from a line of data text"""
return float(line.split()[0])

class Client(object):
"""Main client interface to the Nilm database."""

@@ -111,100 +115,17 @@ class Client(object):

Example:
with client.stream_insert_context('/path', start, end) as ctx:
# insert a single line of ASCII formatted data:
ctx.insert_line('1234567890.0 1 2 3 4\n')
# insert all lines from the given iterable
ctx.insert_iter(['1234567890.0 1 2 3 4\n',
'1234567890.0 1 2 3 4\n'])

start and end are the overall starting and ending timestamp of
this stream; all timestamps t in the inserted data must
satisfy 'start <= t < end'. If left unspecified, 'start' is
the timestamp of the first provided line of data, and 'end' is
the timestamp on the last line of data, plus a small delta of
1μs.
ctx.insert_line('1234567890.0 1 2 3 4\\n')
ctx.insert_line('1234567891.0 1 2 3 4\\n')

For more details, see help for nilmdb.client.client.StreamInserter

This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
max_data = 1048576
max_time = 30

# Delta to add to the last timestamp, if "end" wasn't given
end_epsilon = 1e-6

def extract_timestamp(line):
return float(line.split()[0])

class ContextObject(object):
def __init__(s):
s.last_result = None
s.line1 = None
s.line2 = None
s.block_data = []
s.block_len = 0
s.block_start = start
s.clock_start = time.time()

def insert_iter(s, iter, force_send = False):
"""Insert all lines of ASCII formatted data from the
given iterable. Lines must be terminated with '\n'"""
for line in iter:
s.insert_line(line, force_send)

def insert_line(s, line, force_send = False):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\n'"""
# The passed line goes into line2, but we add line1 to the
# block. This lets us "look ahead" to the next line.
s.line1 = s.line2
s.line2 = line
if s.line1 is None:
return
if s.block_start is None:
s.block_start = extract_timestamp(s.line1)
s.block_data.append(s.line1)
s.block_len += len(s.line1)

# Return if we don't need to send this to the server yet
if (s.block_len == 0 or
(force_send == False and
s.block_len < max_data and
(time.time() - s.clock_start) < max_time)):
return

# Calculate ending timestamp for this block
if s.line2:
# Use the timestamp of the next line
block_end = extract_timestamp(s.line2)
if end and block_end > end:
# Something fishy -- use the user's end
block_end = end
elif end:
# No more data; use the user's provided end
block_end = end
else:
# No more data; add an epsilon to the last timestamp
block_end = extract_timestamp(s.line1) + end_epsilon

# Send it to the server
s.last_result = self.stream_insert_block(
path, "".join(s.block_data), s.block_start, block_end)

# Start a new block
s.block_data = []
s.block_len = 0
s.block_start = None
s.clock_start = time.time()

# execute body of the "with" statement
ctx = ContextObject()
ctx = StreamInserter(self.http, path, start, end)
yield ctx

# Force the remaining data to be sent, if any
ctx.insert_line(None, force_send = True)
ctx.finalize()

def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be an
@@ -213,7 +134,7 @@ class Client(object):
details on the 'start' and 'end' parameters."""
with self.stream_insert_context(path, start, end) as ctx:
ctx.insert_iter(data)
return ctx.last_result
return ctx.last_response

def stream_insert_block(self, path, block, start, end):
"""Insert an entire block of data into a stream. Like
@@ -257,3 +178,178 @@ class Client(object):
params["count"] = 1

return self.http.get_gen("stream/extract", params, retjson = False)

class StreamInserter(object):
"""Object returned by stream_insert_context() that manages
the insertion of rows of data into a particular path.

The basic data flow is that we are filling a contiguous interval
on the server, with no gaps, that extends from timestamp 'start'
to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
Data is provided by the user one line at a time with
.insert_line() or .insert_iter().

1. The first inserted line begins a new interval that starts at
'start'. If 'start' is not given, it is deduced from the first
line's timestamp.

2. Subsequent lines go into the same contiguous interval. As lines
are inserted, this routine may make multiple insertion requests to
the server, but will structure the timestamps to leave no gaps.

3. The current contiguous interval can be completed by manually
calling .finalize(), which the context manager will also do
automatically. This will send any remaining data to the server,
using the 'end' timestamp to end the interval.

After a .finalize(), inserting new data goes back to step 1.

.update_start() can be called before step 1 to change the start
time for the interval. .update_end() can be called before step 3
to change the end time for the interval.
"""

# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
_max_data = 1048576
_max_time = 30

# Delta to add to the final timestamp, if "end" wasn't given
_end_epsilon = 1e-6

def __init__(self, http, path, start = None, end = None):
"""'http' is the httpclient object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
contiguous interval."""
self.last_response = None

self._http = http
self._path = path

# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end

# Data for the specific block we're building up to send
self._block_data = []
self._block_len = 0
self._block_start = None

# Time of last request
self._last_time = time.time()

# We keep a buffer of the two most recently inserted lines.
# Only the older one actually gets processed; the newer one
# is used to "look-ahead" to the next timestamp if we need
# to internally split an insertion into two requests.
self._line_old = None
self._line_new = None

def insert_iter(self, iter):
"""Insert all lines of ASCII formatted data from the given
iterable. Lines must be terminated with '\\n'."""
for line in iter:
self.insert_line(line)

def insert_line(self, line):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\\n'."""
# Store this new line, but process the previous (old) one.
# This lets us "look ahead" to the next line.
self._line_old = self._line_new
self._line_new = line
if self._line_old is None:
return

# If starting a new block, pull out the timestamp if needed.
if self._block_start is None:
if self._interval_start:
# User provided a start timestamp. Use it once, then
# clear it for the next block.
self._block_start = self._interval_start
self._interval_start = None
else:
# Extract timestamp from the first row
self._block_start = extract_timestamp(self._line_old)

# Save the line
self._block_data.append(self._line_old)
self._block_len += len(self._line_old)

if self._line_new is None:
# No next line, so send this as the final block.
self._send_block_final()
else:
# Send an intermediate block to the server if needed.
elapsed = time.time() - self._last_time
if (self._block_len > self._max_data) or (elapsed > self._max_time):
self._send_block_intermediate()

def update_start(self, start):
"""Update the start time for the next contiguous interval.
Call this before starting to insert data for a new interval,
for example, after .finalize()"""
self._interval_start = start

def update_end(self, end):
"""Update the end time for the current contiguous interval.
Call this before .finalize()"""
self._interval_end = end

def finalize(self):
"""Stop filling the current contiguous interval.
All outstanding data will be sent, and the interval end
time of the interval will be taken from the 'end' argument
used when initializing this class, or the most recent
value passed to update_end(), or the last timestamp plus
a small epsilon value if no other endpoint was provided.

If more data is inserted after a finalize(), it will become
part of a new interval and there may be a gap left in-between."""
# Special marker tells insert_line that this is the end
self.insert_line(None)

def _send_block_intermediate(self):
"""Send data, when we still have more data to send.
Use the timestamp from the next line, so that the blocks
are contiguous."""
block_end = extract_timestamp(self._line_new)
if self._interval_end and block_end > self._interval_end:
# Something's fishy -- the timestamp we found is after
# the user's specified end. Limit it here, and the
# server will return an error.
block_end = self._interval_end
self._send_block(block_end)

def _send_block_final(self):
"""Send data, when this is the last block for the interval.
There is no next line, so figure out the actual interval end
using interval_end or end_epsilon."""
if self._interval_end:
# Use the user's specified end timestamp
block_end = self._interval_end
# Clear it in case we send more intervals in the future.
self._interval_end = None
else:
# Add an epsilon to the last timestamp we saw
block_end = extract_timestamp(self._line_old) + self._end_epsilon
self._send_block(block_end)

def _send_block(self, block_end):
"""Send current block to the server"""
params = { "path": self._path,
"start": float_to_string(self._block_start),
"end": float_to_string(block_end) }
self.last_response = self._http.put("stream/insert",
"".join(self._block_data), params)

# Clear out the block
self._block_data = []
self._block_len = 0
self._block_start = None

# Note when we sent it
self._last_time = time.time()

Loading…
Cancel
Save