Client cleanups; fix tests to account for time epsilon = 1
This commit is contained in:
parent
7576883f49
commit
d21c3470bc
|
@ -127,11 +127,11 @@ class Client(object):
|
|||
@contextlib.contextmanager
|
||||
def stream_insert_context(self, path, start = None, end = None):
|
||||
"""Return a context manager that allows data to be efficiently
|
||||
inserted into a stream in a piecewise manner. Data is be
|
||||
inserted into a stream in a piecewise manner. Data is
|
||||
provided as ASCII lines, and is aggregated and sent to the
|
||||
server in larger chunks as necessary. Data lines must match
|
||||
the database layout for the given path, and end with a
|
||||
newline.
|
||||
server in larger or smaller chunks as necessary. Data lines
|
||||
must match the database layout for the given path, and end
|
||||
with a newline.
|
||||
|
||||
Example:
|
||||
with client.stream_insert_context('/path', start, end) as ctx:
|
||||
|
@ -152,7 +152,7 @@ class Client(object):
|
|||
or iterable that provides ASCII data that matches the database
|
||||
layout for path. Data is passed through stream_insert_context,
|
||||
so it will be broken into reasonably-sized chunks and
|
||||
timestamps will be deduced if missing."""
|
||||
start/end will be deduced if missing."""
|
||||
with self.stream_insert_context(path, start, end) as ctx:
|
||||
if isinstance(data, basestring):
|
||||
ctx.insert(data)
|
||||
|
@ -276,10 +276,10 @@ class StreamInserter(object):
|
|||
_max_data = 2 * 1024 * 1024
|
||||
_max_data_after_send = 64 * 1024
|
||||
|
||||
def __init__(self, client, path, start = None, end = None):
|
||||
def __init__(self, client, path, start, end):
|
||||
"""'client' is the client object. 'path' is the database
|
||||
path to insert to. 'start' and 'end' are used for the first
|
||||
contiguous interval."""
|
||||
contiguous interval and may be None."""
|
||||
self.last_response = None
|
||||
|
||||
self._client = client
|
||||
|
|
|
@ -486,72 +486,75 @@ class TestClient(object):
|
|||
# override _max_data to trigger frequent server updates
|
||||
ctx._max_data = 15
|
||||
|
||||
ctx.insert("100 1\n")
|
||||
ctx.insert("1000 1\n")
|
||||
|
||||
ctx.insert("101 ")
|
||||
ctx.insert("1\n102 1")
|
||||
ctx.insert("1010 ")
|
||||
ctx.insert("1\n1020 1")
|
||||
ctx.insert("")
|
||||
ctx.insert("\n103 1\n")
|
||||
ctx.insert("\n1030 1\n")
|
||||
|
||||
ctx.insert("104 1\n")
|
||||
ctx.insert("1040 1\n")
|
||||
ctx.insert("# hello\n")
|
||||
ctx.insert(" # hello\n")
|
||||
ctx.insert(" 105 1\n")
|
||||
ctx.insert(" 1050 1\n")
|
||||
ctx.finalize()
|
||||
|
||||
ctx.insert("107 1\n")
|
||||
ctx.update_end(108)
|
||||
ctx.insert("1070 1\n")
|
||||
ctx.update_end(1080)
|
||||
ctx.finalize()
|
||||
ctx.update_start(109)
|
||||
ctx.insert("110 1\n")
|
||||
ctx.insert("111 1\n")
|
||||
ctx.update_start(1090)
|
||||
ctx.insert("1100 1\n")
|
||||
ctx.insert("1110 1\n")
|
||||
ctx.send()
|
||||
ctx.insert("112 1\n")
|
||||
ctx.insert("113 1\n")
|
||||
ctx.insert("114 1\n")
|
||||
ctx.update_end(116)
|
||||
ctx.insert("115 1\n")
|
||||
ctx.update_end(117)
|
||||
ctx.insert("116 1\n")
|
||||
ctx.update_end(118)
|
||||
ctx.insert("117 1" +
|
||||
ctx.insert("1120 1\n")
|
||||
ctx.insert("1130 1\n")
|
||||
ctx.insert("1140 1\n")
|
||||
ctx.update_end(1160)
|
||||
ctx.insert("1150 1\n")
|
||||
ctx.update_end(1170)
|
||||
ctx.insert("1160 1\n")
|
||||
ctx.update_end(1180)
|
||||
ctx.insert("1170 1" +
|
||||
" # this is super long" * 100 +
|
||||
"\n")
|
||||
ctx.finalize()
|
||||
ctx.insert("# this is super long" * 100)
|
||||
|
||||
with assert_raises(ClientError):
|
||||
with client.stream_insert_context("/context/test", 100, 200) as ctx:
|
||||
ctx.insert("118 1\n")
|
||||
with client.stream_insert_context("/context/test",
|
||||
1000, 2000) as ctx:
|
||||
ctx.insert("1180 1\n")
|
||||
|
||||
with assert_raises(ClientError):
|
||||
with client.stream_insert_context("/context/test", 200, 300) as ctx:
|
||||
ctx.insert("118 1\n")
|
||||
with client.stream_insert_context("/context/test",
|
||||
2000, 3000) as ctx:
|
||||
ctx.insert("1180 1\n")
|
||||
|
||||
with assert_raises(ClientError):
|
||||
with client.stream_insert_context("/context/test") as ctx:
|
||||
ctx.insert("bogus data\n")
|
||||
|
||||
with client.stream_insert_context("/context/test", 200, 300) as ctx:
|
||||
with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
|
||||
# make sure our override wasn't permanent
|
||||
ne_(ctx._max_data, 15)
|
||||
ctx.insert("225 1\n")
|
||||
ctx.insert("2250 1\n")
|
||||
ctx.finalize()
|
||||
|
||||
with assert_raises(ClientError):
|
||||
with client.stream_insert_context("/context/test", 300, 400) as ctx:
|
||||
ctx.insert("301 1\n")
|
||||
ctx.insert("302 2\n")
|
||||
ctx.insert("303 3\n")
|
||||
ctx.insert("304 4\n")
|
||||
ctx.insert("304 4\n") # non-monotonic after a few lines
|
||||
with client.stream_insert_context("/context/test",
|
||||
3000, 4000) as ctx:
|
||||
ctx.insert("3010 1\n")
|
||||
ctx.insert("3020 2\n")
|
||||
ctx.insert("3030 3\n")
|
||||
ctx.insert("3040 4\n")
|
||||
ctx.insert("3040 4\n") # non-monotonic after a few lines
|
||||
ctx.finalize()
|
||||
|
||||
eq_(list(client.stream_intervals("/context/test")),
|
||||
[ [ 100, 106 ],
|
||||
[ 107, 108 ],
|
||||
[ 109, 118 ],
|
||||
[ 200, 300 ] ])
|
||||
[ [ 1000, 1051 ],
|
||||
[ 1070, 1080 ],
|
||||
[ 1090, 1180 ],
|
||||
[ 2000, 3000 ] ])
|
||||
|
||||
# destroy stream (try without removing data first)
|
||||
with assert_raises(ClientError):
|
||||
|
|
Loading…
Reference in New Issue
Block a user