timeit.sh script works too!tags/insert-rework-wip
@@ -1,3 +1,5 @@ | |||
# -*- coding: utf-8 -*- | |||
"""Class for performing HTTP client requests via libcurl""" | |||
from __future__ import absolute_import | |||
@@ -8,6 +10,7 @@ import sys | |||
import re | |||
import os | |||
import simplejson as json | |||
import itertools | |||
import nilmdb.httpclient | |||
@@ -89,33 +92,73 @@ class Client(object): | |||
params = { "path": path } | |||
return self.http.get("stream/destroy", params) | |||
def stream_insert(self, path, data): | |||
def stream_insert(self, path, data, start = None, end = None): | |||
"""Insert data into a stream. data should be a file-like object | |||
that provides ASCII data that matches the database layout for path.""" | |||
that provides ASCII data that matches the database layout for path. | |||
start and end are the starting and ending timestamps of this | |||
stream; all timestamps t in the data must satisfy 'start <= t | |||
< end'. If left unspecified, 'start' is the timestamp of the | |||
first line of data, and 'end' is the timestamp on the last line | |||
of data, plus a small delta of 1μs. | |||
""" | |||
params = { "path": path } | |||
# See design.md for a discussion of how much data to send. | |||
# These are soft limits -- actual data might be rounded up. | |||
max_data = 1048576 | |||
max_time = 30 | |||
end_epsilon = 1e-6 | |||
def sendit(): | |||
result = self.http.put("stream/insert", send_data, params) | |||
params["old_timestamp"] = result[1] | |||
return result | |||
def pairwise(iterable): | |||
"s -> (s0,s1), (s1,s2), ..., (sn,None)" | |||
a, b = itertools.tee(iterable) | |||
next(b, None) | |||
return itertools.izip_longest(a, b) | |||
result = None | |||
start = time.time() | |||
send_data = "" | |||
for line in data: | |||
elapsed = time.time() - start | |||
send_data += line | |||
def extract_timestamp(line): | |||
return float(line.split()[0]) | |||
if (len(send_data) > max_data) or (elapsed > max_time): | |||
def sendit(): | |||
# If we have more data after this, use the timestamp of | |||
# the next line as the end. Otherwise, use the given | |||
# overall end time, or add end_epsilon to the last data | |||
# point. | |||
if nextline: | |||
block_end = extract_timestamp(nextline) | |||
elif end: | |||
block_end = end | |||
else: | |||
block_end = extract_timestamp(line) + end_epsilon | |||
# Send it | |||
params["start"] = repr(block_start) # use repr to keep precision | |||
params["end"] = repr(block_end) | |||
return self.http.put("stream/insert", block_data, params) | |||
clock_start = time.time() | |||
block_data = "" | |||
block_start = start | |||
result = None | |||
for (line, nextline) in pairwise(data): | |||
# If we don't have a starting time, extract it from the first line | |||
if block_start is None: | |||
block_start = extract_timestamp(line) | |||
clock_elapsed = time.time() - clock_start | |||
block_data += line | |||
# If we have enough data, or enough time has elapsed, | |||
# send this block to the server, and empty things out | |||
# for the next block. | |||
if (len(block_data) > max_data) or (clock_elapsed > max_time): | |||
result = sendit() | |||
send_data = "" | |||
start = time.time() | |||
if len(send_data): | |||
block_start = None | |||
block_data = "" | |||
clock_start = time.time() | |||
# One last block? | |||
if len(block_data): | |||
result = sendit() | |||
# Return the most recent JSON result we got back, or None if | |||
@@ -161,7 +161,6 @@ class Stream(NilmApp): | |||
Insert new data into the database. Provide textual data | |||
(matching the path's layout) as an HTTP PUT. | |||
""" | |||
# Important that we always read the input before throwing any | |||
# errors, to keep lengths happy for persistent connections. | |||
# However, CherryPy 3.2.2 has a bug where this fails for GET | |||
@@ -186,12 +185,26 @@ class Stream(NilmApp): | |||
"Error parsing input data: " + | |||
e.message) | |||
if (not parser.min_timestamp or not parser.max_timestamp or | |||
not len(parser.data)): | |||
raise cherrypy.HTTPError("400 Bad Request", | |||
"no data provided") | |||
# Check limits | |||
start = float(start) | |||
end = float(end) | |||
if parser.min_timestamp < start: | |||
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " + | |||
repr(parser.min_timestamp) + | |||
" < start time " + repr(start)) | |||
if parser.max_timestamp >= end: | |||
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " + | |||
repr(parser.max_timestamp) + | |||
" >= end time " + repr(end)) | |||
# Now do the nilmdb insert, passing it the parser full of data. | |||
try: | |||
result = self.db.stream_insert(path, | |||
parser.min_timestamp, | |||
parser.max_timestamp, | |||
parser.data) | |||
result = self.db.stream_insert(path, start, end, parser.data) | |||
except nilmdb.nilmdb.NilmDBError as e: | |||
raise cherrypy.HTTPError("400 Bad Request", e.message) | |||
@@ -2,7 +2,7 @@ | |||
# note: the value doesn't matter, that's why they're empty here | |||
nocapture= | |||
nologcapture= # comment out to see cherrypy logs on failure | |||
with-coverage= | |||
#with-coverage= | |||
cover-inclusive= | |||
cover-package=nilmdb | |||
cover-erase= | |||
@@ -16,7 +16,7 @@ verbosity=2 | |||
#tests=tests/test_interval.py | |||
#tests=tests/test_rbtree.py,tests/test_interval.py | |||
#tests=tests/test_interval.py | |||
#tests=tests/test_client.py | |||
tests=tests/test_client.py | |||
#tests=tests/test_timestamper.py | |||
#tests=tests/test_serializer.py | |||
#tests=tests/test_iteratorizer.py | |||
@@ -155,14 +155,15 @@ class TestClient(object): | |||
# Try forcing a server request with empty data | |||
with assert_raises(ClientError) as e: | |||
client.http.put("stream/insert", "", { "path": "/newton/prep" }) | |||
client.http.put("stream/insert", "", { "path": "/newton/prep", | |||
"start": 0, "end": 0 }) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("no data provided", str(e.exception)) | |||
# Now do the real load | |||
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) | |||
result = client.stream_insert("/newton/prep", data) | |||
eq_(result[0], "ok") | |||
eq_(result, "ok") | |||
# Try some overlapping data -- just insert it again | |||
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) | |||
@@ -215,7 +216,8 @@ class TestClient(object): | |||
# Check PUT with generator out | |||
with assert_raises(ClientError) as e: | |||
client.http.put_gen("stream/insert", "", | |||
{ "path": "/newton/prep" }).next() | |||
{ "path": "/newton/prep", | |||
"start": 0, "end": 0 }).next() | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("no data provided", str(e.exception)) | |||
@@ -196,6 +196,6 @@ class TestServer(object): | |||
# GET instead of PUT (no body) | |||
# (actual PUT test is done by client code) | |||
with assert_raises(HTTPError) as e: | |||
getjson("/stream/insert?path=/newton/prep") | |||
getjson("/stream/insert?path=/newton/prep&start=0&end=0") | |||
eq_(e.exception.code, 400) | |||