The server buffers the string and passes it to nilmdb. Nilmdb passes the string to bulkdata. Bulkdata uses the rocket interface to parse it in chunks, as necessary. Everything gets passed back up and everyone is happy. Currently, only pyrocket implements append_string.

tags/nilmdb-1.3.0
@@ -6,6 +6,7 @@ from __future__ import absolute_import | |||
from __future__ import division | |||
import nilmdb | |||
from nilmdb.utils.printf import * | |||
from nilmdb.utils.time import float_time_to_string as ftts | |||
import os | |||
import cPickle as pickle | |||
@@ -338,6 +339,65 @@ class Table(object): | |||
remaining -= count | |||
self.nrows += count | |||
def append_string(self, data, start, end):
    """Parse the formatted string in 'data', according to the
    current layout, and append it to the table.  If any timestamps
    are non-monotonic, or don't fall between 'start' and 'end',
    a ValueError is raised.

    If this function succeeds, it returns normally.  Otherwise,
    the table is reverted back to its original state by truncating
    or deleting files as necessary."""
    # Byte offset into 'data' where parsing resumes on each chunk.
    data_offset = 0
    # Sentinel "before any real timestamp" value so the first row's
    # monotonicity check passes.  NOTE(review): assumes no real
    # timestamp is ever <= -1e12 — confirm against the data domain.
    last_timestamp = -1e12
    # Running row total; self.nrows itself is only updated on success.
    tot_rows = self.nrows
    count = 0
    linenum = 0
    try:
        while data_offset < len(data):
            # See how many rows we can fit into the current file,
            # and open it
            (subdir, fname, offset, count) = self._offset_from_row(tot_rows)
            f = self.file_open(subdir, fname)

            # Ask the rocket object to parse and append up to "count"
            # rows of data, verifying things along the way.
            try:
                (added_rows, data_offset, last_timestamp, linenum
                 ) = f.append_string(count, data, data_offset, linenum,
                                     start, end, last_timestamp)
            except rocket.ParseError as e:
                # The rocket layer reports errors as a
                # (linenum, errtype, value) tuple; translate each
                # error type into a human-readable ValueError.
                # (Python 2: e.message holds the exception argument.)
                (linenum, errtype, obj) = e.message
                if errtype == rocket.ERR_NON_MONOTONIC:
                    err = sprintf("line %d: timestamp is not monotonically "
                                  "increasing", linenum)
                elif errtype == rocket.ERR_OUT_OF_INTERVAL:
                    if obj < start:
                        err = sprintf("line %d: Data timestamp %s < "
                                      "start time %s", linenum,
                                      ftts(obj), ftts(start))
                    else:
                        err = sprintf("line %d: Data timestamp %s >= "
                                      "end time %s", linenum,
                                      ftts(obj), ftts(end))
                else:
                    err = str(e)
                raise ValueError("error parsing input data: " + err)
            tot_rows += added_rows
    except Exception:
        # Some failure, so try to roll things back by truncating or
        # deleting files that we may have appended data to.
        cleanpos = self.nrows
        while cleanpos <= tot_rows:
            (subdir, fname, offset, count) = self._offset_from_row(cleanpos)
            self._remove_or_truncate_file(subdir, fname, offset)
            cleanpos += count
        # Re-raise original exception
        raise
    else:
        # Success, so update self.nrows accordingly
        self.nrows = tot_rows
def _get_data(self, start, stop, as_string): | |||
"""Extract data corresponding to Python range [n:m], | |||
and returns a numeric list or formatted string, | |||
@@ -412,8 +412,7 @@ class NilmDB(object): | |||
path: Path at which to add the data | |||
start: Starting timestamp | |||
end: Ending timestamp | |||
data: Rows of data, to be passed to bulkdata table.append | |||
method. E.g. nilmdb.layout.Parser.data | |||
data: Textual data, formatted according to the layout of path | |||
""" | |||
# First check for basic overlap using timestamp info given. | |||
stream_id = self._stream_id(path) | |||
@@ -423,10 +422,11 @@ class NilmDB(object): | |||
raise OverlapError("new data overlaps existing data at range: " | |||
+ str(iset & interval)) | |||
# Insert the data | |||
# Tentatively append the data. This will raise a ValueError if | |||
# there are any parse errors. | |||
table = self.data.getnode(path) | |||
row_start = table.nrows | |||
table.append(data) | |||
table.append_string(data, start, end) | |||
row_end = table.nrows | |||
# Insert the record into the sql database. | |||
@@ -7,7 +7,16 @@ | |||
from __future__ import absolute_import | |||
import nilmdb | |||
import struct | |||
import cStringIO | |||
import itertools | |||
from . import layout as _layout | |||
from nilmdb.utils.time import float_time_to_string as ftts | |||
ERR_UNKNOWN = 0 | |||
ERR_NON_MONOTONIC = 1 | |||
ERR_OUT_OF_INTERVAL = 2 | |||
class ParseError(Exception):
    """Raised when input data cannot be parsed or fails validation."""
@nilmdb.utils.must_close(wrap_verify = False) | |||
class Rocket(object): | |||
@@ -47,6 +56,7 @@ class Rocket(object): | |||
self.packer = struct.Struct(struct_fmt) | |||
# For packing/unpacking from strings. | |||
self.layoutparser = _layout.Layout(self.layout) | |||
self.formatter = _layout.Formatter(self.layout) | |||
def close(self): | |||
@@ -63,18 +73,55 @@ class Rocket(object): | |||
# We assume the file is opened in append mode, | |||
# so all writes go to the end. | |||
written = 0 | |||
while True: | |||
if written >= maxrows: | |||
break | |||
try: | |||
row = data.next() | |||
except StopIteration: | |||
break | |||
for row in itertools.islice(data, maxrows): | |||
self.file.write(self.packer.pack(*row)) | |||
written += 1 | |||
self.file.flush() | |||
return written | |||
def append_string(self, count, data, data_offset, linenum,
                  start, end, last_timestamp):
    """Parse string data and append rows to the file.

    count: maximum number of rows to add
    data: string data
    data_offset: byte offset into data to start parsing
    linenum: current line number of data
    start: starting timestamp for interval
    end: end timestamp for interval
    last_timestamp: last timestamp that was previously parsed

    Raises ParseError((linenum, errtype, obj)) if timestamps are
    non-monotonic, outside the start/end interval, etc.  errtype is
    one of the ERR_* constants; obj is the offending timestamp, or
    the underlying exception for ERR_UNKNOWN.

    On success, returns a tuple with four values:
      added_rows: how many rows were added from the data
      data_offset: current offset into the data string
      last_timestamp: last timestamp we parsed
      linenum: current line number of data
    """
    # Resume parsing at the caller-supplied offset into the data.
    indata = cStringIO.StringIO(data)
    indata.seek(data_offset)
    written = 0
    while written < count:
        line = indata.readline()
        if line == "":
            # End of data.  Check this before bumping linenum, so we
            # don't count a line that was never read (otherwise the
            # returned linenum would be off by one at end of input).
            break
        linenum += 1
        try:
            (ts, row) = self.layoutparser.parse(line)
        except ValueError as e:
            raise ParseError((linenum, ERR_UNKNOWN, e))
        # Timestamps must be strictly increasing...
        if ts <= last_timestamp:
            raise ParseError((linenum, ERR_NON_MONOTONIC, ts))
        last_timestamp = ts
        # ...and fall within the half-open interval [start, end).
        if ts < start or ts >= end:
            raise ParseError((linenum, ERR_OUT_OF_INTERVAL, ts))
        self.append_iter(1, [row])
        written += 1
    return (written, indata.tell(), last_timestamp, linenum)
def extract_list(self, offset, count): | |||
"""Extract count rows of data from the file at offset offset. | |||
Return a list of lists [[row],[row],...]""" | |||
@@ -2,6 +2,8 @@ | |||
#include <structmember.h> | |||
#include <endian.h> | |||
// PyErr_NewExceptionWithDoc, PyModule_AddIntConstant | |||
/* Somewhat arbitrary, just so we can use fixed sizes for strings | |||
etc. */ | |||
static const int MAX_LAYOUT_COUNT = 64; | |||
@@ -208,6 +208,7 @@ class Stream(NilmApp): | |||
# /stream/insert?path=/newton/prep | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
@exception_to_httperror(NilmDBError, ValueError) | |||
@cherrypy.tools.allow_methods(methods = ["PUT"]) | |||
def insert(self, path, start, end): | |||
""" | |||
@@ -224,16 +225,6 @@ class Stream(NilmApp): | |||
streams = self.db.stream_list(path = path) | |||
if len(streams) != 1: | |||
raise cherrypy.HTTPError("404 Not Found", "No such stream") | |||
layout = streams[0][1] | |||
# Parse the input data | |||
try: | |||
parser = nilmdb.server.layout.Parser(layout) | |||
parser.parse(body) | |||
except nilmdb.server.layout.ParserError as e: | |||
raise cherrypy.HTTPError("400 Bad Request", | |||
"error parsing input data: " + | |||
e.message) | |||
# Check limits | |||
start = float(start) | |||
@@ -241,20 +232,10 @@ class Stream(NilmApp): | |||
if start >= end: | |||
raise cherrypy.HTTPError("400 Bad Request", | |||
"start must precede end") | |||
if parser.min_timestamp is not None and parser.min_timestamp < start: | |||
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " + | |||
repr(parser.min_timestamp) + | |||
" < start time " + repr(start)) | |||
if parser.max_timestamp is not None and parser.max_timestamp >= end: | |||
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " + | |||
repr(parser.max_timestamp) + | |||
" >= end time " + repr(end)) | |||
# Now do the nilmdb insert, passing it the parser full of data. | |||
try: | |||
self.db.stream_insert(path, start, end, parser.data) | |||
except NilmDBError as e: | |||
raise cherrypy.HTTPError("400 Bad Request", e.message) | |||
# Pass the data directly to nilmdb, which will parse it and | |||
# raise a ValueError if there are any problems. | |||
self.db.stream_insert(path, start, end, body) | |||
# Done | |||
return | |||
@@ -1,3 +1,4 @@ | |||
# comments are cool? | |||
2.66568e+05 2.24029e+05 5.16140e+03 2.52517e+03 8.35084e+03 3.72470e+03 1.35534e+03 2.03900e+03 | |||
2.57914e+05 2.27183e+05 4.30368e+03 4.13080e+03 7.25535e+03 4.89047e+03 1.63859e+03 1.93496e+03 | |||
2.51717e+05 2.26047e+05 5.99445e+03 3.49363e+03 8.07250e+03 5.08267e+03 2.26917e+03 2.86231e+03 | |||
@@ -196,7 +196,8 @@ class TestClient(object): | |||
with assert_raises(ClientError) as e: | |||
result = client.stream_insert("/newton/prep", data) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("timestamp is not monotonically increasing", str(e.exception)) | |||
in2_("timestamp is not monotonically increasing", | |||
"start must precede end", str(e.exception)) | |||
# Now try empty data (no server request made) | |||
empty = cStringIO.StringIO("") | |||
@@ -238,7 +239,7 @@ class TestClient(object): | |||
result = client.stream_insert("/newton/prep", data, | |||
start + 5, start + 120) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("Data timestamp 1332511200.0 < start time 1332511205.0", | |||
in_("Data timestamp 1332511200.000000 < start time 1332511205.000000", | |||
str(e.exception)) | |||
# Specify start/end (ends too early) | |||
@@ -250,7 +251,7 @@ class TestClient(object): | |||
# Client chunks the input, so the exact timestamp here might change | |||
# if the chunk positions change. | |||
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ " | |||
">= end time 1332511201.0", str(e.exception)) | |||
">= end time 1332511201.000000", str(e.exception)) | |||
is not None) | |||
# Now do the real load | |||
@@ -481,6 +482,15 @@ class TestClient(object): | |||
ctx.insert_line("225 1\n") | |||
ctx.finalize() | |||
with assert_raises(ClientError): | |||
with client.stream_insert_context("/context/test", 300, 400) as ctx: | |||
ctx.insert_line("301 1\n") | |||
ctx.insert_line("302 2\n") | |||
ctx.insert_line("303 3\n") | |||
ctx.insert_line("304 4\n") | |||
ctx.insert_line("304 4\n") # non-monotonic after a few lines | |||
ctx.finalize() | |||
eq_(list(client.stream_intervals("/context/test")), | |||
[ [ 100, 105.000001 ], | |||
[ 106, 106.5 ], | |||
@@ -392,11 +392,6 @@ class TestCmdline(object): | |||
self.fail("insert /newton/prep " | |||
"tests/data/prep-20120323T1000") | |||
# insert pre-timestamped data, from stdin | |||
os.environ['TZ'] = "UTC" | |||
with open("tests/data/prep-20120323T1004-timestamped") as input: | |||
self.ok("insert --none /newton/prep", input) | |||
# insert pre-timestamped data, with bad times (non-monotonic) | |||
os.environ['TZ'] = "UTC" | |||
with open("tests/data/prep-20120323T1004-badtimes") as input: | |||
@@ -405,6 +400,11 @@ class TestCmdline(object): | |||
self.contain("line 7:") | |||
self.contain("timestamp is not monotonically increasing") | |||
# insert pre-timestamped data, from stdin | |||
os.environ['TZ'] = "UTC" | |||
with open("tests/data/prep-20120323T1004-timestamped") as input: | |||
self.ok("insert --none /newton/prep", input) | |||
# insert data with normal timestamper from filename | |||
os.environ['TZ'] = "UTC" | |||
self.ok("insert --rate 120 /newton/prep " | |||
@@ -20,6 +20,11 @@ def in_(a, b): | |||
if a not in b: | |||
raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b))) | |||
def in2_(a1, a2, b):
    """Assert that at least one of a1 or a2 is contained in b."""
    if a1 in b or a2 in b:
        return
    raise AssertionError("(%s or %s) not in %s" % (myrepr(a1), myrepr(a2),
                                                   myrepr(b)))
def ne_(a, b):
    """Assert that a != b (uses the != operator directly)."""
    if a != b:
        return
    raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b)))