Browse Source

Big rework of stream_insert_context and places that use it.

Things are now block-focused, rather than line-focused.  This should
give a pretty big speedup to inserting client data, especially when
inserting preformatted data.
tags/nilmdb-1.3.0
Jim Paris 11 years ago
parent
commit
0fc092779d
3 changed files with 185 additions and 207 deletions
  1. +146
    -162
      nilmdb/client/client.py
  2. +7
    -13
      nilmdb/cmdline/insert.py
  3. +32
    -32
      tests/test_client.py

+ 146
- 162
nilmdb/client/client.py View File

@@ -122,65 +122,31 @@ class Client(object):

Example:
with client.stream_insert_context('/path', start, end) as ctx:
ctx.insert_line('1234567890.0 1 2 3 4\\n')
ctx.insert_line('1234567891.0 1 2 3 4\\n')
ctx.insert('1234567890.0 1 2 3 4\\n')
ctx.insert('1234567891.0 1 2 3 4\\n')

For more details, see help for nilmdb.client.client.StreamInserter

This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
ctx = StreamInserter(self, path, start, end)
ctx = StreamInserter(self.http, path, start, end)
yield ctx
ctx.finalize()

def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be an
iterable object that provides ASCII data that matches the
database layout for path. See stream_insert_context for
details on the 'start' and 'end' parameters."""
"""Insert rows of data into a stream. data should be a string
or iterable that provides ASCII data that matches the database
layout for path. See stream_insert_context for details on the
'start' and 'end' parameters."""
with self.stream_insert_context(path, start, end) as ctx:
ctx.insert_iter(data)
if isinstance(data, basestring):
ctx.insert(data)
else:
for chunk in data:
ctx.insert(chunk)
return ctx.last_response

    def stream_insert_large_block(self, path, block, start, end):
        """Insert a potentially very large block of ASCII data by
        splitting it up into smaller chunks, finding newlines
        and extracting timestamps, and sending multiple requests
        with the individual chunks.

        'block' is a single string of newline-terminated rows; the
        first whitespace-delimited field of each row is parsed as a
        floating-point timestamp.  'start' and 'end' bound the
        overall interval.  Returns the server response from the
        final chunk's insertion."""
        max_data = StreamInserter._max_data
        offset = 0
        # Keep splitting off chunks of roughly max_data bytes while
        # more than max_data bytes remain unsent.
        while (len(block) - offset) >= max_data:
            # find the first newline at or after the soft size limit;
            # chunk boundaries must fall on line boundaries
            a = block.find('\n', offset + max_data)
            if a < 0:
                break
            a += 1
            # find the second newline, which delimits the first full
            # line of the *next* chunk
            b = block.find('\n', a)
            if b < 0:
                break
            try:
                # The next chunk's first timestamp becomes this
                # chunk's ending timestamp, keeping the inserted
                # intervals contiguous.
                timestamp = float(block[a:b].split(None, 1)[0])
            except ValueError:
                # No parseable timestamp; stop splitting and fall
                # through to send the remainder as one chunk.
                break
            self.stream_insert_block(path, block[offset:a], start, timestamp)
            start = timestamp
            offset = a
        # Return whatever's left; hopefully it's small enough
        return self.stream_insert_block(path, block[offset:], start, end)

    def stream_insert_block(self, path, block, start, end):
        """Insert an entire block of data into a stream.  Like
        stream_insert, except 'block' contains multiple lines of ASCII
        text and is sent in one single chunk.  Note that block should
        be a string, and is subject to the maximum length limits of
        the server.

        'path' is the database path to insert into.  'start' and
        'end' are floating-point timestamps bounding the interval
        this block covers.  Returns the result of the HTTP PUT."""
        # Timestamps travel as formatted strings in the query
        # parameters; the data block itself is the request body.
        params = { "path": path,
                   "start": float_time_to_string(start),
                   "end": float_time_to_string(end) }
        return self.http.put("stream/insert", block, params)

def stream_intervals(self, path, start = None, end = None):
"""
Return a generator that yields each stream interval.
@@ -229,8 +195,10 @@ class StreamInserter(object):
The basic data flow is that we are filling a contiguous interval
on the server, with no gaps, that extends from timestamp 'start'
to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
Data is provided by the user one line at a time with
.insert_line() or .insert_iter().

Data is provided to .insert() as ASCII formatted data separated by
newlines. The chunks of data passed to .insert() do not need to
match up with the newlines; less or more than one line can be passed.

1. The first inserted line begins a new interval that starts at
'start'. If 'start' is not given, it is deduced from the first
@@ -243,7 +211,9 @@ class StreamInserter(object):
3. The current contiguous interval can be completed by manually
calling .finalize(), which the context manager will also do
automatically. This will send any remaining data to the server,
using the 'end' timestamp to end the interval.
using the 'end' timestamp to end the interval. If no 'end'
was provided, it is deduced from the last timestamp seen,
plus a small delta.

After a .finalize(), inserting new data goes back to step 1.

@@ -252,23 +222,20 @@ class StreamInserter(object):
to change the end time for the interval.
"""

# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
# See design.md for a discussion of how much data to send. This
# is a soft limit -- we might send up to twice as much or so
_max_data = 2 * 1024 * 1024
_max_time = 30

# Delta to add to the final timestamp, if "end" wasn't given
_end_epsilon = 1e-6

def __init__(self, client, path, start = None, end = None):
def __init__(self, http, path, start = None, end = None):
"""'http' is the httpclient object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
contiguous interval."""
self.last_response = None

self._client = client
self._http = http
self._path = path

# Start and end for the overall contiguous interval we're
@@ -276,60 +243,33 @@ class StreamInserter(object):
self._interval_start = start
self._interval_end = end

# Data for the specific block we're building up to send
# Current data we're building up to send. Each string
# goes into the array, and gets joined all at once.
self._block_data = []
self._block_len = 0
self._block_start = None

# Time of last request
self._last_time = time.time()

# We keep a buffer of the two most recently inserted lines.
# Only the older one actually gets processed; the newer one
# is used to "look-ahead" to the next timestamp if we need
# to internally split an insertion into two requests.
self._line_old = None
self._line_new = None

def insert_iter(self, iter):
"""Insert all lines of ASCII formatted data from the given
iterable. Lines must be terminated with '\\n'."""
for line in iter:
self.insert_line(line)

def insert_line(self, line, allow_intermediate = True):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\\n'."""
if line and (len(line) < 1 or line[-1] != '\n'):
raise ValueError("lines must end in with a newline character")

# Store this new line, but process the previous (old) one.
# This lets us "look ahead" to the next line.
self._line_old = self._line_new
self._line_new = line
if self._line_old is None:
return

# If starting a new block, pull out the timestamp if needed.
if self._block_start is None:
if self._interval_start is not None:
# User provided a start timestamp. Use it once, then
# clear it for the next block.
self._block_start = self._interval_start
self._interval_start = None
else:
# Extract timestamp from the first row
self._block_start = extract_timestamp(self._line_old)
def insert(self, data):
"""Insert a chunk of ASCII formatted data in string form. The
overall data must consist of lines terminated by '\\n'."""
length = len(data)
maxdata = self._max_data

if length > maxdata:
# This could make our buffer more than twice what we
# wanted to send, so split it up. This is a bit
# inefficient, but the user really shouldn't be providing
# this much data at once.
for cut in range(0, length, maxdata):
self.insert(data[cut:(cut + maxdata)])
return

# Save the line
self._block_data.append(self._line_old)
self._block_len += len(self._line_old)
# Append this string to our list
self._block_data.append(data)
self._block_len += length

if allow_intermediate:
# Send an intermediate block to the server if needed.
elapsed = time.time() - self._last_time
if (self._block_len > self._max_data) or (elapsed > self._max_time):
self._send_block_intermediate()
# Send the block once we have enough data
if self._block_len >= maxdata:
self._send_block(final = False)

def update_start(self, start):
"""Update the start time for the next contiguous interval.
@@ -352,63 +292,107 @@ class StreamInserter(object):

If more data is inserted after a finalize(), it will become
part of a new interval and there may be a gap left in-between."""
# Special marker tells insert_line that this is the end
self.insert_line(None, allow_intermediate = False)

if self._block_len > 0:
# We have data pending, so send the final block
self._send_block_final()
elif None not in (self._interval_start, self._interval_end):
# We have no data, but enough information to create an
# empty interval.
self._block_start = self._interval_start
self._send_block(final = True)

def _get_first_noncomment(self, block):
"""Return the (start, end) indices of the first full line in
block that isn't a comment, or raise IndexError if
there isn't one."""
start = 0
while True:
end = block.find('\n', start)
if end < 0:
raise IndexError
if block[start] != '#':
return (start, (end + 1))
start = end + 1

def _get_last_noncomment(self, block):
"""Return the (start, end) indices of the last full line in
block[:length] that isn't a comment, or raise IndexError if
there isn't one."""
end = block.rfind('\n')
if end <= 0:
raise IndexError
while True:
start = block.rfind('\n', 0, end)
if block[start + 1] != '#':
return ((start + 1), end)
if start == -1:
raise IndexError
end = start

def _send_block(self, final = False):
"""Send data currently in the block. The data sent will
consist of full lines only, so some might be left over."""
# Build the full string to send
block = "".join(self._block_data)

start_ts = self._interval_start
if start_ts is None:
# Pull start from the first line
try:
(spos, epos) = self._get_first_noncomment(block)
start_ts = extract_timestamp(block[spos:epos])
except (ValueError, IndexError):
pass # no timestamp is OK, if we have no data

if final:
# For a final block, it must end in a newline, and the
# ending timestamp is either the user-provided end,
# or the timestamp of the last line plus epsilon.
end_ts = self._interval_end
try:
if block[-1] != '\n':
raise ValueError("final block didn't end with a newline")
if end_ts is None:
(spos, epos) = self._get_last_noncomment(block)
end_ts = extract_timestamp(block[spos:epos])
end_ts += self._end_epsilon
except (ValueError, IndexError):
pass # no timestamp is OK, if we have no data
self._block_data = []
self._block_len = 0

# Next block is completely fresh
self._interval_start = None
self._send_block_final()
else:
# No data, and no timestamps to use to create an empty
# interval.
pass

# Make sure both timestamps are emptied for future intervals.
self._interval_start = None
self._interval_end = None

def _send_block_intermediate(self):
"""Send data, when we still have more data to send.
Use the timestamp from the next line, so that the blocks
are contiguous."""
block_end = extract_timestamp(self._line_new)
if self._interval_end is not None and block_end > self._interval_end:
# Something's fishy -- the timestamp we found is after
# the user's specified end. Limit it here, and the
# server will return an error.
block_end = self._interval_end
self._send_block(block_end)

def _send_block_final(self):
"""Send data, when this is the last block for the interval.
There is no next line, so figure out the actual interval end
using interval_end or end_epsilon."""
if self._interval_end is not None:
# Use the user's specified end timestamp
block_end = self._interval_end
# Clear it in case we send more intervals in the future.
self._interval_end = None
else:
# Add an epsilon to the last timestamp we saw
block_end = extract_timestamp(self._line_old) + self._end_epsilon
self._send_block(block_end)

def _send_block(self, block_end):
"""Send current block to the server"""
self.last_response = self._client.stream_insert_block(
self._path, "".join(self._block_data),
self._block_start, block_end)

# Clear out the block
self._block_data = []
self._block_len = 0
self._block_start = None

# Note when we sent it
self._last_time = time.time()
# An intermediate block, e.g. "line1\nline2\nline3\nline4"
# We need to save "line3\nline4" for the next block, and
# use the timestamp from "line3" as the ending timestamp
# for this one.
try:
(spos, epos) = self._get_last_noncomment(block)
end_ts = extract_timestamp(block[spos:epos])
except (ValueError, IndexError):
# If we found no timestamp, give up; we'll send this
# block later when we have more data.
return
if spos == 0:
# Not enough data to send an intermediate block
return
if self._interval_end is not None and end_ts > self._interval_end:
# User gave us bad endpoints; send it anyway, and let
# the server complain so that the error is the same
# as if we hadn't done this chunking.
end_ts = self._interval_end
self._block_data = [ block[spos:] ]
self._block_len = (epos - spos)
block = block[:spos]

# Next block continues where this one ended
self._interval_start = end_ts

# Double check endpoints
if start_ts is None or end_ts is None:
if len(block) == 0:
# No data is OK
return
raise ValueError("have data to send, but no start/end times")

# Send it
params = { "path": self._path,
"start": float_time_to_string(start_ts),
"end": float_time_to_string(end_ts) }
self.last_response = self._http.put("stream/insert", block, params)

+ 7
- 13
nilmdb/cmdline/insert.py View File

@@ -93,15 +93,15 @@ def cmd_insert(self):

if arg.start is None:
try:
arg.start = nilmdb.utils.time.parse_time(filename)
arg.start = nilmdb.utils.time.parse_time(filename).totimestamp()
except ValueError:
self.die("error extracting start time from filename '%s'",
filename)

if arg.timestamp:
ts = timestamper.TimestamperRate(infile, arg.start, arg.rate)
data = timestamper.TimestamperRate(infile, arg.start, arg.rate)
else:
ts = None
data = iter(lambda: infile.read(1048576), '')

# Print info
if not arg.quiet:
@@ -111,17 +111,11 @@ def cmd_insert(self):
if arg.end:
printf(" End time: %s\n",
nilmdb.utils.time.format_time(arg.end))
printf("Timestamper: %s\n", str(ts))
if arg.timestamp:
printf("Timestamper: %s\n", str(data))

if ts:
# Insert the data one line at a time through the timestamper
self.client.stream_insert(arg.path, ts)
else:
# Just insert it as one big block
self.client.stream_insert_large_block(arg.path, infile.read(),
arg.start, arg.end)
#self.client.stream_insert_large_block(arg.path, infile,
# arg.start, arg.end)
# Insert the data
self.client.stream_insert(arg.path, data, arg.start, arg.end)

except nilmdb.client.Error as e:
# TODO: It would be nice to be able to offer better errors


+ 32
- 32
tests/test_client.py View File

@@ -440,55 +440,55 @@ class TestClient(object):
# override _max_data to trigger frequent server updates
ctx._max_data = 15

with assert_raises(ValueError):
ctx.insert_line("100 1")
ctx.insert_line("100 1\n")
ctx.insert_iter([ "101 1\n",
"102 1\n",
"103 1\n" ])
ctx.insert_line("104 1\n")
ctx.insert_line("105 1\n")
ctx.insert("100 1\n")
ctx.insert("101 ")
ctx.insert("1\n102 1")
ctx.insert("")
ctx.insert("\n103 1\n")
ctx.insert("104 1\n")
ctx.insert("105 1\n")
ctx.finalize()

ctx.insert_line("106 1\n")
ctx.insert("106 1\n")
ctx.update_end(106.5)
ctx.finalize()
ctx.update_start(106.8)
ctx.insert_line("107 1\n")
ctx.insert_line("108 1\n")
ctx.insert_line("109 1\n")
ctx.insert_line("110 1\n")
ctx.insert_line("111 1\n")
ctx.insert("107 1\n")
ctx.insert("108 1\n")
ctx.insert("109 1\n")
ctx.insert("110 1\n")
ctx.insert("111 1\n")
ctx.update_end(113)
ctx.insert_line("112 1\n")
ctx.insert("112 1\n")
ctx.update_end(114)
ctx.insert_line("113 1\n")
ctx.insert("113 1\n")
ctx.update_end(115)
ctx.insert_line("114 1\n")
ctx.insert("114 1\n")
ctx.finalize()

with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 100, 200) as ctx:
ctx.insert_line("115 1\n")
ctx.insert("115 1\n")

with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 200, 300) as ctx:
ctx.insert_line("115 1\n")
ctx.insert("115 1\n")

with client.stream_insert_context("/context/test", 200, 300) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_data, 15)
ctx.insert_line("225 1\n")
ctx.insert("225 1\n")
ctx.finalize()

with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 300, 400) as ctx:
ctx.insert_line("301 1\n")
ctx.insert_line("302 2\n")
ctx.insert_line("303 3\n")
ctx.insert_line("304 4\n")
ctx.insert_line("304 4\n") # non-monotonic after a few lines
ctx.insert("301 1\n")
ctx.insert("302 2\n")
ctx.insert("303 3\n")
ctx.insert("304 4\n")
ctx.insert("304 4\n") # non-monotonic after a few lines
ctx.finalize()

eq_(list(client.stream_intervals("/context/test")),
@@ -519,9 +519,9 @@ class TestClient(object):
# Insert a region with just a few points
with client.stream_insert_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert_line("140 1\n")
ctx.insert_line("150 1\n")
ctx.insert_line("160 1\n")
ctx.insert("140 1\n")
ctx.insert("150 1\n")
ctx.insert("160 1\n")
ctx.update_end(200)
ctx.finalize()

@@ -534,7 +534,7 @@ class TestClient(object):

# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert_block("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_context("/empty/test", 500, 550):
pass
@@ -555,10 +555,10 @@ class TestClient(object):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert_line("1100 1\n")
ctx.insert("1100 1\n")
ctx.finalize() # inserts [1100, 1100.000001]
ctx.update_start(1199)
ctx.insert_line("1200 1\n")
ctx.insert("1200 1\n")
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)


Loading…
Cancel
Save