Browse Source

Add client.stream_insert_context, convert everything to use it. Slow.

Not sure why this is so painfully slow.  Need more testing;
might have to scratch the idea.
tags/nilmdb-1.1^2
Jim Paris 11 years ago
parent
commit
1edb96a0bd
1 changed files with 110 additions and 62 deletions
  1. +110
    -62
      nilmdb/client/client.py

+ 110
- 62
nilmdb/client/client.py View File

@@ -8,6 +8,7 @@ import nilmdb.client.httpclient

import time
import simplejson as json
import contextlib

def float_to_string(f):
"""Use repr to maintain full precision in the string output."""
@@ -100,79 +101,126 @@ class Client(object):
params["end"] = float_to_string(end)
return self.http.get("stream/remove", params)

def stream_insert(self, path, data, start = None, end = None):
"""Insert data into a stream. data should be a file-like object
that provides ASCII data that matches the database layout for path.

start and end are the starting and ending timestamp of this
stream; all timestamps t in the data must satisfy 'start <= t
< end'. If left unspecified, 'start' is the timestamp of the
first line of data, and 'end' is the timestamp on the last line
of data, plus a small delta of 1μs.
@contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is be provided
as single lines, and is aggregated and sent to the server in larger
chunks as necessary. Data lines must match the database layout for
the given path, and end with a newline.

Example:
with client.stream_insert_context('/path', start, end) as ctx:
# insert a single line of ASCII formatted data:
ctx.insert_line('1234567890.0 1 2 3 4\n')
# insert all lines from the given iterable
ctx.insert_iter(['1234567890.0 1 2 3 4\n',
'1234567890.0 1 2 3 4\n'])

start and end are the overall starting and ending timestamp of
this stream; all timestamps t in the inserted data must
satisfy 'start <= t < end'. If left unspecified, 'start' is
the timestamp of the first provided line of data, and 'end' is
the timestamp on the last line of data, plus a small delta of
1μs.

This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
params = { "path": path }

# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
max_data = 1048576
max_time = 30
end_epsilon = 1e-6

# Delta to add to the last timestamp, if "end" wasn't given
end_epsilon = 1e-6

def extract_timestamp(line):
return float(line.split()[0])

def sendit():
# If we have more data after this, use the timestamp of
# the next line as the end. Otherwise, use the given
# overall end time, or add end_epsilon to the last data
# point.
if nextline:
block_end = extract_timestamp(nextline)
if end and block_end > end:
# This is unexpected, but we'll defer to the server
# to return an error in this case.
class ContextObject(object):
def __init__(s):
s.last_result = None
s.line1 = None
s.line2 = None
s.block_data = ""
s.block_start = start
s.clock_start = time.time()

def insert_iter(s, iter, force_send = False):
"""Insert all lines of ASCII formatted data from the
given iterable. Lines must be terminated with '\n'"""
for line in iter:
s.insert_line(line, force_send)

def insert_line(s, line, force_send = False):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\n'"""
# The passed line goes into line2, but we add line1 to the
# block. This lets us "look ahead" to the next line.
s.line1 = s.line2
s.line2 = line
if s.line1 is None:
return
if s.block_start is None:
s.block_start = extract_timestamp(s.line1)
s.block_data += s.line1

# Return if we don't need to send this to the server yet
if (len(s.block_data) == 0 or
(force_send == False and
len(s.block_data) < max_data and
(time.time() - s.clock_start) < max_time)):
return

# Calculate ending timestamp for this block
if s.line2:
# Use the timestamp of the next line
block_end = extract_timestamp(s.line2)
if end and block_end > end:
# Something fishy -- use the user's end
block_end = end
elif end:
# No more data; use the user's provided end
block_end = end
elif end:
block_end = end
else:
block_end = extract_timestamp(line) + end_epsilon

# Send it
params["start"] = float_to_string(block_start)
params["end"] = float_to_string(block_end)
return self.http.put("stream/insert", block_data, params)

clock_start = time.time()
block_data = ""
block_start = start
result = None
line = None
nextline = None
for (line, nextline) in nilmdb.utils.misc.pairwise(data):
# If we don't have a starting time, extract it from the first line
if block_start is None:
block_start = extract_timestamp(line)

clock_elapsed = time.time() - clock_start
block_data += line

# If we have enough data, or enough time has elapsed,
# send this block to the server, and empty things out
# for the next block.
if (len(block_data) > max_data) or (clock_elapsed > max_time):
result = sendit()
block_start = None
block_data = ""
clock_start = time.time()

# One last block?
if len(block_data):
result = sendit()

# Return the most recent JSON result we got back, or None if
# we didn't make any requests.
return result
else:
# No more data; add an epsilon to the last timestamp
block_end = extract_timestamp(s.line1) + end_epsilon

# Send it to the server
print "sending ", len(s.block_data)
s.last_result = self.stream_insert_block(
path, s.block_data, s.block_start, block_end)

# Start a new block
s.block_data = ""
s.block_start = None
s.clock_start = time.time()

# execute body of the "with" statement
ctx = ContextObject()
yield ctx

# Force the remaining data to be sent, if any
ctx.insert_line(None, force_send = True)

def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be an
iterable object that provides ASCII data that matches the
database layout for path. See stream_insert_context for
details on the 'start' and 'end' parameters."""
with self.stream_insert_context(path, start, end) as ctx:
ctx.insert_iter(data)
return ctx.last_result

def stream_insert_block(self, path, block, start, end):
"""Insert an entire block of data into a stream. Like
stream_insert, except 'block' contains multiple lines of ASCII
text and is sent in one single chunk."""
params = { "path": path,
"start": float_to_string(start),
"end": float_to_string(end) }
return self.http.put("stream/insert", block, params)

def stream_intervals(self, path, start = None, end = None):
"""


Loading…
Cancel
Save