cover overlapping data, missing data, etc. Consolidate nilmdb.client.MyCurl.getjson() and putjson(). Add __str__ for more compact representation of IntervalSet Make stream_id not NULL in SQL database. Format NilmDBError with name of exception for more clarity Work around CherryPy issue 1138 by removing HTML escaping before returning JSON error responses from the server. Reached 100% test coverage again (woohoo) git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10665 ddd99763-3ecb-0310-9145-efcb8ce7c51ftags/bxinterval-last
@@ -84,11 +84,10 @@ class MyCurl(object): | |||
else: | |||
raise NilmCommError(**args) | |||
def getjson(self, url, params = None): | |||
"""Simple GET that returns JSON string""" | |||
def _reqjson(self, url, params): | |||
"""GET or POST that returns JSON string""" | |||
self._setup_url(url, params) | |||
body = cStringIO.StringIO() | |||
self.curl.setopt(pycurl.UPLOAD, 0) | |||
self.curl.setopt(pycurl.WRITEFUNCTION, body.write) | |||
try: | |||
self.curl.perform() | |||
@@ -97,22 +96,20 @@ class MyCurl(object): | |||
body_str = body.getvalue() | |||
self._check_error(body_str) | |||
return json.loads(body_str) | |||
def getjson(self, url, params = None): | |||
"""Simple GET that returns JSON string""" | |||
self.curl.setopt(pycurl.UPLOAD, 0) | |||
return self._reqjson(url, params) | |||
def putjson(self, url, postdata, params = None): | |||
"""Simple PUT that returns JSON string""" | |||
self._setup_url(url, params) | |||
body = cStringIO.StringIO() | |||
data = cStringIO.StringIO(postdata) | |||
self.curl.setopt(pycurl.UPLOAD, 1) | |||
self.curl.setopt(pycurl.READFUNCTION, data.read) | |||
self.curl.setopt(pycurl.WRITEFUNCTION, body.write) | |||
try: | |||
self.curl.perform() | |||
except pycurl.error as e: | |||
raise ServerError(502, self.url, "Curl error: " + e[1]) | |||
body_str = body.getvalue() | |||
self._check_error(body_str) | |||
return json.loads(body_str) | |||
return self._reqjson(url, params) | |||
class Client(object): | |||
"""Main client interface to the Nilm database.""" | |||
@@ -80,6 +80,9 @@ class IntervalSet(object): | |||
def __repr__(self): | |||
return self.__class__.__name__ + "(" + repr(self.data) + ")" | |||
def __str__(self): | |||
return "[" + ", ".join([ str(x) for x in self.data ]) + "]" | |||
def __eq__(self, other): | |||
"""Test equality of two IntervalSets. | |||
@@ -50,7 +50,7 @@ _sql_schema_updates = { | |||
-- Times are stored as μs since Unix epoch | |||
-- Positions are opaque: PyTables rows, file offsets, etc. | |||
CREATE TABLE ranges( | |||
stream_id INTEGER, | |||
stream_id INTEGER NOT NULL, | |||
start_time INTEGER NOT NULL, | |||
end_time INTEGER NOT NULL, | |||
start_pos INTEGER NOT NULL, | |||
@@ -62,7 +62,7 @@ _sql_schema_updates = { | |||
1: """ | |||
-- Generic dictionary-type metadata that can be associated with a stream | |||
CREATE TABLE metadata( | |||
stream_id INTEGER, | |||
stream_id INTEGER NOT NULL, | |||
key TEXT NOT NULL, | |||
value TEXT | |||
); | |||
@@ -72,8 +72,7 @@ _sql_schema_updates = { | |||
class NilmDBError(Exception): | |||
"""Base exception for NilmDB errors""" | |||
def __init__(self, message = "Unspecified error"): | |||
Exception.__init__(self, message) | |||
self.message = message | |||
Exception.__init__(self, self.__class__.__name__ + ": " + message) | |||
class StreamError(NilmDBError): | |||
pass | |||
@@ -150,8 +149,8 @@ class NilmDB(object): | |||
""" | |||
# Could cache these, if it's a performance bottleneck | |||
iset = IntervalSet() | |||
result = self.con.execute("SELECT start_time, end_time " + | |||
"FROM ranges " + | |||
result = self.con.execute("SELECT start_time, end_time " | |||
"FROM ranges " | |||
"WHERE stream_id=?", (stream_id,)) | |||
try: | |||
for (start, end) in result: | |||
@@ -160,6 +159,14 @@ class NilmDB(object): | |||
raise NilmDBError("unexpected overlap in ranges table!") | |||
return iset | |||
def _add_interval(self, stream_id, interval, start_pos, end_pos): | |||
# Arguments must be ints (not numpy.int64, etc) | |||
self.con.execute("INSERT INTO ranges " | |||
"(stream_id,start_time,end_time,start_pos,end_pos) " | |||
"VALUES (?,?,?,?,?)", | |||
(stream_id, interval.start, interval.end, | |||
int(start_pos), int(end_pos))) | |||
def stream_list(self, path = None, layout = None): | |||
"""Return list of [path, layout] lists of all streams | |||
in the database. | |||
@@ -178,7 +185,7 @@ class NilmDB(object): | |||
if path: | |||
where += " AND path=?" | |||
params += (path,) | |||
result = self.con.execute("SELECT path, layout " + | |||
result = self.con.execute("SELECT path, layout " | |||
"FROM streams " + where, params).fetchall() | |||
return sorted(list(x) for x in result) | |||
@@ -282,39 +289,28 @@ class NilmDB(object): | |||
path: Path at which to add the data | |||
parser: nilmdb.layout.Parser instance full of data to insert | |||
""" | |||
if (not parser.min_timestamp or not parser.max_timestamp or | |||
not len(parser.data)): | |||
raise StreamError("no data provided") | |||
# First check for basic overlap using timestamp info from the parser. | |||
stream_id = self._stream_id(path) | |||
iset = self._get_intervals(stream_id) | |||
interval = Interval(parser.min_timestamp, parser.max_timestamp) | |||
if iset.intersects(interval): | |||
raise OverlapError("New data overlaps existing data: " | |||
raise OverlapError("new data overlaps existing data: " | |||
+ str(iset & interval)) | |||
# TODO: Check through layout and see if there's a better way | |||
# to handle the data parsing now that we can use | |||
# table.append(). Probably not a good idea to pass strings to | |||
# it, though, to reduce time on the serialized nilmdb side of | |||
# things. | |||
# Either way, start using table.append() below, then | |||
# figure out the row tracking, insert intervals into the database, | |||
# and do tests of multiple inserts, overlapping data, etc. | |||
# Insert the data into pytables | |||
table = self.h5file.getNode(path) | |||
print "rows", table.nrows | |||
with nilmdb.Timer("append"): | |||
table.append(parser.data) | |||
print "rows", table.nrows | |||
row_start = table.nrows | |||
table.append(parser.data) | |||
row_end = table.nrows | |||
table.flush() | |||
# with nilmdb.Timer("fill"): | |||
# parser.fill_table(table) | |||
# with nilmdb.Timer("flush"): | |||
# table.flush() | |||
# table.append() | |||
# with nilmdb.Timer("flush"): | |||
# table.flush() | |||
# TODO: Need to figure out which rows were used there. | |||
# Insert the record into the sql database. | |||
# Casts are to convert from numpy.int64. | |||
self._add_interval(stream_id, interval, int(row_start), int(row_end)) | |||
# And that's all | |||
return "ok" | |||
raise NilmDBError("go away") |
@@ -145,15 +145,11 @@ class Stream(NilmApp): | |||
# Important that we always read the input before throwing any | |||
# errors, to keep lengths happy for persistent connections. | |||
# However, CherryPy 3.2.2 has a bug where this fails for GET | |||
# requests, so catch that. | |||
# requests, so catch that. (issue #1134) | |||
try: | |||
body = cherrypy.request.body.read() | |||
except TypeError: | |||
body = "" | |||
# Bail out if no body provided | |||
if not body: | |||
raise cherrypy.HTTPError("400 Bad Request", "No data provided") | |||
raise cherrypy.HTTPError("400 Bad Request", "No request body") | |||
# Check path and get layout | |||
streams = self.db.stream_list(path = path) | |||
@@ -164,8 +160,7 @@ class Stream(NilmApp): | |||
# Parse the input data | |||
try: | |||
parser = nilmdb.layout.Parser(layout) | |||
with nilmdb.Timer("parse"): | |||
parser.parse(body) | |||
parser.parse(body) | |||
except nilmdb.layout.ParserError as e: | |||
raise cherrypy.HTTPError("400 Bad Request", | |||
"Error parsing input data: " + | |||
@@ -245,6 +240,13 @@ class Server(object): | |||
# Override the response type, which was previously set to text/html | |||
cherrypy.serving.response.headers['Content-Type'] = ( | |||
"application/json;charset=utf-8" ) | |||
# Undo the HTML escaping that cherrypy's get_error_page function applies | |||
# (cherrypy issue 1135) | |||
for k, v in errordata.iteritems(): | |||
v = v.replace("<","<") | |||
v = v.replace(">",">") | |||
v = v.replace("&","&") | |||
errordata[k] = v | |||
return json.dumps(errordata, separators=(',',':')) | |||
def start(self, blocking = False, event = None): | |||
@@ -12,6 +12,7 @@ import distutils.version | |||
import os | |||
import sys | |||
import threading | |||
import cStringIO | |||
from test_helpers import * | |||
@@ -44,6 +45,11 @@ class TestClient(object): | |||
with assert_raises(nilmdb.client.ServerError): | |||
client.version() | |||
# Trigger same error with a PUT request | |||
client = nilmdb.Client(url = "http://localhost:1/") | |||
with assert_raises(nilmdb.client.ServerError): | |||
client.version() | |||
# Then a fake URL on a real host | |||
client = nilmdb.Client(url = "http://localhost:12380/fake/") | |||
with assert_raises(nilmdb.client.ClientError): | |||
@@ -133,7 +139,26 @@ class TestClient(object): | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("timestamp is not monotonically increasing", str(e.exception)) | |||
# Now try empty data (no server request made) | |||
empty = cStringIO.StringIO("") | |||
data = nilmdb.timestamper.TimestamperRate(empty, start, 120) | |||
result = client.stream_insert("/newton/prep", data) | |||
eq_(result, None) | |||
# Try forcing a server request with empty data | |||
with assert_raises(ClientError) as e: | |||
client.curl.putjson("stream/insert", "", { "path": "/newton/prep" }) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("no data provided", str(e.exception)) | |||
# Now do the real load | |||
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) | |||
result = client.stream_insert("/newton/prep", data) | |||
eq_(result, "ok") | |||
# Try some overlapping data -- just insert it again | |||
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) | |||
with assert_raises(ClientError) as e: | |||
result = client.stream_insert("/newton/prep", data) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("OverlapError", str(e.exception)) |