Compare commits

...

6 Commits

Author SHA1 Message Date
9082cc9f44 Merging adjacent intervals is working now!
Adjust test expectations accordingly, since the number of intervals
they print out will now be smaller.
2012-12-12 19:25:27 -05:00
bf64a40472 Some misc test additions, interval optimizations. Still need adjacency test 2012-12-11 23:31:55 -05:00
32dbeebc09 More insertion checks. Need to get interval concatenation working. 2012-12-11 18:08:00 -05:00
66ddc79b15 Inserting works again, with proper end/start for paired blocks.
timeit.sh script works too!
2012-12-07 20:30:39 -05:00
7a8bd0bf41 Don't include layout on client side 2012-12-07 16:24:15 -05:00
ee552de740 Start reworking/fixing insert timestamps 2012-12-06 20:25:24 -05:00
11 changed files with 222 additions and 91 deletions

View File

@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
"""Class for performing HTTP client requests via libcurl""" """Class for performing HTTP client requests via libcurl"""
from __future__ import absolute_import from __future__ import absolute_import
@@ -8,6 +10,7 @@ import sys
import re import re
import os import os
import simplejson as json import simplejson as json
import itertools
import nilmdb.httpclient import nilmdb.httpclient
@@ -16,6 +19,10 @@ from nilmdb.httpclient import ClientError, ServerError, Error
version = "1.0" version = "1.0"
def float_to_string(f):
# Use repr to maintain full precision in the string output.
return repr(float(f))
class Client(object): class Client(object):
"""Main client interface to the Nilm database.""" """Main client interface to the Nilm database."""
@@ -89,33 +96,77 @@ class Client(object):
params = { "path": path } params = { "path": path }
return self.http.get("stream/destroy", params) return self.http.get("stream/destroy", params)
def stream_insert(self, path, data): def stream_insert(self, path, data, start = None, end = None):
"""Insert data into a stream. data should be a file-like object """Insert data into a stream. data should be a file-like object
that provides ASCII data that matches the database layout for path.""" that provides ASCII data that matches the database layout for path.
start and end are the starting and ending timestamp of this
stream; all timestamps t in the data must satisfy 'start <= t
< end'. If left unspecified, 'start' is the timestamp of the
first line of data, and 'end' is the timestamp on the last line
of data, plus a small delta of 1μs.
"""
params = { "path": path } params = { "path": path }
# See design.md for a discussion of how much data to send. # See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up. # These are soft limits -- actual data might be rounded up.
max_data = 1048576 max_data = 1048576
max_time = 30 max_time = 30
end_epsilon = 1e-6
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), ..., (sn,None)"
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip_longest(a, b)
def extract_timestamp(line):
return float(line.split()[0])
def sendit(): def sendit():
result = self.http.put("stream/insert", send_data, params) # If we have more data after this, use the timestamp of
params["old_timestamp"] = result[1] # the next line as the end. Otherwise, use the given
return result # overall end time, or add end_epsilon to the last data
# point.
if nextline:
block_end = extract_timestamp(nextline)
if end and block_end > end:
# This is unexpected, but we'll defer to the server
# to return an error in this case.
block_end = end
elif end:
block_end = end
else:
block_end = extract_timestamp(line) + end_epsilon
# Send it
params["start"] = float_to_string(block_start)
params["end"] = float_to_string(block_end)
return self.http.put("stream/insert", block_data, params)
clock_start = time.time()
block_data = ""
block_start = start
result = None result = None
start = time.time() for (line, nextline) in pairwise(data):
send_data = "" # If we don't have a starting time, extract it from the first line
for line in data: if block_start is None:
elapsed = time.time() - start block_start = extract_timestamp(line)
send_data += line
if (len(send_data) > max_data) or (elapsed > max_time): clock_elapsed = time.time() - clock_start
block_data += line
# If we have enough data, or enough time has elapsed,
# send this block to the server, and empty things out
# for the next block.
if (len(block_data) > max_data) or (clock_elapsed > max_time):
result = sendit() result = sendit()
send_data = "" block_start = None
start = time.time() block_data = ""
if len(send_data): clock_start = time.time()
# One last block?
if len(block_data):
result = sendit() result = sendit()
# Return the most recent JSON result we got back, or None if # Return the most recent JSON result we got back, or None if
@@ -130,9 +181,9 @@ class Client(object):
"path": path "path": path
} }
if start is not None: if start is not None:
params["start"] = repr(start) # use repr to keep precision params["start"] = float_to_string(start)
if end is not None: if end is not None:
params["end"] = repr(end) params["end"] = float_to_string(end)
return self.http.get_gen("stream/intervals", params, retjson = True) return self.http.get_gen("stream/intervals", params, retjson = True)
def stream_extract(self, path, start = None, end = None, count = False): def stream_extract(self, path, start = None, end = None, count = False):
@@ -148,9 +199,9 @@ class Client(object):
"path": path, "path": path,
} }
if start is not None: if start is not None:
params["start"] = repr(start) # use repr to keep precision params["start"] = float_to_string(start)
if end is not None: if end is not None:
params["end"] = repr(end) params["end"] = float_to_string(end)
if count: if count:
params["count"] = 1 params["count"] = 1

View File

@@ -1,7 +1,6 @@
from __future__ import absolute_import from __future__ import absolute_import
from nilmdb.printf import * from nilmdb.printf import *
import nilmdb.client import nilmdb.client
import nilmdb.layout
import sys import sys
def setup(self, sub): def setup(self, sub):

View File

@@ -1,7 +1,6 @@
from __future__ import absolute_import from __future__ import absolute_import
from nilmdb.printf import * from nilmdb.printf import *
import nilmdb.client import nilmdb.client
import nilmdb.layout
import nilmdb.timestamper import nilmdb.timestamper
import sys import sys

View File

@@ -177,8 +177,8 @@ cdef class IntervalSet:
else: else:
return False return False
this = [ x for x in self ] this = list(self)
that = [ x for x in other ] that = list(other)
try: try:
while True: while True:
@@ -236,6 +236,12 @@ cdef class IntervalSet:
self.__iadd__(x) self.__iadd__(x)
return self return self
def iadd_nocheck(self, Interval other not None):
"""Inplace add -- modifies self.
'Optimized' version that doesn't check for intersection and
only inserts the new interval into the tree."""
self.tree.insert(rbtree.RBNode(other.start, other.end, other))
def __isub__(self, Interval other not None): def __isub__(self, Interval other not None):
"""Inplace subtract -- modifies self """Inplace subtract -- modifies self
@@ -300,3 +306,13 @@ cdef class IntervalSet:
if n.obj.intersects(other): if n.obj.intersects(other):
return True return True
return False return False
def find_end(self, double t):
"""
Return an Interval from this tree that ends at time t, or
None if it doesn't exist.
"""
n = self.tree.find_left_end(t)
if n and n.obj.end == t:
return n.obj
return None

View File

@@ -192,36 +192,58 @@ class NilmDB(object):
# Return cached value # Return cached value
return self._cached_iset[stream_id] return self._cached_iset[stream_id]
# TODO: Split add_interval into two pieces, one to add
# and one to flush to disk?
# Need to think about this. Basic problem is that we can't
# mess with intervals once they're in the IntervalSet,
# without mucking with bxinterval internals.
# Maybe add a separate optimization step?
# Join intervals that have a fairly small gap between them
def _add_interval(self, stream_id, interval, start_pos, end_pos): def _add_interval(self, stream_id, interval, start_pos, end_pos):
""" """
Add interval to the internal interval cache, and to the database. Add interval to the internal interval cache, and to the database.
Note: arguments must be ints (not numpy.int64, etc) Note: arguments must be ints (not numpy.int64, etc)
""" """
# Ensure this stream's intervals are cached, and add the new # Ensure this stream's intervals are cached
# interval to that cache.
iset = self._get_intervals(stream_id) iset = self._get_intervals(stream_id)
try:
iset += DBInterval(interval.start, interval.end, # Check for overlap
interval.start, interval.end, if iset.intersects(interval): # pragma: no cover (gets caught earlier)
start_pos, end_pos)
except IntervalError as e: # pragma: no cover
raise NilmDBError("new interval overlaps existing data") raise NilmDBError("new interval overlaps existing data")
# Check for adjacency. If there's a stream in the database
# that ends exactly when this one starts, and the database
# rows match up, we can make one interval that covers the
# time range [adjacent.start -> interval.end)
# and database rows [ adjacent.start_pos -> end_pos ].
# Only do this if the resulting interval isn't too large.
max_merged_rows = 30000000 # a bit more than 1 hour at 8 KHz
adjacent = iset.find_end(interval.start)
if (adjacent is not None and
start_pos == adjacent.db_endpos and
(end_pos - adjacent.db_startpos) < max_merged_rows):
# First delete the old one, both from our cache and the
# database
iset -= adjacent
self.con.execute("DELETE FROM ranges WHERE "
"stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND "
"end_pos=?", (stream_id,
adjacent.db_start,
adjacent.db_end,
adjacent.db_startpos,
adjacent.db_endpos))
# Now update our interval so the fallthrough add is
# correct.
interval.start = adjacent.start
start_pos = adjacent.db_startpos
# Add the new interval to the cache
iset.iadd_nocheck(DBInterval(interval.start, interval.end,
interval.start, interval.end,
start_pos, end_pos))
# Insert into the database # Insert into the database
self.con.execute("INSERT INTO ranges " self.con.execute("INSERT INTO ranges "
"(stream_id,start_time,end_time,start_pos,end_pos) " "(stream_id,start_time,end_time,start_pos,end_pos) "
"VALUES (?,?,?,?,?)", "VALUES (?,?,?,?,?)",
(stream_id, interval.start, interval.end, (stream_id, interval.start, interval.end,
int(start_pos), int(end_pos))) int(start_pos), int(end_pos)))
self.con.commit() self.con.commit()
def stream_list(self, path = None, layout = None): def stream_list(self, path = None, layout = None):
@@ -383,30 +405,18 @@ class NilmDB(object):
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM streams WHERE id=?", (stream_id,)) con.execute("DELETE FROM streams WHERE id=?", (stream_id,))
def stream_insert(self, path, parser, old_timestamp = None): def stream_insert(self, path, start, end, data):
"""Insert new data into the database. """Insert new data into the database.
path: Path at which to add the data path: Path at which to add the data
parser: nilmdb.layout.Parser instance full of data to insert start: Starting timestamp
end: Ending timestamp
data: Rows of data, to be passed to PyTable's table.append
method. E.g. nilmdb.layout.Parser.data
""" """
if (not parser.min_timestamp or not parser.max_timestamp or
not len(parser.data)):
raise StreamError("no data provided")
# If we were provided with an old timestamp, the expectation
# is that the client has a contiguous block of time it is sending,
# but it's doing it over multiple calls to stream_insert.
# old_timestamp is the max_timestamp of the previous insert.
# To make things continuous, use that as our starting timestamp
# instead of what the parser found.
if old_timestamp:
min_timestamp = old_timestamp
else:
min_timestamp = parser.min_timestamp
# First check for basic overlap using timestamp info given. # First check for basic overlap using timestamp info given.
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
iset = self._get_intervals(stream_id) iset = self._get_intervals(stream_id)
interval = Interval(min_timestamp, parser.max_timestamp) interval = Interval(start, end)
if iset.intersects(interval): if iset.intersects(interval):
raise OverlapError("new data overlaps existing data at range: " raise OverlapError("new data overlaps existing data at range: "
+ str(iset & interval)) + str(iset & interval))
@@ -414,7 +424,7 @@ class NilmDB(object):
# Insert the data into pytables # Insert the data into pytables
table = self.h5file.getNode(path) table = self.h5file.getNode(path)
row_start = table.nrows row_start = table.nrows
table.append(parser.data) table.append(data)
row_end = table.nrows row_end = table.nrows
table.flush() table.flush()

View File

@@ -156,19 +156,11 @@ class Stream(NilmApp):
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
#@cherrypy.tools.disable_prb() #@cherrypy.tools.disable_prb()
def insert(self, path, old_timestamp = None): def insert(self, path, start, end):
""" """
Insert new data into the database. Provide textual data Insert new data into the database. Provide textual data
(matching the path's layout) as a HTTP PUT. (matching the path's layout) as a HTTP PUT.
old_timestamp is used when making multiple, split-up insertions
for a larger contiguous block of data. The first insert
will return the maximum timestamp that it saw, and the second
insert should provide this timestamp as an argument. This is
used to extend the previous database interval rather than
start a new one.
""" """
# Important that we always read the input before throwing any # Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections. # errors, to keep lengths happy for persistent connections.
# However, CherryPy 3.2.2 has a bug where this fails for GET # However, CherryPy 3.2.2 has a bug where this fails for GET
@@ -193,18 +185,31 @@ class Stream(NilmApp):
"Error parsing input data: " + "Error parsing input data: " +
e.message) e.message)
if (not parser.min_timestamp or not parser.max_timestamp or
not len(parser.data)):
raise cherrypy.HTTPError("400 Bad Request",
"no data provided")
# Check limits
start = float(start)
end = float(end)
if parser.min_timestamp < start:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.min_timestamp) +
" < start time " + repr(start))
if parser.max_timestamp >= end:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.max_timestamp) +
" >= end time " + repr(end))
# Now do the nilmdb insert, passing it the parser full of data. # Now do the nilmdb insert, passing it the parser full of data.
try: try:
if old_timestamp: result = self.db.stream_insert(path, start, end, parser.data)
old_timestamp = float(old_timestamp)
result = self.db.stream_insert(path, parser, old_timestamp)
except nilmdb.nilmdb.NilmDBError as e: except nilmdb.nilmdb.NilmDBError as e:
raise cherrypy.HTTPError("400 Bad Request", e.message) raise cherrypy.HTTPError("400 Bad Request", e.message)
# Return the maximum timestamp that we saw. The client will # Done
# return this back to us as the old_timestamp parameter, if return "ok"
# it has more data to send.
return ("ok", parser.max_timestamp)
# /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0

View File

@@ -131,6 +131,7 @@ class TestClient(object):
testfile = "tests/data/prep-20120323T1000" testfile = "tests/data/prep-20120323T1000"
start = datetime_tz.datetime_tz.smartparse("20120323T1000") start = datetime_tz.datetime_tz.smartparse("20120323T1000")
start = start.totimestamp()
rate = 120 rate = 120
# First try a nonexistent path # First try a nonexistent path
@@ -155,14 +156,41 @@ class TestClient(object):
# Try forcing a server request with empty data # Try forcing a server request with empty data
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
client.http.put("stream/insert", "", { "path": "/newton/prep" }) client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 0, "end": 0 })
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception)) in_("no data provided", str(e.exception))
# Specify start/end (starts too late)
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
with assert_raises(ClientError) as e:
result = client.stream_insert("/newton/prep", data,
start + 5, start + 120)
in_("400 Bad Request", str(e.exception))
in_("Data timestamp 1332511200.0 < start time 1332511205.0",
str(e.exception))
# Specify start/end (ends too early)
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
with assert_raises(ClientError) as e:
result = client.stream_insert("/newton/prep", data,
start, start + 1)
in_("400 Bad Request", str(e.exception))
# Client chunks the input, so the exact timestamp here might change
# if the chunk positions change.
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
str(e.exception))
# Now do the real load # Now do the real load
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
result = client.stream_insert("/newton/prep", data) result = client.stream_insert("/newton/prep", data,
eq_(result[0], "ok") start, start + 119.999777)
eq_(result, "ok")
# Verify the intervals. Should be just one, even if the data
# was inserted in chunks, due to nilmdb interval concatenation.
intervals = list(client.stream_intervals("/newton/prep"))
eq_(intervals, [[start, start + 119.999777]])
# Try some overlapping data -- just insert it again # Try some overlapping data -- just insert it again
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
@@ -215,7 +243,8 @@ class TestClient(object):
# Check PUT with generator out # Check PUT with generator out
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
client.http.put_gen("stream/insert", "", client.http.put_gen("stream/insert", "",
{ "path": "/newton/prep" }).next() { "path": "/newton/prep",
"start": 0, "end": 0 }).next()
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception)) in_("no data provided", str(e.exception))
@@ -238,7 +267,7 @@ class TestClient(object):
# still disable chunked responses for debugging. # still disable chunked responses for debugging.
x = client.http.get("stream/intervals", { "path": "/newton/prep" }, x = client.http.get("stream/intervals", { "path": "/newton/prep" },
retjson=False) retjson=False)
eq_(x.count('\n'), 2) lines_(x, 1)
if "transfer-encoding: chunked" not in client.http._headers.lower(): if "transfer-encoding: chunked" not in client.http._headers.lower():
warnings.warn("Non-chunked HTTP response for /stream/intervals") warnings.warn("Non-chunked HTTP response for /stream/intervals")

View File

@@ -368,36 +368,36 @@ class TestCmdline(object):
def test_cmdline_07_detail(self): def test_cmdline_07_detail(self):
# Just count the number of lines, it's probably fine # Just count the number of lines, it's probably fine
self.ok("list --detail") self.ok("list --detail")
eq_(self.captured.count('\n'), 11) lines_(self.captured, 8)
self.ok("list --detail --path *prep") self.ok("list --detail --path *prep")
eq_(self.captured.count('\n'), 7) lines_(self.captured, 4)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'") self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'")
eq_(self.captured.count('\n'), 5) lines_(self.captured, 3)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'") self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'")
eq_(self.captured.count('\n'), 3) lines_(self.captured, 2)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'") self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'")
eq_(self.captured.count('\n'), 2) lines_(self.captured, 2)
self.contain("10:05:15.000") self.contain("10:05:15.000")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'") self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'")
eq_(self.captured.count('\n'), 2) lines_(self.captured, 2)
self.contain("10:05:15.500") self.contain("10:05:15.500")
self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'") self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'")
eq_(self.captured.count('\n'), 2) lines_(self.captured, 2)
self.contain("no intervals") self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'" self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.50'") + " --end='23 Mar 2012 10:05:15.50'")
eq_(self.captured.count('\n'), 2) lines_(self.captured, 2)
self.contain("10:05:15.500") self.contain("10:05:15.500")
self.ok("list --detail") self.ok("list --detail")
eq_(self.captured.count('\n'), 11) lines_(self.captured, 8)
def test_cmdline_08_extract(self): def test_cmdline_08_extract(self):
# nonexistent stream # nonexistent stream
@@ -450,7 +450,7 @@ class TestCmdline(object):
# all data put in by tests # all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
eq_(self.captured.count('\n'), 43204) lines_(self.captured, 43204)
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n") self.match("43200\n")
@@ -459,7 +459,7 @@ class TestCmdline(object):
server_stop() server_stop()
server_start(max_results = 2) server_start(max_results = 2)
self.ok("list --detail") self.ok("list --detail")
eq_(self.captured.count('\n'), 11) lines_(self.captured, 8)
server_stop() server_stop()
server_start() server_start()
@@ -484,7 +484,7 @@ class TestCmdline(object):
# Notice how they're not empty # Notice how they're not empty
self.ok("list --detail") self.ok("list --detail")
eq_(self.captured.count('\n'), 11) lines_(self.captured, 8)
# Delete some # Delete some
self.ok("destroy /newton/prep") self.ok("destroy /newton/prep")

View File

@@ -20,6 +20,12 @@ def ne_(a, b):
if not a != b: if not a != b:
raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b))) raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b)))
def lines_(a, n):
l = a.count('\n')
if not l == n:
raise AssertionError("wanted %d lines, got %d in output: '%s'"
% (n, l, a))
def recursive_unlink(path): def recursive_unlink(path):
try: try:
shutil.rmtree(path) shutil.rmtree(path)

View File

@@ -137,6 +137,15 @@ class TestInterval:
x = iseta != 3 x = iseta != 3
ne_(IntervalSet(a), IntervalSet(b)) ne_(IntervalSet(a), IntervalSet(b))
# Note that assignment makes a new reference (not a copy)
isetd = IntervalSet(isetb)
isete = isetd
eq_(isetd, isetb)
eq_(isetd, isete)
isetd -= a
ne_(isetd, isetb)
eq_(isetd, isete)
# test iterator # test iterator
for interval in iseta: for interval in iseta:
pass pass
@@ -158,11 +167,18 @@ class TestInterval:
iset = IntervalSet(a) iset = IntervalSet(a)
iset += IntervalSet(b) iset += IntervalSet(b)
eq_(iset, IntervalSet([a, b])) eq_(iset, IntervalSet([a, b]))
iset = IntervalSet(a) iset = IntervalSet(a)
iset += b iset += b
eq_(iset, IntervalSet([a, b])) eq_(iset, IntervalSet([a, b]))
iset = IntervalSet(a)
iset.iadd_nocheck(b)
eq_(iset, IntervalSet([a, b]))
iset = IntervalSet(a) + IntervalSet(b) iset = IntervalSet(a) + IntervalSet(b)
eq_(iset, IntervalSet([a, b])) eq_(iset, IntervalSet([a, b]))
iset = IntervalSet(b) + a iset = IntervalSet(b) + a
eq_(iset, IntervalSet([a, b])) eq_(iset, IntervalSet([a, b]))

View File

@@ -196,6 +196,6 @@ class TestServer(object):
# GET instead of POST (no body) # GET instead of POST (no body)
# (actual POST test is done by client code) # (actual POST test is done by client code)
with assert_raises(HTTPError) as e: with assert_raises(HTTPError) as e:
getjson("/stream/insert?path=/newton/prep") getjson("/stream/insert?path=/newton/prep&start=0&end=0")
eq_(e.exception.code, 400) eq_(e.exception.code, 400)