Compare commits: nilmdb-1.0 ... nilmdb-1.1
24 commits:

5dce851bef, 1431e41d16, a49c655816, 30e3ffc0e9, db7211c3a9, c6d57cf5c3,
ca5253ddee, e19da84b2e, 3e8e3542fd, 2f7365412d, bba9ad131e, ee24380d1f,
bfcd91acf8, d97291d4d3, a61fbbcf45, 5adc8fd0a7, 251a486c28, 1edb96a0bd,
52e674a192, e241c13bf1, b53ff31212, 2045e89f24, 841b2dab5c, d634f7d3cf
```diff
@@ -7,7 +7,7 @@ Prerequisites:
     sudo apt-get install python2.7 python2.7-dev python-setuptools cython

     # Base NilmDB dependencies
-    sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz
+    sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz python-psutil

     # Tools for running tests
     sudo apt-get install python-nose python-coverage
```
```diff
@@ -8,22 +8,35 @@ import nilmdb.client.httpclient

 import time
 import simplejson as json
+import contextlib

 def float_to_string(f):
     """Use repr to maintain full precision in the string output."""
     return repr(float(f))

+def extract_timestamp(line):
+    """Extract just the timestamp from a line of data text"""
+    return float(line.split()[0])
+
 class Client(object):
     """Main client interface to the Nilm database."""

     def __init__(self, url):
         self.http = nilmdb.client.httpclient.HTTPClient(url)

+    # __enter__/__exit__ allow this class to be a context manager
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
     def _json_param(self, data):
         """Return compact json-encoded version of parameter"""
         return json.dumps(data, separators=(',',':'))

     def close(self):
+        """Close the connection; safe to call multiple times"""
         self.http.close()

     def geturl(self):
```
```diff
@@ -34,13 +47,10 @@ class Client(object):
         """Return server version"""
         return self.http.get("version")

-    def dbpath(self):
-        """Return server database path"""
-        return self.http.get("dbpath")
-
-    def dbsize(self):
-        """Return server database size as human readable string"""
-        return self.http.get("dbsize")
+    def dbinfo(self):
+        """Return server database info (path, size, free space)
+        as a dictionary."""
+        return self.http.get("dbinfo")

     def stream_list(self, path = None, layout = None):
         params = {}
```
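The `dbpath()` and `dbsize()` calls are replaced by a single `dbinfo()` request. A minimal usage sketch; the URL is a placeholder for any running server, and the dictionary keys come from the server-side `/dbinfo` handler later in this comparison:

```python
import nilmdb

client = nilmdb.Client(url="http://localhost:12380/")  # placeholder URL

info = client.dbinfo()     # one round trip instead of two
print(info["path"])        # database directory on the server
print(info["size"])        # bytes used, computed via nilmdb.utils.du()
print(info["free"])        # free bytes, via psutil.disk_usage()

client.close()
```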
```diff
@@ -95,79 +105,45 @@ class Client(object):
             params["end"] = float_to_string(end)
         return self.http.get("stream/remove", params)

-    def stream_insert(self, path, data, start = None, end = None):
-        """Insert data into a stream.  data should be a file-like object
-        that provides ASCII data that matches the database layout for path.
-
-        start and end are the starting and ending timestamp of this
-        stream; all timestamps t in the data must satisfy 'start <= t
-        < end'.  If left unspecified, 'start' is the timestamp of the
-        first line of data, and 'end' is the timestamp on the last line
-        of data, plus a small delta of 1μs.
-        """
-        params = { "path": path }
-
-        # See design.md for a discussion of how much data to send.
-        # These are soft limits -- actual data might be rounded up.
-        max_data = 1048576
-        max_time = 30
-        end_epsilon = 1e-6
-
-        def extract_timestamp(line):
-            return float(line.split()[0])
-
-        def sendit():
-            # If we have more data after this, use the timestamp of
-            # the next line as the end.  Otherwise, use the given
-            # overall end time, or add end_epsilon to the last data
-            # point.
-            if nextline:
-                block_end = extract_timestamp(nextline)
-                if end and block_end > end:
-                    # This is unexpected, but we'll defer to the server
-                    # to return an error in this case.
-                    block_end = end
-            elif end:
-                block_end = end
-            else:
-                block_end = extract_timestamp(line) + end_epsilon
-
-            # Send it
-            params["start"] = float_to_string(block_start)
-            params["end"] = float_to_string(block_end)
-            return self.http.put("stream/insert", block_data, params)
-
-        clock_start = time.time()
-        block_data = ""
-        block_start = start
-        result = None
-        line = None
-        nextline = None
-        for (line, nextline) in nilmdb.utils.misc.pairwise(data):
-            # If we don't have a starting time, extract it from the first line
-            if block_start is None:
-                block_start = extract_timestamp(line)
-
-            clock_elapsed = time.time() - clock_start
-            block_data += line
-
-            # If we have enough data, or enough time has elapsed,
-            # send this block to the server, and empty things out
-            # for the next block.
-            if (len(block_data) > max_data) or (clock_elapsed > max_time):
-                result = sendit()
-                block_start = None
-                block_data = ""
-                clock_start = time.time()
-
-        # One last block?
-        if len(block_data):
-            result = sendit()
-
-        # Return the most recent JSON result we got back, or None if
-        # we didn't make any requests.
-        return result
-
+    @contextlib.contextmanager
+    def stream_insert_context(self, path, start = None, end = None):
+        """Return a context manager that allows data to be efficiently
+        inserted into a stream in a piecewise manner.  Data is provided
+        as single lines, and is aggregated and sent to the server in larger
+        chunks as necessary.  Data lines must match the database layout for
+        the given path, and end with a newline.
+
+        Example:
+            with client.stream_insert_context('/path', start, end) as ctx:
+                ctx.insert_line('1234567890.0 1 2 3 4\\n')
+                ctx.insert_line('1234567891.0 1 2 3 4\\n')
+
+        For more details, see help for nilmdb.client.client.StreamInserter
+
+        This may make multiple requests to the server, if the data is
+        large enough or enough time has passed between insertions.
+        """
+        ctx = StreamInserter(self, path, start, end)
+        yield ctx
+        ctx.finalize()
+
+    def stream_insert(self, path, data, start = None, end = None):
+        """Insert rows of data into a stream.  data should be an
+        iterable object that provides ASCII data that matches the
+        database layout for path.  See stream_insert_context for
+        details on the 'start' and 'end' parameters."""
+        with self.stream_insert_context(path, start, end) as ctx:
+            ctx.insert_iter(data)
+        return ctx.last_response
+
+    def stream_insert_block(self, path, block, start, end):
+        """Insert an entire block of data into a stream.  Like
+        stream_insert, except 'block' contains multiple lines of ASCII
+        text and is sent in one single chunk."""
+        params = { "path": path,
+                   "start": float_to_string(start),
+                   "end": float_to_string(end) }
+        return self.http.put("stream/insert", block, params)

     def stream_intervals(self, path, start = None, end = None):
         """
```
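The docstring example above, expanded into a runnable sketch. The stream path, layout, and timestamps are hypothetical; the stream must already exist and its layout must match the inserted lines:

```python
import nilmdb

client = nilmdb.Client(url="http://localhost:12380/")  # placeholder URL

# Fill the contiguous interval [1234567890.0, 1234567892.0) with two rows.
# Each line must match the stream's layout and end with a newline.
with client.stream_insert_context("/test/raw",
                                  start=1234567890.0,
                                  end=1234567892.0) as ctx:
    ctx.insert_line("1234567890.0 1 2 3 4\n")
    ctx.insert_line("1234567891.0 1 2 3 4\n")
# Leaving the block runs ctx.finalize(), which sends any buffered data.

client.close()
```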
```diff
@@ -188,8 +164,8 @@ class Client(object):
         lines of ASCII-formatted data that matches the database
         layout for the given path.

-        Specify count=True to just get a count of values rather than
-        the actual data.
+        Specify count = True to return a count of matching data points
+        rather than the actual data.  The output format is unchanged.
         """
         params = {
             "path": path,
```
```diff
@@ -202,3 +178,202 @@ class Client(object):
             params["count"] = 1

         return self.http.get_gen("stream/extract", params, retjson = False)
+
+    def stream_count(self, path, start = None, end = None):
+        """
+        Return the number of rows of data in the stream that satisfy
+        the given timestamps.
+        """
+        counts = list(self.stream_extract(path, start, end, count = True))
+        return int(counts[0])
+
+class StreamInserter(object):
+    """Object returned by stream_insert_context() that manages
+    the insertion of rows of data into a particular path.
+
+    The basic data flow is that we are filling a contiguous interval
+    on the server, with no gaps, that extends from timestamp 'start'
+    to timestamp 'end'.  Data timestamps satisfy 'start <= t < end'.
+    Data is provided by the user one line at a time with
+    .insert_line() or .insert_iter().
+
+    1. The first inserted line begins a new interval that starts at
+    'start'.  If 'start' is not given, it is deduced from the first
+    line's timestamp.
+
+    2. Subsequent lines go into the same contiguous interval.  As lines
+    are inserted, this routine may make multiple insertion requests to
+    the server, but will structure the timestamps to leave no gaps.
+
+    3. The current contiguous interval can be completed by manually
+    calling .finalize(), which the context manager will also do
+    automatically.  This will send any remaining data to the server,
+    using the 'end' timestamp to end the interval.
+
+    After a .finalize(), inserting new data goes back to step 1.
+
+    .update_start() can be called before step 1 to change the start
+    time for the interval.  .update_end() can be called before step 3
+    to change the end time for the interval.
+    """
+
+    # See design.md for a discussion of how much data to send.
+    # These are soft limits -- actual data might be rounded up.
+    # We send when we have a certain amount of data queued, or
+    # when a certain amount of time has passed since the last send.
+    _max_data = 1048576
+    _max_time = 30
+
+    # Delta to add to the final timestamp, if "end" wasn't given
+    _end_epsilon = 1e-6
+
+    def __init__(self, client, path, start = None, end = None):
+        """'http' is the httpclient object.  'path' is the database
+        path to insert to.  'start' and 'end' are used for the first
+        contiguous interval."""
+        self.last_response = None
+
+        self._client = client
+        self._path = path
+
+        # Start and end for the overall contiguous interval we're
+        # filling
+        self._interval_start = start
+        self._interval_end = end
+
+        # Data for the specific block we're building up to send
+        self._block_data = []
+        self._block_len = 0
+        self._block_start = None
+
+        # Time of last request
+        self._last_time = time.time()
+
+        # We keep a buffer of the two most recently inserted lines.
+        # Only the older one actually gets processed; the newer one
+        # is used to "look-ahead" to the next timestamp if we need
+        # to internally split an insertion into two requests.
+        self._line_old = None
+        self._line_new = None
+
+    def insert_iter(self, iter):
+        """Insert all lines of ASCII formatted data from the given
+        iterable.  Lines must be terminated with '\\n'."""
+        for line in iter:
+            self.insert_line(line)
+
+    def insert_line(self, line, allow_intermediate = True):
+        """Insert a single line of ASCII formatted data.  Line
+        must be terminated with '\\n'."""
+        if line and (len(line) < 1 or line[-1] != '\n'):
+            raise ValueError("lines must end with a newline character")
+
+        # Store this new line, but process the previous (old) one.
+        # This lets us "look ahead" to the next line.
+        self._line_old = self._line_new
+        self._line_new = line
+        if self._line_old is None:
+            return
+
+        # If starting a new block, pull out the timestamp if needed.
+        if self._block_start is None:
+            if self._interval_start is not None:
+                # User provided a start timestamp.  Use it once, then
+                # clear it for the next block.
+                self._block_start = self._interval_start
+                self._interval_start = None
+            else:
+                # Extract timestamp from the first row
+                self._block_start = extract_timestamp(self._line_old)
+
+        # Save the line
+        self._block_data.append(self._line_old)
+        self._block_len += len(self._line_old)
+
+        if allow_intermediate:
+            # Send an intermediate block to the server if needed.
+            elapsed = time.time() - self._last_time
+            if (self._block_len > self._max_data) or (elapsed > self._max_time):
+                self._send_block_intermediate()
+
+    def update_start(self, start):
+        """Update the start time for the next contiguous interval.
+        Call this before starting to insert data for a new interval,
+        for example, after .finalize()"""
+        self._interval_start = start
+
+    def update_end(self, end):
+        """Update the end time for the current contiguous interval.
+        Call this before .finalize()"""
+        self._interval_end = end
+
+    def finalize(self):
+        """Stop filling the current contiguous interval.
+        All outstanding data will be sent, and the end time of the
+        interval will be taken from the 'end' argument used when
+        initializing this class, or the most recent value passed to
+        update_end(), or the last timestamp plus a small epsilon
+        value if no other endpoint was provided.
+
+        If more data is inserted after a finalize(), it will become
+        part of a new interval and there may be a gap left in-between."""
+        # Special marker tells insert_line that this is the end
+        self.insert_line(None, allow_intermediate = False)
+
+        if self._block_len > 0:
+            # We have data pending, so send the final block
+            self._send_block_final()
+        elif None not in (self._interval_start, self._interval_end):
+            # We have no data, but enough information to create an
+            # empty interval.
+            self._block_start = self._interval_start
+            self._interval_start = None
+            self._send_block_final()
+        else:
+            # No data, and no timestamps to use to create an empty
+            # interval.
+            pass
+
+        # Make sure both timestamps are emptied for future intervals.
+        self._interval_start = None
+        self._interval_end = None
+
+    def _send_block_intermediate(self):
+        """Send data, when we still have more data to send.
+        Use the timestamp from the next line, so that the blocks
+        are contiguous."""
+        block_end = extract_timestamp(self._line_new)
+        if self._interval_end is not None and block_end > self._interval_end:
+            # Something's fishy -- the timestamp we found is after
+            # the user's specified end.  Limit it here, and the
+            # server will return an error.
+            block_end = self._interval_end
+        self._send_block(block_end)
+
+    def _send_block_final(self):
+        """Send data, when this is the last block for the interval.
+        There is no next line, so figure out the actual interval end
+        using interval_end or end_epsilon."""
+        if self._interval_end is not None:
+            # Use the user's specified end timestamp
+            block_end = self._interval_end
+            # Clear it in case we send more intervals in the future.
+            self._interval_end = None
+        else:
+            # Add an epsilon to the last timestamp we saw
+            block_end = extract_timestamp(self._line_old) + self._end_epsilon
+        self._send_block(block_end)
+
+    def _send_block(self, block_end):
+        """Send current block to the server"""
+        self.last_response = self._client.stream_insert_block(
+            self._path, "".join(self._block_data),
+            self._block_start, block_end)
+
+        # Clear out the block
+        self._block_data = []
+        self._block_len = 0
+        self._block_start = None
+
+        # Note when we sent it
+        self._last_time = time.time()
```
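A sketch of the multi-interval flow the docstring describes: `finalize()` closes one contiguous interval, and later inserts begin a new one, possibly leaving a gap between the two. Path and timestamps are again hypothetical:

```python
import nilmdb

client = nilmdb.Client(url="http://localhost:12380/")  # placeholder URL

with client.stream_insert_context("/test/raw") as ctx:
    # Interval 1: with no explicit start, the first line's timestamp is used.
    ctx.insert_line("100.0 1 2 3 4\n")
    ctx.insert_line("101.0 1 2 3 4\n")
    ctx.update_end(102.0)    # explicit end for this interval
    ctx.finalize()           # interval [100.0, 102.0) is complete

    # Interval 2: update_start() sets the start of the next interval.
    ctx.update_start(200.0)
    ctx.insert_line("200.5 1 2 3 4\n")
    # Exiting the block calls finalize() again; with no explicit end,
    # the last timestamp plus _end_epsilon (1e-6) closes the interval.

client.close()
```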
```diff
@@ -33,6 +33,18 @@ class HTTPClient(object):
         self.curl.setopt(pycurl.URL, url)
         self.url = url

+    def _check_busy_and_set_upload(self, upload):
+        """Sets the pycurl.UPLOAD option, but also raises a more
+        friendly exception if the client is already serving a request."""
+        try:
+            self.curl.setopt(pycurl.UPLOAD, upload)
+        except pycurl.error as e:
+            if "is currently running" in str(e):
+                raise Exception("Client is already performing a request, and "
+                                "nesting calls is not supported.")
+            else: # pragma: no cover (shouldn't happen)
+                raise
+
     def _check_error(self, body = None):
         code = self.curl.getinfo(pycurl.RESPONSE_CODE)
         if code == 200:
```
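What this guards against: each `HTTPClient` wraps a single pycurl handle, so starting a second request while a streaming response is still being consumed previously surfaced as a raw pycurl "is currently running" error. A sketch of the friendlier failure, mirroring the test added near the end of this comparison (the stream path is hypothetical):

```python
import nilmdb

client = nilmdb.Client(url="http://localhost:12380/")  # placeholder URL

try:
    # stream_extract() returns a generator; the nested call below reuses
    # the same pycurl handle before the first request has finished.
    for row in client.stream_extract("/test/raw"):
        client.stream_intervals("/test/raw")
except Exception as e:
    assert "nesting calls is not supported" in str(e)

client.close()
```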
```diff
@@ -80,11 +92,11 @@ class HTTPClient(object):
                 self._status = int(data.split(" ")[1])
             self._headers += data
         self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
-        def func(callback):
+        def perform(callback):
             self.curl.setopt(pycurl.WRITEFUNCTION, callback)
             self.curl.perform()
         try:
-            with nilmdb.utils.Iteratorizer(func, curl_hack = True) as it:
+            with nilmdb.utils.Iteratorizer(perform, curl_hack = True) as it:
                 for i in it:
                     if self._status == 200:
                         # If we had a 200 response, yield the data to caller.

@@ -156,12 +168,12 @@ class HTTPClient(object):

     def get(self, url, params = None, retjson = True):
         """Simple GET"""
-        self.curl.setopt(pycurl.UPLOAD, 0)
+        self._check_busy_and_set_upload(0)
         return self._doreq(url, params, retjson)

     def put(self, url, postdata, params = None, retjson = True):
         """Simple PUT"""
-        self.curl.setopt(pycurl.UPLOAD, 1)
+        self._check_busy_and_set_upload(1)
         self._setup_url(url, params)
         data = cStringIO.StringIO(postdata)
         self.curl.setopt(pycurl.READFUNCTION, data.read)

@@ -184,12 +196,12 @@ class HTTPClient(object):

     def get_gen(self, url, params = None, retjson = True):
         """Simple GET, returning a generator"""
-        self.curl.setopt(pycurl.UPLOAD, 0)
+        self._check_busy_and_set_upload(0)
        return self._doreq_gen(url, params, retjson)

     def put_gen(self, url, postdata, params = None, retjson = True):
         """Simple PUT, returning a generator"""
-        self.curl.setopt(pycurl.UPLOAD, 1)
+        self._check_busy_and_set_upload(1)
         self._setup_url(url, params)
         data = cStringIO.StringIO(postdata)
         self.curl.setopt(pycurl.READFUNCTION, data.read)
```
```diff
@@ -1,5 +1,6 @@
 import nilmdb
 from nilmdb.utils.printf import *
+from nilmdb.utils import human_size

 from argparse import ArgumentDefaultsHelpFormatter as def_form

@@ -17,5 +18,7 @@ def cmd_info(self):
     printf("Client version: %s\n", nilmdb.__version__)
     printf("Server version: %s\n", self.client.version())
     printf("Server URL: %s\n", self.client.geturl())
-    printf("Server database path: %s\n", self.client.dbpath())
-    printf("Server database size: %s\n", self.client.dbsize())
+    dbinfo = self.client.dbinfo()
+    printf("Server database path: %s\n", dbinfo["path"])
+    printf("Server database size: %s\n", human_size(dbinfo["size"]))
+    printf("Server database free space: %s\n", human_size(dbinfo["free"]))
```
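Sample output of the info command after this change; all values here are invented for illustration, and the free-space line is the new one:

```
Client version: 1.1
Server version: 1.1
Server URL: http://localhost:12380/
Server database path: /home/nilmdb/db
Server database size: 143.6 MiB
Server database free space: 52.40 GiB
```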
```diff
@@ -47,8 +47,8 @@ def cmd_list_verify(self):
         self.args.path = self.args.path_positional

     if self.args.start is not None and self.args.end is not None:
-        if self.args.start > self.args.end:
-            self.parser.error("start is after end")
+        if self.args.start >= self.args.end:
+            self.parser.error("start must precede end")

 def cmd_list(self):
     """List available streams"""

@@ -8,8 +8,7 @@ def setup(self, sub):
                    Remove all data from a specified time range within a
                    stream.
                    """)
-    cmd.set_defaults(verify = cmd_remove_verify,
-                     handler = cmd_remove)
+    cmd.set_defaults(handler = cmd_remove)

     group = cmd.add_argument_group("Data selection")
     group.add_argument("path",

@@ -25,11 +24,6 @@ def setup(self, sub):
     group.add_argument("-c", "--count", action="store_true",
                        help="Output number of data points removed")

-def cmd_remove_verify(self):
-    if self.args.start is not None and self.args.end is not None:
-        if self.args.start > self.args.end:
-            self.parser.error("start is after end")
-
 def cmd_remove(self):
     try:
         count = self.client.stream_remove(self.args.path,
```
```diff
@@ -13,6 +13,16 @@ import struct
 import mmap
 import re

+# If we have the faulthandler module, use it.  All of the mmap stuff
+# might trigger a SIGSEGV or SIGBUS if we're not careful, and
+# faulthandler will give a traceback in that case.  (the Python
+# interpreter will still die either way).
+try: # pragma: no cover
+    import faulthandler
+    faulthandler.enable()
+except: # pragma: no cover
+    pass
+
 # Up to 256 open file descriptors at any given time.
 # These variables are global so they can be used in the decorator arguments.
 table_cache_size = 16
```
```diff
@@ -161,6 +171,52 @@ class BulkData(object):
         ospath = os.path.join(self.root, *elements)
         return Table(ospath)

+@nilmdb.utils.must_close(wrap_verify = True)
+class File(object):
+    """Object representing a single file on disk.  Data can be appended,
+    or the self.mmap handle can be used for random reads."""
+
+    def __init__(self, root, subdir, filename):
+        # Create path if it doesn't exist
+        try:
+            os.mkdir(os.path.join(root, subdir))
+        except OSError:
+            pass
+
+        # Open/create file
+        self._f = open(os.path.join(root, subdir, filename), "a+b", 0)
+
+        # Seek to end, and get size
+        self._f.seek(0, 2)
+        self.size = self._f.tell()
+
+        # Open mmap object
+        self.mmap = None
+        self._mmap_reopen()
+
+    def _mmap_reopen(self):
+        if self.size == 0:
+            # Don't mmap if the file is empty; it would fail
+            pass
+        elif self.mmap is None:
+            # Not opened yet, so open it
+            self.mmap = mmap.mmap(self._f.fileno(), 0)
+        else:
+            # Already opened, so just resize it
+            self.mmap.resize(self.size)
+
+    def close(self):
+        if self.mmap is not None:
+            self.mmap.close()
+        self._f.close()
+
+    def append(self, data):
+        # Write data, flush it, and resize our mmap accordingly
+        self._f.write(data)
+        self._f.flush()
+        self.size += len(data)
+        self._mmap_reopen()
+
 @nilmdb.utils.must_close(wrap_verify = True)
 class Table(object):
     """Tools to help access a single table (data at a specific OS path)."""
```
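The `_mmap_reopen()` special case exists because `mmap.mmap()` refuses to map a zero-length file. A standalone illustration of that constraint (plain Python, not nilmdb code):

```python
import mmap
import os
import tempfile

fd, path = tempfile.mkstemp()
f = os.fdopen(fd, "a+b")

try:
    mmap.mmap(f.fileno(), 0)          # zero-length file: this raises
except (ValueError, mmap.error):      # mmap.error on Python 2, ValueError on 3
    pass

f.write(b"hello")
f.flush()
mm = mmap.mmap(f.fileno(), 0)         # succeeds once the file has data
assert mm[0:5] == b"hello"
mm.close()
f.close()
os.unlink(path)
```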
```diff
@@ -211,7 +267,7 @@ class Table(object):
         self.nrows = self._get_nrows()

     def close(self):
-        self.mmap_open.cache_remove_all()
+        self.file_open.cache_remove_all()

     # Internal helpers
     def _get_nrows(self):

@@ -275,37 +331,11 @@ class Table(object):

     # Cache open files
     @nilmdb.utils.lru_cache(size = fd_cache_size,
-                            keys = slice(0, 3), # exclude newsize
-                            onremove = lambda x: x.close())
-    def mmap_open(self, subdir, filename, newsize = None):
+                            onremove = lambda f: f.close())
+    def file_open(self, subdir, filename):
         """Open and map a given 'subdir/filename' (relative to self.root).
-        Will be automatically closed when evicted from the cache.
-
-        If 'newsize' is provided, the file is truncated to the given
-        size before the mapping is returned.  (Note that the LRU cache
-        on this function means the truncate will only happen if the
-        object isn't already cached; mmap.resize should be used too.)"""
-        try:
-            os.mkdir(os.path.join(self.root, subdir))
-        except OSError:
-            pass
-        f = open(os.path.join(self.root, subdir, filename), "a+", 0)
-        if newsize is not None:
-            # mmap can't map a zero-length file, so this allows the
-            # caller to set the filesize between file creation and
-            # mmap.
-            f.truncate(newsize)
-        mm = mmap.mmap(f.fileno(), 0)
-        return mm
-
-    def mmap_open_resize(self, subdir, filename, newsize):
-        """Open and map a given 'subdir/filename' (relative to self.root).
-        The file is resized to the given size."""
-        # Pass new size to mmap_open
-        mm = self.mmap_open(subdir, filename, newsize)
-        # In case we got a cached copy, need to call mm.resize too.
-        mm.resize(newsize)
-        return mm
+        Will be automatically closed when evicted from the cache."""
+        return File(self.root, subdir, filename)

     def append(self, data):
         """Append the data and flush it to disk.
```
```diff
@@ -317,14 +347,13 @@ class Table(object):
             (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
             if count > remaining:
                 count = remaining
-            newsize = offset + count * self.packer.size
-            mm = self.mmap_open_resize(subdir, fname, newsize)
-            mm.seek(offset)
+            f = self.file_open(subdir, fname)

             # Write the data
             for i in xrange(count):
                 row = dataiter.next()
-                mm.write(self.packer.pack(*row))
+                f.append(self.packer.pack(*row))
             remaining -= count
             self.nrows += count

@@ -351,7 +380,7 @@ class Table(object):
             (subdir, filename, offset, count) = self._offset_from_row(row)
             if count > remaining:
                 count = remaining
-            mm = self.mmap_open(subdir, filename)
+            mm = self.file_open(subdir, filename).mmap
             for i in xrange(count):
                 ret.append(list(self.packer.unpack_from(mm, offset)))
                 offset += self.packer.size

@@ -363,7 +392,7 @@ class Table(object):
         if key < 0 or key >= self.nrows:
             raise IndexError("Index out of range")
         (subdir, filename, offset, count) = self._offset_from_row(key)
-        mm = self.mmap_open(subdir, filename)
+        mm = self.file_open(subdir, filename).mmap
         # unpack_from ignores the mmap object's current seek position
         return list(self.packer.unpack_from(mm, offset))
```
```diff
@@ -410,8 +439,8 @@ class Table(object):
         # are generally easier if we don't have to special-case that.
         if (len(merged) == 1 and
             merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
-            # Close potentially open file in mmap_open LRU cache
-            self.mmap_open.cache_remove(self, subdir, filename)
+            # Close potentially open file in file_open LRU cache
+            self.file_open.cache_remove(self, subdir, filename)

             # Delete files
             os.remove(datafile)
```
```diff
@@ -36,7 +36,7 @@ cdef class Interval:
         """
         'start' and 'end' are arbitrary floats that represent time
         """
-        if start > end:
+        if start >= end:
            # Explicitly disallow zero-width intervals (since they're half-open)
            raise IntervalError("start %s must precede end %s" % (start, end))
         self.start = float(start)
```
```diff
@@ -142,6 +142,15 @@ class NilmDB(object):
         with self.con:
             cur.execute("PRAGMA user_version = {v:d}".format(v=version))

+    def _check_user_times(self, start, end):
+        if start is None:
+            start = -1e12
+        if end is None:
+            end = 1e12
+        if start >= end:
+            raise NilmDBError("start must precede end")
+        return (start, end)
+
     @nilmdb.utils.lru_cache(size = 16)
     def _get_intervals(self, stream_id):
         """
```
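The motivation for the helper: the old `Interval(start or 0, end or 1e12)` idiom used `or`, which substitutes the default for any falsy value, so a legitimate timestamp of 0 (or the negative ranges exercised by the new tests below) was silently mangled. A standalone sketch of the fixed behavior, with ValueError standing in for NilmDBError:

```python
def check_user_times(start, end):
    # Only None means "unbounded"; 0 and negative values are real times.
    if start is None:
        start = -1e12
    if end is None:
        end = 1e12
    if start >= end:
        raise ValueError("start must precede end")
    return (start, end)

print(0.0 or 12345)                  # 12345: the old idiom lost t = 0
print(check_user_times(0.0, 1.0))    # (0.0, 1.0): zero preserved
print(check_user_times(-2.0, -1.0))  # (-2.0, -1.0): negative times work
print(check_user_times(None, None))  # (-1e12, 1e12): unbounded defaults
```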
```diff
@@ -303,7 +312,8 @@ class NilmDB(object):
         """
         stream_id = self._stream_id(path)
         intervals = self._get_intervals(stream_id)
-        requested = Interval(start or 0, end or 1e12)
+        (start, end) = self._check_user_times(start, end)
+        requested = Interval(start, end)
         result = []
         for n, i in enumerate(intervals.intersection(requested)):
             if n >= self.max_results:

@@ -396,7 +406,7 @@ class NilmDB(object):
         path: Path at which to add the data
         start: Starting timestamp
         end: Ending timestamp
-        data: Rows of data, to be passed to PyTable's table.append
+        data: Rows of data, to be passed to bulkdata table.append
               method.  E.g. nilmdb.layout.Parser.data
         """
         # First check for basic overlap using timestamp info given.

@@ -417,7 +427,7 @@ class NilmDB(object):
             self._add_interval(stream_id, interval, row_start, row_end)

         # And that's all
-        return "ok"
+        return

     def _find_start(self, table, dbinterval):
         """

@@ -475,7 +485,8 @@ class NilmDB(object):
         stream_id = self._stream_id(path)
         table = self.data.getnode(path)
         intervals = self._get_intervals(stream_id)
-        requested = Interval(start or 0, end or 1e12)
+        (start, end) = self._check_user_times(start, end)
+        requested = Interval(start, end)
         result = []
         matched = 0
         remaining = self.max_results

@@ -521,12 +532,10 @@ class NilmDB(object):
         stream_id = self._stream_id(path)
         table = self.data.getnode(path)
         intervals = self._get_intervals(stream_id)
-        to_remove = Interval(start or 0, end or 1e12)
+        (start, end) = self._check_user_times(start, end)
+        to_remove = Interval(start, end)
         removed = 0

-        if start == end:
-            return 0
-
         # Can't remove intervals from within the iterator, so we need to
         # remember what's currently in the intersection now.
         all_candidates = list(intervals.intersection(to_remove, orig = True))
```
```diff
@@ -13,6 +13,7 @@ import os
 import simplejson as json
 import decorator
 import traceback
+import psutil

 try:
     cherrypy.tools.json_out
```
```diff
@@ -99,17 +100,16 @@ class Root(NilmApp):
     def version(self):
         return nilmdb.__version__

-    # /dbpath
+    # /dbinfo
     @cherrypy.expose
     @cherrypy.tools.json_out()
-    def dbpath(self):
-        return self.db.get_basepath()
-
-    # /dbsize
-    @cherrypy.expose
-    @cherrypy.tools.json_out()
-    def dbsize(self):
-        return nilmdb.utils.du(self.db.get_basepath())
+    def dbinfo(self):
+        """Return a dictionary with the database path,
+        size of the database in bytes, and free disk space in bytes"""
+        path = self.db.get_basepath()
+        return { "path": path,
+                 "size": nilmdb.utils.du(path),
+                 "free": psutil.disk_usage(path).free }

 class Stream(NilmApp):
     """Stream-specific operations"""
```
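Over HTTP, the new endpoint returns a JSON object; a sketch with an invented host and numbers:

```
$ curl http://localhost:12380/dbinfo
{"path": "/home/nilmdb/db", "size": 150634496, "free": 56253882368}
```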
```diff
@@ -177,7 +177,6 @@ class Stream(NilmApp):
         dictionary"""
         data_dict = json.loads(data)
         self.db.stream_set_metadata(path, data_dict)
-        return "ok"

     # /stream/update_metadata?path=/newton/prep&data=<json>
     @cherrypy.expose

@@ -188,7 +187,6 @@ class Stream(NilmApp):
         should be a json-encoded dictionary"""
         data_dict = json.loads(data)
         self.db.stream_update_metadata(path, data_dict)
-        return "ok"

     # /stream/insert?path=/newton/prep
     @cherrypy.expose

@@ -223,19 +221,17 @@ class Stream(NilmApp):
                                      "error parsing input data: " +
                                      e.message)

-        if (not parser.min_timestamp or not parser.max_timestamp or
-            not len(parser.data)):
-            raise cherrypy.HTTPError("400 Bad Request",
-                                     "no data provided")
-
         # Check limits
         start = float(start)
         end = float(end)
-        if parser.min_timestamp < start:
+        if start >= end:
+            raise cherrypy.HTTPError("400 Bad Request",
+                                     "start must precede end")
+        if parser.min_timestamp is not None and parser.min_timestamp < start:
             raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
                                      repr(parser.min_timestamp) +
                                      " < start time " + repr(start))
-        if parser.max_timestamp >= end:
+        if parser.max_timestamp is not None and parser.max_timestamp >= end:
             raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
                                      repr(parser.max_timestamp) +
                                      " >= end time " + repr(end))
```
|
|||||||
raise cherrypy.HTTPError("400 Bad Request", e.message)
|
raise cherrypy.HTTPError("400 Bad Request", e.message)
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
return "ok"
|
return
|
||||||
|
|
||||||
# /stream/remove?path=/newton/prep
|
# /stream/remove?path=/newton/prep
|
||||||
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||||
@@ -265,9 +261,9 @@ class Stream(NilmApp):
|
|||||||
if end is not None:
|
if end is not None:
|
||||||
end = float(end)
|
end = float(end)
|
||||||
if start is not None and end is not None:
|
if start is not None and end is not None:
|
||||||
if end < start:
|
if start >= end:
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
raise cherrypy.HTTPError("400 Bad Request",
|
||||||
"end before start")
|
"start must precede end")
|
||||||
return self.db.stream_remove(path, start, end)
|
return self.db.stream_remove(path, start, end)
|
||||||
|
|
||||||
# /stream/intervals?path=/newton/prep
|
# /stream/intervals?path=/newton/prep
|
||||||
@@ -292,9 +288,9 @@ class Stream(NilmApp):
|
|||||||
end = float(end)
|
end = float(end)
|
||||||
|
|
||||||
if start is not None and end is not None:
|
if start is not None and end is not None:
|
||||||
if end < start:
|
if start >= end:
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
raise cherrypy.HTTPError("400 Bad Request",
|
||||||
"end before start")
|
"start must precede end")
|
||||||
|
|
||||||
streams = self.db.stream_list(path = path)
|
streams = self.db.stream_list(path = path)
|
||||||
if len(streams) != 1:
|
if len(streams) != 1:
|
||||||
@@ -332,9 +328,9 @@ class Stream(NilmApp):
|
|||||||
|
|
||||||
# Check parameters
|
# Check parameters
|
||||||
if start is not None and end is not None:
|
if start is not None and end is not None:
|
||||||
if end < start:
|
if start >= end:
|
||||||
raise cherrypy.HTTPError("400 Bad Request",
|
raise cherrypy.HTTPError("400 Bad Request",
|
||||||
"end before start")
|
"start must precede end")
|
||||||
|
|
||||||
# Check path and get layout
|
# Check path and get layout
|
||||||
streams = self.db.stream_list(path = path)
|
streams = self.db.stream_list(path = path)
|
||||||
|
```diff
@@ -4,8 +4,7 @@ from nilmdb.utils.timer import Timer
 from nilmdb.utils.iteratorizer import Iteratorizer
 from nilmdb.utils.serializer import Serializer
 from nilmdb.utils.lrucache import lru_cache
-from nilmdb.utils.diskusage import du
+from nilmdb.utils.diskusage import du, human_size
 from nilmdb.utils.mustclose import must_close
 from nilmdb.utils.urllib import urlencode
-from nilmdb.utils import misc
 from nilmdb.utils import atomic
```
```diff
@@ -1,7 +1,7 @@
 import os
 from math import log

-def sizeof_fmt(num):
+def human_size(num):
     """Human friendly file size"""
     unit_list = zip(['bytes', 'kiB', 'MiB', 'GiB', 'TiB'], [0, 0, 1, 2, 2])
     if num > 1:

@@ -15,15 +15,11 @@ def sizeof_fmt(num):
     if num == 1: # pragma: no cover
         return '1 byte'

-def du_bytes(path):
+def du(path):
     """Like du -sb, returns total size of path in bytes."""
     size = os.path.getsize(path)
     if os.path.isdir(path):
         for thisfile in os.listdir(path):
             filepath = os.path.join(path, thisfile)
-            size += du_bytes(filepath)
+            size += du(filepath)
     return size
-
-def du(path):
-    """Like du -sh, returns total size of path as a human-readable string."""
-    return sizeof_fmt(du_bytes(path))
```
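After the rename, byte counting and human formatting compose explicitly; a small sketch (the path is arbitrary):

```python
from nilmdb.utils import du, human_size

nbytes = du("/tmp")          # total size in bytes, like `du -sb /tmp`
print(human_size(nbytes))    # formatted for humans
print(human_size(1))         # "1 byte"
print(human_size(1048576))   # "1.0 MiB"
```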
```diff
@@ -1,8 +0,0 @@
-import itertools
-
-
-def pairwise(iterable):
-    "s -> (s0,s1), (s1,s2), ..., (sn,None)"
-    a, b = itertools.tee(iterable)
-    next(b, None)
-    return itertools.izip_longest(a, b)
```
```diff
@@ -38,6 +38,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):

         @wrap_class_method
         def close(orig, self, *args, **kwargs):
-            del self._must_close
+            if "_must_close" in self.__dict__:
+                del self._must_close
             return orig(self, *args, **kwargs)

```
setup.py:

```diff
@@ -114,6 +114,7 @@ setup(name='nilmdb',
                            'pycurl',
                            'python-dateutil',
                            'pytz',
+                           'psutil >= 0.3.0',
                          ],
       packages = [ 'nilmdb',
                    'nilmdb.utils',
```
```diff
@@ -18,10 +18,12 @@ import simplejson as json
 import unittest
 import warnings
 import resource
+import time

 from testutil.helpers import *

 testdb = "tests/client-testdb"
+testurl = "http://localhost:12380/"

 def setup_module():
     global test_server, test_db
```
```diff
@@ -44,28 +46,32 @@ def teardown_module():

 class TestClient(object):

-    def test_client_1_basic(self):
+    def test_client_01_basic(self):
         # Test a fake host
         client = nilmdb.Client(url = "http://localhost:1/")
         with assert_raises(nilmdb.client.ServerError):
             client.version()
+        client.close()

         # Trigger same error with a PUT request
         client = nilmdb.Client(url = "http://localhost:1/")
         with assert_raises(nilmdb.client.ServerError):
             client.version()
+        client.close()

         # Then a fake URL on a real host
         client = nilmdb.Client(url = "http://localhost:12380/fake/")
         with assert_raises(nilmdb.client.ClientError):
             client.version()
+        client.close()

         # Now a real URL with no http:// prefix
         client = nilmdb.Client(url = "localhost:12380")
         version = client.version()
+        client.close()

         # Now use the real URL
-        client = nilmdb.Client(url = "http://localhost:12380/")
+        client = nilmdb.Client(url = testurl)
         version = client.version()
         eq_(distutils.version.LooseVersion(version),
             distutils.version.LooseVersion(test_server.version))

@@ -73,10 +79,11 @@ class TestClient(object):
         # Bad URLs should give 404, not 500
         with assert_raises(ClientError):
             client.http.get("/stream/create")
+        client.close()

-    def test_client_2_createlist(self):
+    def test_client_02_createlist(self):
         # Basic stream tests, like those in test_nilmdb:test_stream
-        client = nilmdb.Client(url = "http://localhost:12380/")
+        client = nilmdb.Client(url = testurl)

         # Database starts empty
         eq_(client.stream_list(), [])

@@ -101,8 +108,10 @@ class TestClient(object):
                                   ["/newton/zzz/rawnotch", "RawNotchedData"]
                                 ])
         # Match just one type or one path
-        eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
-        eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
+        eq_(client.stream_list(layout="RawData"),
+            [ ["/newton/raw", "RawData"] ])
+        eq_(client.stream_list(path="/newton/raw"),
+            [ ["/newton/raw", "RawData"] ])

         # Try messing with resource limits to trigger errors and get
         # more coverage.  Here, make it so we can only create files 1
```
```diff
@@ -114,9 +123,10 @@ class TestClient(object):
             client.stream_create("/newton/hello", "RawData")
         resource.setrlimit(resource.RLIMIT_FSIZE, limit)

-    def test_client_3_metadata(self):
-        client = nilmdb.Client(url = "http://localhost:12380/")
+        client.close()
+
+    def test_client_03_metadata(self):
+        client = nilmdb.Client(url = testurl)

         # Set / get metadata
         eq_(client.stream_get_metadata("/newton/prep"), {})

@@ -131,9 +141,10 @@ class TestClient(object):
         client.stream_update_metadata("/newton/raw", meta3)
         eq_(client.stream_get_metadata("/newton/prep"), meta1)
         eq_(client.stream_get_metadata("/newton/raw"), meta1)
-        eq_(client.stream_get_metadata("/newton/raw", [ "description" ] ), meta2)
-        eq_(client.stream_get_metadata("/newton/raw", [ "description",
-                                                        "v_scale" ] ), meta1)
+        eq_(client.stream_get_metadata("/newton/raw",
+                                       [ "description" ] ), meta2)
+        eq_(client.stream_get_metadata("/newton/raw",
+                                       [ "description", "v_scale" ] ), meta1)

         # missing key
         eq_(client.stream_get_metadata("/newton/raw", "descr"),

@@ -146,9 +157,10 @@ class TestClient(object):
             client.stream_set_metadata("/newton/prep", [1,2,3])
         with assert_raises(ClientError):
             client.stream_update_metadata("/newton/prep", [1,2,3])
+        client.close()

-    def test_client_4_insert(self):
-        client = nilmdb.Client(url = "http://localhost:12380/")
+    def test_client_04_insert(self):
+        client = nilmdb.Client(url = testurl)

         datetime_tz.localtz_set("America/New_York")
```
```diff
@@ -177,12 +189,33 @@ class TestClient(object):
         result = client.stream_insert("/newton/prep", data)
         eq_(result, None)

-        # Try forcing a server request with empty data
+        # It's OK to insert an empty interval
+        client.http.put("stream/insert", "", { "path": "/newton/prep",
+                                               "start": 1, "end": 2 })
+        eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
+        client.stream_remove("/newton/prep")
+        eq_(list(client.stream_intervals("/newton/prep")), [])
+
+        # Timestamps can be negative too
+        client.http.put("stream/insert", "", { "path": "/newton/prep",
+                                               "start": -2, "end": -1 })
+        eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
+        client.stream_remove("/newton/prep")
+        eq_(list(client.stream_intervals("/newton/prep")), [])
+
+        # Intervals that end at zero shouldn't be any different
+        client.http.put("stream/insert", "", { "path": "/newton/prep",
+                                               "start": -1, "end": 0 })
+        eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
+        client.stream_remove("/newton/prep")
+        eq_(list(client.stream_intervals("/newton/prep")), [])
+
+        # Try forcing a server request with equal start and end
         with assert_raises(ClientError) as e:
             client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                    "start": 0, "end": 0 })
         in_("400 Bad Request", str(e.exception))
-        in_("no data provided", str(e.exception))
+        in_("start must precede end", str(e.exception))

         # Specify start/end (starts too late)
         data = timestamper.TimestamperRate(testfile, start, 120)

@@ -208,7 +241,6 @@ class TestClient(object):
         data = timestamper.TimestamperRate(testfile, start, 120)
         result = client.stream_insert("/newton/prep", data,
                                       start, start + 119.999777)
-        eq_(result, "ok")

         # Verify the intervals.  Should be just one, even if the data
         # was inserted in chunks, due to nilmdb interval concatenation.
```
|
|||||||
in_("400 Bad Request", str(e.exception))
|
in_("400 Bad Request", str(e.exception))
|
||||||
in_("verlap", str(e.exception))
|
in_("verlap", str(e.exception))
|
||||||
|
|
||||||
def test_client_5_extractremove(self):
|
client.close()
|
||||||
# Misc tests for extract and remove. Most of them are in test_cmdline.
|
|
||||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
|
||||||
|
|
||||||
for x in client.stream_extract("/newton/prep", 123, 123):
|
def test_client_05_extractremove(self):
|
||||||
|
# Misc tests for extract and remove. Most of them are in test_cmdline.
|
||||||
|
client = nilmdb.Client(url = testurl)
|
||||||
|
|
||||||
|
for x in client.stream_extract("/newton/prep", 999123, 999124):
|
||||||
raise AssertionError("shouldn't be any data for this request")
|
raise AssertionError("shouldn't be any data for this request")
|
||||||
|
|
||||||
with assert_raises(ClientError) as e:
|
with assert_raises(ClientError) as e:
|
||||||
client.stream_remove("/newton/prep", 123, 120)
|
client.stream_remove("/newton/prep", 123, 120)
|
||||||
|
|
||||||
def test_client_6_generators(self):
|
# Test the exception we get if we nest requests
|
||||||
|
with assert_raises(Exception) as e:
|
||||||
|
for data in client.stream_extract("/newton/prep"):
|
||||||
|
x = client.stream_intervals("/newton/prep")
|
||||||
|
in_("nesting calls is not supported", str(e.exception))
|
||||||
|
|
||||||
|
# Test count
|
||||||
|
eq_(client.stream_count("/newton/prep"), 14400)
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
def test_client_06_generators(self):
|
||||||
# A lot of the client functionality is already tested by test_cmdline,
|
# A lot of the client functionality is already tested by test_cmdline,
|
||||||
# but this gets a bit more coverage that cmdline misses.
|
# but this gets a bit more coverage that cmdline misses.
|
||||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
client = nilmdb.Client(url = testurl)
|
||||||
|
|
||||||
# Trigger a client error in generator
|
# Trigger a client error in generator
|
||||||
start = datetime_tz.datetime_tz.smartparse("20120323T2000")
|
start = datetime_tz.datetime_tz.smartparse("20120323T2000")
|
||||||
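
Reviewer note: the nesting check added above means a second request can't be issued while a streaming generator is still open on the same client. A sketch of the supported pattern, materializing the interval list before extracting (names reused from the surrounding tests):

    client = nilmdb.Client(url = testurl)
    # list() drains the intervals generator first, so the extract
    # calls below don't nest inside an open streaming request.
    for (s, e) in list(client.stream_intervals("/newton/prep")):
        for line in client.stream_extract("/newton/prep", s, e):
            pass  # each line is one row of timestamped text data
    client.close()
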
@@ -246,7 +291,7 @@ class TestClient(object):
                                         start.totimestamp(),
                                         end.totimestamp()).next()
         in_("400 Bad Request", str(e.exception))
-        in_("end before start", str(e.exception))
+        in_("start must precede end", str(e.exception))
 
         # Trigger a curl error in generator
         with assert_raises(ServerError) as e:
@@ -272,7 +317,7 @@ class TestClient(object):
                             { "path": "/newton/prep",
                               "start": 0, "end": 0 }).next()
         in_("400 Bad Request", str(e.exception))
-        in_("no data provided", str(e.exception))
+        in_("start must precede end", str(e.exception))
 
         # Check 404 for missing streams
         for function in [ client.stream_intervals, client.stream_extract ]:
@@ -281,13 +326,15 @@ class TestClient(object):
             in_("404 Not Found", str(e.exception))
             in_("No such stream", str(e.exception))
 
-    def test_client_7_headers(self):
+        client.close()
+
+    def test_client_07_headers(self):
         # Make sure that /stream/intervals and /stream/extract
         # properly return streaming, chunked, text/plain response.
         # Pokes around in client.http internals a bit to look at the
         # response headers.
 
-        client = nilmdb.Client(url = "http://localhost:12380/")
+        client = nilmdb.Client(url = testurl)
         http = client.http
 
         # Use a warning rather than returning a test failure, so that we can
@@ -307,7 +354,7 @@ class TestClient(object):
         x = http.get("stream/extract",
                      { "path": "/newton/prep",
                        "start": "123",
-                       "end": "123" }, retjson=False)
+                       "end": "124" }, retjson=False)
         if "Transfer-Encoding: chunked" not in http._headers:
             warnings.warn("Non-chunked HTTP response for /stream/extract")
         if "Content-Type: text/plain;charset=utf-8" not in http._headers:
@@ -320,9 +367,11 @@ class TestClient(object):
                           "header in /stream/extract response:\n" +
                           http._headers)
 
-    def test_client_8_unicode(self):
+        client.close()
+
+    def test_client_08_unicode(self):
         # Basic Unicode tests
-        client = nilmdb.Client(url = "http://localhost:12380/")
+        client = nilmdb.Client(url = testurl)
 
         # Delete streams that exist
         for stream in client.stream_list():
@@ -356,3 +405,174 @@ class TestClient(object):
         eq_(client.stream_get_metadata(raw[0]), meta1)
         eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
         eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
+
+        client.close()
+
+    def test_client_09_closing(self):
+        # Make sure we actually close sockets correctly. New
+        # connections will block for a while if they're not, since the
+        # server will stop accepting new connections.
+        for test in [1, 2]:
+            start = time.time()
+            for i in range(50):
+                if time.time() - start > 15:
+                    raise AssertionError("Connections seem to be blocking... "
+                                         "probably not closing properly.")
+                if test == 1:
+                    # explicit close
+                    client = nilmdb.Client(url = testurl)
+                    with assert_raises(ClientError) as e:
+                        client.stream_remove("/newton/prep", 123, 120)
+                    client.close() # remove this to see the failure
+                elif test == 2:
+                    # use the context manager
+                    with nilmdb.Client(url = testurl) as c:
+                        with assert_raises(ClientError) as e:
+                            c.stream_remove("/newton/prep", 123, 120)
+
+    def test_client_10_context(self):
+        # Test using the client's stream insertion context manager to
+        # insert data.
+        client = nilmdb.Client(testurl)
+
+        client.stream_create("/context/test", "uint16_1")
+        with client.stream_insert_context("/context/test") as ctx:
+            # override _max_data to trigger frequent server updates
+            ctx._max_data = 15
+
+            with assert_raises(ValueError):
+                ctx.insert_line("100 1")
+
+            ctx.insert_line("100 1\n")
+            ctx.insert_iter([ "101 1\n",
+                              "102 1\n",
+                              "103 1\n" ])
+            ctx.insert_line("104 1\n")
+            ctx.insert_line("105 1\n")
+            ctx.finalize()
+
+            ctx.insert_line("106 1\n")
+            ctx.update_end(106.5)
+            ctx.finalize()
+            ctx.update_start(106.8)
+            ctx.insert_line("107 1\n")
+            ctx.insert_line("108 1\n")
+            ctx.insert_line("109 1\n")
+            ctx.insert_line("110 1\n")
+            ctx.insert_line("111 1\n")
+            ctx.update_end(113)
+            ctx.insert_line("112 1\n")
+            ctx.update_end(114)
+            ctx.insert_line("113 1\n")
+            ctx.update_end(115)
+            ctx.insert_line("114 1\n")
+            ctx.finalize()
+
+        with assert_raises(ClientError):
+            with client.stream_insert_context("/context/test", 100, 200) as ctx:
+                ctx.insert_line("115 1\n")
+
+        with assert_raises(ClientError):
+            with client.stream_insert_context("/context/test", 200, 300) as ctx:
+                ctx.insert_line("115 1\n")
+
+        with client.stream_insert_context("/context/test", 200, 300) as ctx:
+            # make sure our override wasn't permanent
+            ne_(ctx._max_data, 15)
+            ctx.insert_line("225 1\n")
+            ctx.finalize()
+
+        eq_(list(client.stream_intervals("/context/test")),
+            [ [ 100, 105.000001 ],
+              [ 106, 106.5 ],
+              [ 106.8, 115 ],
+              [ 200, 300 ] ])
+
+        client.stream_destroy("/context/test")
+        client.close()
+
+    def test_client_11_emptyintervals(self):
+        # Empty intervals are ok! If recording detection events
+        # by inserting rows into the database, we want to be able to
+        # have an interval where no events occurred. Test them here.
+        client = nilmdb.Client(testurl)
+        client.stream_create("/empty/test", "uint16_1")
+
+        def info():
+            result = []
+            for interval in list(client.stream_intervals("/empty/test")):
+                result.append((client.stream_count("/empty/test", *interval),
+                               interval))
+            return result
+
+        eq_(info(), [])
+
+        # Insert a region with just a few points
+        with client.stream_insert_context("/empty/test") as ctx:
+            ctx.update_start(100)
+            ctx.insert_line("140 1\n")
+            ctx.insert_line("150 1\n")
+            ctx.insert_line("160 1\n")
+            ctx.update_end(200)
+            ctx.finalize()
+
+        eq_(info(), [(3, [100, 200])])
+
+        # Delete chunk, which will leave one data point and two intervals
+        client.stream_remove("/empty/test", 145, 175)
+        eq_(info(), [(1, [100, 145]),
+                     (0, [175, 200])])
+
+        # Try also creating a completely empty interval from scratch,
+        # in a few different ways.
+        client.stream_insert_block("/empty/test", "", 300, 350)
+        client.stream_insert("/empty/test", [], 400, 450)
+        with client.stream_insert_context("/empty/test", 500, 550):
+            pass
+
+        # If enough timestamps aren't provided, empty streams won't be created.
+        client.stream_insert("/empty/test", [])
+        with client.stream_insert_context("/empty/test"):
+            pass
+        client.stream_insert("/empty/test", [], start = 600)
+        with client.stream_insert_context("/empty/test", start = 700):
+            pass
+        client.stream_insert("/empty/test", [], end = 850)
+        with client.stream_insert_context("/empty/test", end = 950):
+            pass
+
+        # Try various things that might cause problems
+        with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
+            ctx.finalize() # inserts [1000, 1050]
+            ctx.finalize() # nothing
+            ctx.finalize() # nothing
+            ctx.insert_line("1100 1\n")
+            ctx.finalize() # inserts [1100, 1100.000001]
+            ctx.update_start(1199)
+            ctx.insert_line("1200 1\n")
+            ctx.update_end(1250)
+            ctx.finalize() # inserts [1199, 1250]
+            ctx.update_start(1299)
+            ctx.finalize() # nothing
+            ctx.update_end(1350)
+            ctx.finalize() # nothing
+            ctx.update_start(1400)
+            ctx.update_end(1450)
+            ctx.finalize()
+            # implicit last finalize inserts [1400, 1450]
+
+        # Check everything
+        eq_(info(), [(1, [100, 145]),
+                     (0, [175, 200]),
+                     (0, [300, 350]),
+                     (0, [400, 450]),
+                     (0, [500, 550]),
+                     (0, [1000, 1050]),
+                     (1, [1100, 1100.000001]),
+                     (1, [1199, 1250]),
+                     (0, [1400, 1450]),
+                     ])
+
+        # Clean up
+        client.stream_destroy("/empty/test")
+        client.close()
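
Reviewer note: tests 09, 10, and 11 together document the new client surface in this release: Client works as a context manager, and stream_insert_context batches line-oriented inserts with explicit control over the recorded interval. A condensed usage sketch under the same assumptions as the tests (server at testurl; /sketch/example is a hypothetical path):

    with nilmdb.Client(url = testurl) as client:
        client.stream_create("/sketch/example", "uint16_1")
        # Insert three rows covering the explicit interval [100, 200]
        with client.stream_insert_context("/sketch/example", 100, 200) as ctx:
            ctx.insert_line("140 1\n")  # lines must be newline-terminated
            ctx.insert_iter([ "150 1\n", "160 1\n" ])
        # The recorded interval is [100, 200] even though data starts at 140
        eq_(list(client.stream_intervals("/sketch/example")), [[100, 200]])
        client.stream_destroy("/sketch/example")
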
@@ -194,9 +194,11 @@ class TestCmdline(object):
     def test_02_info(self):
         self.ok("info")
         self.contain("Server URL: http://localhost:12380/")
+        self.contain("Client version: " + nilmdb.__version__)
         self.contain("Server version: " + test_server.version)
         self.contain("Server database path")
         self.contain("Server database size")
+        self.contain("Server database free space")
 
     def test_03_createlist(self):
         # Basic stream tests, like those in test_client.
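
Reviewer note: the info subcommand now also reports the client version and the database free space. The test only checks for the substrings asserted above, so this output shape is a guess and the values here are made up:

    Server URL: http://localhost:12380/
    Client version: 1.1
    Server version: 1.1
    Server database path: /path/to/db
    Server database size: 2.8 MiB
    Server database free space: 100 GiB
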
@@ -272,7 +274,7 @@ class TestCmdline(object):
 
         # reversed range
         self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
-        self.contain("start is after end")
+        self.contain("start must precede end")
 
     def test_04_metadata(self):
         # Set / get metadata
@@ -442,7 +444,7 @@ class TestCmdline(object):
         self.contain("no intervals")
 
         self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
-                + " --end='23 Mar 2012 10:05:15.50'")
+                + " --end='23 Mar 2012 10:05:15.51'")
         lines_(self.captured, 2)
         self.contain("10:05:15.500")
 
@@ -471,29 +473,29 @@ class TestCmdline(object):
 
         # empty ranges return error 2
         self.fail("extract -a /newton/prep " +
-                  "--start '23 Mar 2012 10:00:30' " +
-                  "--end '23 Mar 2012 10:00:30'",
+                  "--start '23 Mar 2012 20:00:30' " +
+                  "--end '23 Mar 2012 20:00:31'",
                   exitcode = 2, require_error = False)
         self.contain("no data")
         self.fail("extract -a /newton/prep " +
-                  "--start '23 Mar 2012 10:00:30.000001' " +
-                  "--end '23 Mar 2012 10:00:30.000001'",
+                  "--start '23 Mar 2012 20:00:30.000001' " +
+                  "--end '23 Mar 2012 20:00:30.000002'",
                   exitcode = 2, require_error = False)
         self.contain("no data")
         self.fail("extract -a /newton/prep " +
                   "--start '23 Mar 2022 10:00:30' " +
-                  "--end '23 Mar 2022 10:00:30'",
+                  "--end '23 Mar 2022 10:00:31'",
                   exitcode = 2, require_error = False)
         self.contain("no data")
 
         # but are ok if we're just counting results
         self.ok("extract --count /newton/prep " +
-                "--start '23 Mar 2012 10:00:30' " +
-                "--end '23 Mar 2012 10:00:30'")
+                "--start '23 Mar 2012 20:00:30' " +
+                "--end '23 Mar 2012 20:00:31'")
         self.match("0\n")
         self.ok("extract -c /newton/prep " +
-                "--start '23 Mar 2012 10:00:30.000001' " +
-                "--end '23 Mar 2012 10:00:30.000001'")
+                "--start '23 Mar 2012 20:00:30.000001' " +
+                "--end '23 Mar 2012 20:00:30.000002'")
         self.match("0\n")
 
         # Check various dumps against stored copies of how they should appear
@@ -540,31 +542,31 @@ class TestCmdline(object):
         self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
         self.contain("No stream at path")
 
+        # empty or backward ranges return errors
         self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
-        self.contain("start is after end")
+        self.contain("start must precede end")
 
-        # empty ranges return success, backwards ranges return error
-        self.ok("remove /newton/prep " +
+        self.fail("remove /newton/prep " +
                 "--start '23 Mar 2012 10:00:30' " +
                 "--end '23 Mar 2012 10:00:30'")
-        self.match("")
-        self.ok("remove /newton/prep " +
+        self.contain("start must precede end")
+        self.fail("remove /newton/prep " +
                 "--start '23 Mar 2012 10:00:30.000001' " +
                 "--end '23 Mar 2012 10:00:30.000001'")
-        self.match("")
-        self.ok("remove /newton/prep " +
+        self.contain("start must precede end")
+        self.fail("remove /newton/prep " +
                 "--start '23 Mar 2022 10:00:30' " +
                 "--end '23 Mar 2022 10:00:30'")
-        self.match("")
+        self.contain("start must precede end")
 
         # Verbose
         self.ok("remove -c /newton/prep " +
-                "--start '23 Mar 2012 10:00:30' " +
-                "--end '23 Mar 2012 10:00:30'")
+                "--start '23 Mar 2022 20:00:30' " +
+                "--end '23 Mar 2022 20:00:31'")
         self.match("0\n")
         self.ok("remove --count /newton/prep " +
-                "--start '23 Mar 2012 10:00:30' " +
-                "--end '23 Mar 2012 10:00:30'")
+                "--start '23 Mar 2022 20:00:30' " +
+                "--end '23 Mar 2022 20:00:31'")
         self.match("0\n")
 
         # Make sure we have the data we expect
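
Reviewer note: this flips the old semantics: an empty range used to be accepted by remove and silently match nothing, while now any range with start >= end is rejected before anything is removed. A sketch of the new behavior through the Python client (names reused from the client tests; the last call succeeds but removes nothing here, since /newton/prep has no data near timestamp 100):

    client = nilmdb.Client(url = testurl)
    with assert_raises(ClientError):
        client.stream_remove("/newton/prep", 100, 100)  # empty range: rejected
    with assert_raises(ClientError):
        client.stream_remove("/newton/prep", 100, 90)   # backwards: rejected
    client.stream_remove("/newton/prep", 100, 100.000001)  # valid range
    client.close()
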
@@ -765,7 +767,7 @@ class TestCmdline(object):
                        "tests/data/prep-20120323T1000")
 
         # Should take up about 2.8 MB here (including directory entries)
-        du_before = nilmdb.utils.diskusage.du_bytes(testdb)
+        du_before = nilmdb.utils.diskusage.du(testdb)
 
         # Make sure we have the data we expect
         self.ok("list --detail")
@@ -815,7 +817,7 @@ class TestCmdline(object):
 
         # We have 1/8 of the data that we had before, so the file size
         # should have dropped below 1/4 of what it used to be
-        du_after = nilmdb.utils.diskusage.du_bytes(testdb)
+        du_after = nilmdb.utils.diskusage.du(testdb)
         lt_(du_after, (du_before / 4))
 
         # Remove anything that came from the 10:02 data file
@@ -55,7 +55,7 @@ class TestInterval:
               for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]
 
         # basic construction
-        i = Interval(d1, d1)
+        i = Interval(d1, d2)
         i = Interval(d1, d3)
         eq_(i.start, d1)
         eq_(i.end, d3)
@@ -77,8 +77,8 @@ class TestInterval:
         assert(Interval(d1, d3) > Interval(d1, d2))
         assert(Interval(d1, d2) < Interval(d2, d3))
         assert(Interval(d1, d3) < Interval(d2, d3))
-        assert(Interval(d2, d2) > Interval(d1, d3))
-        assert(Interval(d3, d3) == Interval(d3, d3))
+        assert(Interval(d2, d2+0.01) > Interval(d1, d3))
+        assert(Interval(d3, d3+0.01) == Interval(d3, d3+0.01))
         #with assert_raises(TypeError): # was AttributeError, that's wrong
         #    x = (i == 123)
 
@@ -293,7 +293,7 @@ class TestIntervalDB:
         # actual start, end can be a subset
         a = DBInterval(150, 200, 100, 200, 10000, 20000)
         b = DBInterval(100, 150, 100, 200, 10000, 20000)
-        c = DBInterval(150, 150, 100, 200, 10000, 20000)
+        c = DBInterval(150, 160, 100, 200, 10000, 20000)
 
         # Make a set of DBIntervals
         iseta = IntervalSet([a, b])
@@ -93,6 +93,13 @@ class Test00Nilmdb(object): # named 00 so it runs first
         eq_(db.stream_get_metadata("/newton/prep"), meta1)
         eq_(db.stream_get_metadata("/newton/raw"), meta1)
 
+        # fill in some test coverage for start >= end
+        with assert_raises(nilmdb.server.NilmDBError):
+            db.stream_remove("/newton/prep", 0, 0)
+        with assert_raises(nilmdb.server.NilmDBError):
+            db.stream_remove("/newton/prep", 1, 0)
+        db.stream_remove("/newton/prep", 0, 1)
+
         db.close()
 
 class TestBlockingServer(object):