Compare commits: before-ins...insert-rew
6 commits

SHA1: 9082cc9f44, bf64a40472, 32dbeebc09, 66ddc79b15, 7a8bd0bf41, ee552de740
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+
 """Class for performing HTTP client requests via libcurl"""

 from __future__ import absolute_import
@@ -8,6 +10,7 @@ import sys
 import re
 import os
 import simplejson as json
+import itertools

 import nilmdb.httpclient

@@ -16,6 +19,10 @@ from nilmdb.httpclient import ClientError, ServerError, Error

 version = "1.0"

+def float_to_string(f):
+    # Use repr to maintain full precision in the string output.
+    return repr(float(f))
+
 class Client(object):
     """Main client interface to the Nilm database."""

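
For context: float_to_string() leans on repr() because, under Python 2, str() on a float rounds to 12 significant digits and would silently drop microseconds from a Unix timestamp. A minimal sketch of the difference (Python 2 assumed; illustration only, not nilmdb code):

    t = 1332511200.000001
    print str(t)           # '1332511200.0'      -- sub-second precision lost
    print repr(float(t))   # '1332511200.000001' -- full precision preserved
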
@@ -89,33 +96,77 @@ class Client(object):
         params = { "path": path }
         return self.http.get("stream/destroy", params)

-    def stream_insert(self, path, data):
+    def stream_insert(self, path, data, start = None, end = None):
         """Insert data into a stream. data should be a file-like object
-        that provides ASCII data that matches the database layout for path."""
+        that provides ASCII data that matches the database layout for path.
+
+        start and end are the starting and ending timestamp of this
+        stream; all timestamps t in the data must satisfy 'start <= t
+        < end'. If left unspecified, 'start' is the timestamp of the
+        first line of data, and 'end' is the timestamp on the last line
+        of data, plus a small delta of 1μs.
+        """
         params = { "path": path }

         # See design.md for a discussion of how much data to send.
         # These are soft limits -- actual data might be rounded up.
         max_data = 1048576
         max_time = 30
+        end_epsilon = 1e-6
+
+        def pairwise(iterable):
+            "s -> (s0,s1), (s1,s2), ..., (sn,None)"
+            a, b = itertools.tee(iterable)
+            next(b, None)
+            return itertools.izip_longest(a, b)
+
+        def extract_timestamp(line):
+            return float(line.split()[0])

         def sendit():
-            result = self.http.put("stream/insert", send_data, params)
-            params["old_timestamp"] = result[1]
-            return result
+            # If we have more data after this, use the timestamp of
+            # the next line as the end. Otherwise, use the given
+            # overall end time, or add end_epsilon to the last data
+            # point.
+            if nextline:
+                block_end = extract_timestamp(nextline)
+                if end and block_end > end:
+                    # This is unexpected, but we'll defer to the server
+                    # to return an error in this case.
+                    block_end = end
+            elif end:
+                block_end = end
+            else:
+                block_end = extract_timestamp(line) + end_epsilon
+
+            # Send it
+            params["start"] = float_to_string(block_start)
+            params["end"] = float_to_string(block_end)
+            return self.http.put("stream/insert", block_data, params)

+        clock_start = time.time()
+        block_data = ""
+        block_start = start
         result = None
-        start = time.time()
-        send_data = ""
-        for line in data:
-            elapsed = time.time() - start
-            send_data += line
+        for (line, nextline) in pairwise(data):
+            # If we don't have a starting time, extract it from the first line
+            if block_start is None:
+                block_start = extract_timestamp(line)
+
+            clock_elapsed = time.time() - clock_start
+            block_data += line

-            if (len(send_data) > max_data) or (elapsed > max_time):
+            # If we have enough data, or enough time has elapsed,
+            # send this block to the server, and empty things out
+            # for the next block.
+            if (len(block_data) > max_data) or (clock_elapsed > max_time):
                 result = sendit()
-                send_data = ""
-                start = time.time()
-        if len(send_data):
+                block_start = None
+                block_data = ""
+                clock_start = time.time()
+
+        # One last block?
+        if len(block_data):
             result = sendit()

         # Return the most recent JSON result we got back, or None if
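
For context: the rewritten stream_insert() needs to know, while building each block, whether another line follows, so that it can use the next line's timestamp as the block's end. That is what the tee/izip_longest lookahead provides. A self-contained sketch of the same idiom (Python 2 assumed; the sample lines are made up):

    import itertools

    def pairwise(iterable):
        "s -> (s0,s1), (s1,s2), ..., (sn,None)"
        a, b = itertools.tee(iterable)
        next(b, None)
        return itertools.izip_longest(a, b)

    for line, nextline in pairwise(["1.0 a\n", "2.0 b\n", "3.0 c\n"]):
        # nextline is None only for the last element; that is the case
        # where the client falls back to 'end' or to end_epsilon.
        print repr(line), "->", repr(nextline)
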
@@ -130,9 +181,9 @@ class Client(object):
             "path": path
         }
         if start is not None:
-            params["start"] = repr(start) # use repr to keep precision
+            params["start"] = float_to_string(start)
         if end is not None:
-            params["end"] = repr(end)
+            params["end"] = float_to_string(end)
         return self.http.get_gen("stream/intervals", params, retjson = True)

     def stream_extract(self, path, start = None, end = None, count = False):
@@ -148,9 +199,9 @@ class Client(object):
             "path": path,
         }
         if start is not None:
-            params["start"] = repr(start) # use repr to keep precision
+            params["start"] = float_to_string(start)
         if end is not None:
-            params["end"] = repr(end)
+            params["end"] = float_to_string(end)
         if count:
             params["count"] = 1

@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 from nilmdb.printf import *
 import nilmdb.client
-import nilmdb.layout
 import sys

 def setup(self, sub):
@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 from nilmdb.printf import *
 import nilmdb.client
-import nilmdb.layout
 import nilmdb.timestamper

 import sys
@@ -177,8 +177,8 @@ cdef class IntervalSet:
             else:
                 return False

-        this = [ x for x in self ]
-        that = [ x for x in other ]
+        this = list(self)
+        that = list(other)

         try:
             while True:
@@ -236,6 +236,12 @@ cdef class IntervalSet:
             self.__iadd__(x)
         return self

+    def iadd_nocheck(self, Interval other not None):
+        """Inplace add -- modifies self.
+        'Optimized' version that doesn't check for intersection and
+        only inserts the new interval into the tree."""
+        self.tree.insert(rbtree.RBNode(other.start, other.end, other))
+
     def __isub__(self, Interval other not None):
         """Inplace subtract -- modifies self

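
For context: iadd_nocheck() produces the same set as the checked `+=` whenever the caller has already ruled out overlap; it only skips the intersection test. A hedged sketch, with a and b assumed to be non-overlapping Interval objects as in the test suite further down:

    iset1 = IntervalSet(a)
    iset1 += b                 # checked insert
    iset2 = IntervalSet(a)
    iset2.iadd_nocheck(b)      # unchecked insert; caller guarantees no overlap
    assert iset1 == iset2
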
@@ -300,3 +306,13 @@ cdef class IntervalSet:
             if n.obj.intersects(other):
                 return True
         return False
+
+    def find_end(self, double t):
+        """
+        Return an Interval from this tree that ends at time t, or
+        None if it doesn't exist.
+        """
+        n = self.tree.find_left_end(t)
+        if n and n.obj.end == t:
+            return n.obj
+        return None
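
For context: find_end() is the lookup that lets _add_interval() (below) spot an existing interval ending exactly where a new one starts. A rough pure-Python sketch of its contract, ignoring the red-black tree (hypothetical names, not the Cython implementation):

    class FakeInterval(object):
        def __init__(self, start, end):
            self.start, self.end = start, end

    def find_end(intervals, t):
        """Return an interval that ends exactly at time t, or None."""
        for i in intervals:
            if i.end == t:
                return i
        return None

    ivals = [FakeInterval(0, 10), FakeInterval(10, 20)]
    assert find_end(ivals, 10) is ivals[0]
    assert find_end(ivals, 15) is None
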
@@ -192,36 +192,58 @@ class NilmDB(object):
             # Return cached value
             return self._cached_iset[stream_id]

-    # TODO: Split add_interval into two pieces, one to add
-    # and one to flush to disk?
-    # Need to think about this. Basic problem is that we can't
-    # mess with intervals once they're in the IntervalSet,
-    # without mucking with bxinterval internals.
-    # Maybe add a separate optimization step?
-    # Join intervals that have a fairly small gap between them
-
     def _add_interval(self, stream_id, interval, start_pos, end_pos):
         """
         Add interval to the internal interval cache, and to the database.
         Note: arguments must be ints (not numpy.int64, etc)
         """
-        # Ensure this stream's intervals are cached, and add the new
-        # interval to that cache.
+        # Ensure this stream's intervals are cached
         iset = self._get_intervals(stream_id)
-        try:
-            iset += DBInterval(interval.start, interval.end,
-                               interval.start, interval.end,
-                               start_pos, end_pos)
-        except IntervalError as e: # pragma: no cover
+
+        # Check for overlap
+        if iset.intersects(interval): # pragma: no cover (gets caught earlier)
             raise NilmDBError("new interval overlaps existing data")

+        # Check for adjacency. If there's a stream in the database
+        # that ends exactly when this one starts, and the database
+        # rows match up, we can make one interval that covers the
+        # time range [adjacent.start -> interval.end)
+        # and database rows [ adjacent.start_pos -> end_pos ].
+        # Only do this if the resulting interval isn't too large.
+        max_merged_rows = 30000000 # a bit more than 1 hour at 8 KHz
+        adjacent = iset.find_end(interval.start)
+        if (adjacent is not None and
+            start_pos == adjacent.db_endpos and
+            (end_pos - adjacent.db_startpos) < max_merged_rows):
+            # First delete the old one, both from our cache and the
+            # database
+            iset -= adjacent
+            self.con.execute("DELETE FROM ranges WHERE "
+                             "stream_id=? AND start_time=? AND "
+                             "end_time=? AND start_pos=? AND "
+                             "end_pos=?", (stream_id,
+                                           adjacent.db_start,
+                                           adjacent.db_end,
+                                           adjacent.db_startpos,
+                                           adjacent.db_endpos))
+
+            # Now update our interval so the fallthrough add is
+            # correct.
+            interval.start = adjacent.start
+            start_pos = adjacent.db_startpos
+
+        # Add the new interval to the cache
+        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
+                                     interval.start, interval.end,
+                                     start_pos, end_pos))
+
         # Insert into the database
         self.con.execute("INSERT INTO ranges "
                          "(stream_id,start_time,end_time,start_pos,end_pos) "
                          "VALUES (?,?,?,?,?)",
                          (stream_id, interval.start, interval.end,
                           int(start_pos), int(end_pos)))

         self.con.commit()

     def stream_list(self, path = None, layout = None):
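
For context: the merge above only happens when the new block starts exactly at an existing interval's end, the PyTables rows are contiguous, and the combined interval stays under max_merged_rows. A condensed restatement of just that decision (sketch only, hypothetical helper name):

    MAX_MERGED_ROWS = 30000000   # same soft cap as above: ~1 hour at 8 KHz

    def can_merge(adjacent, start_pos, end_pos):
        """Mirror of the adjacency test in _add_interval (sketch)."""
        return (adjacent is not None and
                start_pos == adjacent.db_endpos and
                (end_pos - adjacent.db_startpos) < MAX_MERGED_ROWS)
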
@@ -383,30 +405,18 @@ class NilmDB(object):
         con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
         con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

-    def stream_insert(self, path, parser, old_timestamp = None):
+    def stream_insert(self, path, start, end, data):
         """Insert new data into the database.
         path: Path at which to add the data
-        parser: nilmdb.layout.Parser instance full of data to insert
+        start: Starting timestamp
+        end: Ending timestamp
+        data: Rows of data, to be passed to PyTable's table.append
+        method. E.g. nilmdb.layout.Parser.data
         """
-        if (not parser.min_timestamp or not parser.max_timestamp or
-            not len(parser.data)):
-            raise StreamError("no data provided")
-
-        # If we were provided with an old timestamp, the expectation
-        # is that the client has a contiguous block of time it is sending,
-        # but it's doing it over multiple calls to stream_insert.
-        # old_timestamp is the max_timestamp of the previous insert.
-        # To make things continuous, use that as our starting timestamp
-        # instead of what the parser found.
-        if old_timestamp:
-            min_timestamp = old_timestamp
-        else:
-            min_timestamp = parser.min_timestamp
-
         # First check for basic overlap using timestamp info given.
         stream_id = self._stream_id(path)
         iset = self._get_intervals(stream_id)
-        interval = Interval(min_timestamp, parser.max_timestamp)
+        interval = Interval(start, end)
         if iset.intersects(interval):
             raise OverlapError("new data overlaps existing data at range: "
                                + str(iset & interval))
@@ -414,7 +424,7 @@ class NilmDB(object):
         # Insert the data into pytables
         table = self.h5file.getNode(path)
         row_start = table.nrows
-        table.append(parser.data)
+        table.append(data)
         row_end = table.nrows
         table.flush()

@@ -156,19 +156,11 @@ class Stream(NilmApp):
     @cherrypy.expose
     @cherrypy.tools.json_out()
     #@cherrypy.tools.disable_prb()
-    def insert(self, path, old_timestamp = None):
+    def insert(self, path, start, end):
         """
         Insert new data into the database. Provide textual data
         (matching the path's layout) as a HTTP PUT.
-
-        old_timestamp is used when making multiple, split-up insertions
-        for a larger contiguous block of data. The first insert
-        will return the maximum timestamp that it saw, and the second
-        insert should provide this timestamp as an argument. This is
-        used to extend the previous database interval rather than
-        start a new one.
         """
-
         # Important that we always read the input before throwing any
         # errors, to keep lengths happy for persistent connections.
         # However, CherryPy 3.2.2 has a bug where this fails for GET
@@ -193,18 +185,31 @@ class Stream(NilmApp):
                                      "Error parsing input data: " +
                                      e.message)

+        if (not parser.min_timestamp or not parser.max_timestamp or
+            not len(parser.data)):
+            raise cherrypy.HTTPError("400 Bad Request",
+                                     "no data provided")
+
+        # Check limits
+        start = float(start)
+        end = float(end)
+        if parser.min_timestamp < start:
+            raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
+                                     repr(parser.min_timestamp) +
+                                     " < start time " + repr(start))
+        if parser.max_timestamp >= end:
+            raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
+                                     repr(parser.max_timestamp) +
+                                     " >= end time " + repr(end))
+
         # Now do the nilmdb insert, passing it the parser full of data.
         try:
-            if old_timestamp:
-                old_timestamp = float(old_timestamp)
-            result = self.db.stream_insert(path, parser, old_timestamp)
+            result = self.db.stream_insert(path, start, end, parser.data)
         except nilmdb.nilmdb.NilmDBError as e:
             raise cherrypy.HTTPError("400 Bad Request", e.message)

-        # Return the maximum timestamp that we saw. The client will
-        # return this back to us as the old_timestamp parameter, if
-        # it has more data to send.
-        return ("ok", parser.max_timestamp)
+        # Done
+        return "ok"

     # /stream/intervals?path=/newton/prep
     # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
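
For context: after this change a single insert call carries explicit start/end query parameters, sends the raw ASCII rows as the PUT body, and gets back just "ok". A hedged example of the call the client and tests make (values are made up; the row format depends on the stream's layout):

    params = { "path": "/newton/prep",
               "start": float_to_string(1332511200.0),
               "end":   float_to_string(1332511201.0) }
    body = "1332511200.000000 1.0 2.0 3.0\n"   # made-up sample row
    result = client.http.put("stream/insert", body, params)   # "ok" on success
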
@@ -131,6 +131,7 @@ class TestClient(object):

         testfile = "tests/data/prep-20120323T1000"
         start = datetime_tz.datetime_tz.smartparse("20120323T1000")
+        start = start.totimestamp()
         rate = 120

         # First try a nonexistent path
@@ -155,14 +156,41 @@ class TestClient(object):

         # Try forcing a server request with empty data
         with assert_raises(ClientError) as e:
-            client.http.put("stream/insert", "", { "path": "/newton/prep" })
+            client.http.put("stream/insert", "", { "path": "/newton/prep",
+                                                   "start": 0, "end": 0 })
         in_("400 Bad Request", str(e.exception))
         in_("no data provided", str(e.exception))

+        # Specify start/end (starts too late)
+        data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
+        with assert_raises(ClientError) as e:
+            result = client.stream_insert("/newton/prep", data,
+                                          start + 5, start + 120)
+        in_("400 Bad Request", str(e.exception))
+        in_("Data timestamp 1332511200.0 < start time 1332511205.0",
+            str(e.exception))
+
+        # Specify start/end (ends too early)
+        data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
+        with assert_raises(ClientError) as e:
+            result = client.stream_insert("/newton/prep", data,
+                                          start, start + 1)
+        in_("400 Bad Request", str(e.exception))
+        # Client chunks the input, so the exact timestamp here might change
+        # if the chunk positions change.
+        in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
+            str(e.exception))
+
         # Now do the real load
         data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
-        result = client.stream_insert("/newton/prep", data)
-        eq_(result[0], "ok")
+        result = client.stream_insert("/newton/prep", data,
+                                      start, start + 119.999777)
+        eq_(result, "ok")
+
+        # Verify the intervals. Should be just one, even if the data
+        # was inserted in chunks, due to nilmdb interval concatenation.
+        intervals = list(client.stream_intervals("/newton/prep"))
+        eq_(intervals, [[start, start + 119.999777]])
+
         # Try some overlapping data -- just insert it again
         data = nilmdb.timestamper.TimestamperRate(testfile, start, 120)
@@ -215,7 +243,8 @@ class TestClient(object):
         # Check PUT with generator out
         with assert_raises(ClientError) as e:
             client.http.put_gen("stream/insert", "",
-                                { "path": "/newton/prep" }).next()
+                                { "path": "/newton/prep",
+                                  "start": 0, "end": 0 }).next()
         in_("400 Bad Request", str(e.exception))
         in_("no data provided", str(e.exception))

@@ -238,7 +267,7 @@ class TestClient(object):
         # still disable chunked responses for debugging.
         x = client.http.get("stream/intervals", { "path": "/newton/prep" },
                             retjson=False)
-        eq_(x.count('\n'), 2)
+        lines_(x, 1)
         if "transfer-encoding: chunked" not in client.http._headers.lower():
             warnings.warn("Non-chunked HTTP response for /stream/intervals")

@@ -368,36 +368,36 @@ class TestCmdline(object):
     def test_cmdline_07_detail(self):
         # Just count the number of lines, it's probably fine
         self.ok("list --detail")
-        eq_(self.captured.count('\n'), 11)
+        lines_(self.captured, 8)

         self.ok("list --detail --path *prep")
-        eq_(self.captured.count('\n'), 7)
+        lines_(self.captured, 4)

         self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'")
-        eq_(self.captured.count('\n'), 5)
+        lines_(self.captured, 3)

         self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'")
-        eq_(self.captured.count('\n'), 3)
+        lines_(self.captured, 2)

         self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'")
-        eq_(self.captured.count('\n'), 2)
+        lines_(self.captured, 2)
         self.contain("10:05:15.000")

         self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'")
-        eq_(self.captured.count('\n'), 2)
+        lines_(self.captured, 2)
         self.contain("10:05:15.500")

         self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'")
-        eq_(self.captured.count('\n'), 2)
+        lines_(self.captured, 2)
         self.contain("no intervals")

         self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
                 + " --end='23 Mar 2012 10:05:15.50'")
-        eq_(self.captured.count('\n'), 2)
+        lines_(self.captured, 2)
         self.contain("10:05:15.500")

         self.ok("list --detail")
-        eq_(self.captured.count('\n'), 11)
+        lines_(self.captured, 8)

     def test_cmdline_08_extract(self):
         # nonexistent stream
@@ -450,7 +450,7 @@ class TestCmdline(object):

         # all data put in by tests
         self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
-        eq_(self.captured.count('\n'), 43204)
+        lines_(self.captured, 43204)
         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
         self.match("43200\n")

@@ -459,7 +459,7 @@ class TestCmdline(object):
         server_stop()
         server_start(max_results = 2)
         self.ok("list --detail")
-        eq_(self.captured.count('\n'), 11)
+        lines_(self.captured, 8)
         server_stop()
         server_start()

@@ -484,7 +484,7 @@ class TestCmdline(object):

         # Notice how they're not empty
         self.ok("list --detail")
-        eq_(self.captured.count('\n'), 11)
+        lines_(self.captured, 8)

         # Delete some
         self.ok("destroy /newton/prep")
@@ -20,6 +20,12 @@ def ne_(a, b):
     if not a != b:
         raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b)))

+def lines_(a, n):
+    l = a.count('\n')
+    if not l == n:
+        raise AssertionError("wanted %d lines, got %d in output: '%s'"
+                             % (n, l, a))
+
 def recursive_unlink(path):
     try:
         shutil.rmtree(path)
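
For context: lines_() counts '\n' characters, so the expected value is the number of newline-terminated lines in the captured output. A small usage sketch (assuming the helper above is importable from this module; Python 2 assumed):

    lines_("interval 1\ninterval 2\n", 2)   # passes: two newlines
    try:
        lines_("no trailing newline", 1)
    except AssertionError as e:
        print e   # wanted 1 lines, got 0 in output: 'no trailing newline'
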
@@ -137,6 +137,15 @@ class TestInterval:
         x = iseta != 3
         ne_(IntervalSet(a), IntervalSet(b))

+        # Note that assignment makes a new reference (not a copy)
+        isetd = IntervalSet(isetb)
+        isete = isetd
+        eq_(isetd, isetb)
+        eq_(isetd, isete)
+        isetd -= a
+        ne_(isetd, isetb)
+        eq_(isetd, isete)
+
         # test iterator
         for interval in iseta:
             pass
@@ -158,11 +167,18 @@ class TestInterval:
         iset = IntervalSet(a)
         iset += IntervalSet(b)
         eq_(iset, IntervalSet([a, b]))

         iset = IntervalSet(a)
         iset += b
         eq_(iset, IntervalSet([a, b]))

+        iset = IntervalSet(a)
+        iset.iadd_nocheck(b)
+        eq_(iset, IntervalSet([a, b]))
+
         iset = IntervalSet(a) + IntervalSet(b)
         eq_(iset, IntervalSet([a, b]))

         iset = IntervalSet(b) + a
         eq_(iset, IntervalSet([a, b]))

@@ -196,6 +196,6 @@ class TestServer(object):
         # GET instead of POST (no body)
         # (actual POST test is done by client code)
         with assert_raises(HTTPError) as e:
-            getjson("/stream/insert?path=/newton/prep")
+            getjson("/stream/insert?path=/newton/prep&start=0&end=0")
         eq_(e.exception.code, 400)
