Compare commits

..

24 Commits

Author SHA1 Message Date
5dce851bef Merge branch 'client-insert-context' 2013-02-23 14:37:59 -05:00
1431e41d16 Allow inserting empty intervals in the database, and add tests for it.
Previously, we could get empty intervals anyway by having a non-empty
interval and removing a smaller interval around each piece of data.
Turns out that empty intervals are OK and needed in some situations,
so explicitly allow and test for it.
2013-02-21 14:07:35 -05:00
a49c655816 Strictly enforce (start < end) for all intervals.
Previously, we allowed start == end, but this doesn't make sense with
half-open intervals.
2013-02-21 14:06:40 -05:00
30e3ffc0e9 Fix check for interval ends to be None, so that zero doesn't confuse it 2013-02-21 12:42:33 -05:00
db7211c3a9 Have server verify that start <= end before creating intervals
Also rename _fill_in_limits to _check_user_times
2013-02-21 12:38:51 -05:00
c6d57cf5c3 Fix errors with calculating limits when start==end==None
This also has the effect of now handling negative timestamps
correctly.
2013-02-19 19:27:06 -05:00
ca5253ddee Fix and test stream_count 2013-02-19 18:26:44 -05:00
e19da84b2e server: always return None instead of sometimes returning "ok"
Previously some functions returned the string "ok".
2013-02-19 18:26:44 -05:00
3e8e3542fd Test for detecting nested HTTP requests 2013-02-19 18:26:44 -05:00
2f7365412d client: detect and give a more clear error when HTTP requests are nested 2013-02-19 17:20:07 -05:00
bba9ad131e Add test for client.stream_insert_context 2013-02-19 17:19:45 -05:00
ee24380d1f Replace duplicated URL in tests with a variable 2013-02-19 15:27:51 -05:00
bfcd91acf8 client tests: renumber 2013-02-19 15:25:34 -05:00
d97291d4d3 client: Use .stream_insert_block from within .stream_insert_context
Avoids duplicating code.
2013-02-19 15:25:01 -05:00
a61fbbcf45 Big rework of client stream_insert_context
Now supports these operations:
  ctx.insert_line()
  ctx.insert_iter()
  ctx.finalize() (end the current contiguous interval, so a new one
                  can be started with a gap)
  ctx.update_end() (update ending timestamp before finalizing interval)
  ctx.update_start() (update starting timestamp for new interval)
2013-02-18 18:06:03 -05:00
5adc8fd0a7 Remove nilmdb.utils.misc.pairwise, as it's no longer used. 2013-02-18 18:06:03 -05:00
251a486c28 client.py: Significant speedup in stream_insert_context
block_data += "string" is fast with local variables, but slow with
variables inside some namespace.  Instead, build a list of strings and
join them once at the end.  This fixes the slowdown that resulted from
the stream_insert_context cleanup.
2013-02-18 18:06:03 -05:00
1edb96a0bd Add client.stream_insert_context, convert everything to use it. Slow.
Not sure why this is so painfully slow.  Need more testing;
might have to scratch the idea.
2013-02-18 18:06:03 -05:00
52e674a192 Fix warning in mustclose decorator 2013-02-18 18:05:45 -05:00
e241c13bf1 Remove must_close decorator from client
It still should be closed, but warning each time was mostly for
debugging and it's kind of annoying when writing one-off programs
where it's OK to just let things get torn down as they're completed.
Not closing is not fatal in terms of data integrity etc.
2013-02-18 18:02:05 -05:00
b53ff31212 client: Add must_close() decorator to nilmdb.Client, and fix tests
Test suite wasn't closing connections correctly.
2013-02-16 18:55:23 -05:00
2045e89f24 client: Add context manager functionality, test closing 2013-02-16 18:55:20 -05:00
841b2dab5c server: Replace /dbpath and /dbsize with a more generic /dbinfo
Update tests accordingly.  This isn't backwards compatible, but
existing clients don't rely on it.
2013-02-14 16:57:33 -05:00
d634f7d3cf bulkdata: Use file writes instead of writing to the mmap.
Extending and then writing to the mmap file has a problem: if the disk
fills up, the mapping becomes invalid, and the Python interpreter will
get a SIGBUS, killing it.  It's difficult to catch this gracefully;
there's no way to do that with existing modules.  Instead, switch to
only using mmap when reading, and normal file writes when writing.
Since we only ever append, it should have similar performance.
2013-02-13 20:30:39 -05:00
19 changed files with 687 additions and 251 deletions

View File

@@ -7,7 +7,7 @@ Prerequisites:
sudo apt-get install python2.7 python2.7-dev python-setuptools cython sudo apt-get install python2.7 python2.7-dev python-setuptools cython
# Base NilmDB dependencies # Base NilmDB dependencies
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz python-psutil
# Tools for running tests # Tools for running tests
sudo apt-get install python-nose python-coverage sudo apt-get install python-nose python-coverage

View File

@@ -8,22 +8,35 @@ import nilmdb.client.httpclient
import time import time
import simplejson as json import simplejson as json
import contextlib
def float_to_string(f): def float_to_string(f):
"""Use repr to maintain full precision in the string output.""" """Use repr to maintain full precision in the string output."""
return repr(float(f)) return repr(float(f))
def extract_timestamp(line):
"""Extract just the timestamp from a line of data text"""
return float(line.split()[0])
class Client(object): class Client(object):
"""Main client interface to the Nilm database.""" """Main client interface to the Nilm database."""
def __init__(self, url): def __init__(self, url):
self.http = nilmdb.client.httpclient.HTTPClient(url) self.http = nilmdb.client.httpclient.HTTPClient(url)
# __enter__/__exit__ allow this class to be a context manager
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _json_param(self, data): def _json_param(self, data):
"""Return compact json-encoded version of parameter""" """Return compact json-encoded version of parameter"""
return json.dumps(data, separators=(',',':')) return json.dumps(data, separators=(',',':'))
def close(self): def close(self):
"""Close the connection; safe to call multiple times"""
self.http.close() self.http.close()
def geturl(self): def geturl(self):
@@ -34,13 +47,10 @@ class Client(object):
"""Return server version""" """Return server version"""
return self.http.get("version") return self.http.get("version")
def dbpath(self): def dbinfo(self):
"""Return server database path""" """Return server database info (path, size, free space)
return self.http.get("dbpath") as a dictionary."""
return self.http.get("dbinfo")
def dbsize(self):
"""Return server database size as human readable string"""
return self.http.get("dbsize")
def stream_list(self, path = None, layout = None): def stream_list(self, path = None, layout = None):
params = {} params = {}
@@ -95,79 +105,45 @@ class Client(object):
params["end"] = float_to_string(end) params["end"] = float_to_string(end)
return self.http.get("stream/remove", params) return self.http.get("stream/remove", params)
def stream_insert(self, path, data, start = None, end = None): @contextlib.contextmanager
"""Insert data into a stream. data should be a file-like object def stream_insert_context(self, path, start = None, end = None):
that provides ASCII data that matches the database layout for path. """Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is be provided
as single lines, and is aggregated and sent to the server in larger
chunks as necessary. Data lines must match the database layout for
the given path, and end with a newline.
start and end are the starting and ending timestamp of this Example:
stream; all timestamps t in the data must satisfy 'start <= t with client.stream_insert_context('/path', start, end) as ctx:
< end'. If left unspecified, 'start' is the timestamp of the ctx.insert_line('1234567890.0 1 2 3 4\\n')
first line of data, and 'end' is the timestamp on the last line ctx.insert_line('1234567891.0 1 2 3 4\\n')
of data, plus a small delta of 1μs.
For more details, see help for nilmdb.client.client.StreamInserter
This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
""" """
params = { "path": path } ctx = StreamInserter(self, path, start, end)
yield ctx
ctx.finalize()
# See design.md for a discussion of how much data to send. def stream_insert(self, path, data, start = None, end = None):
# These are soft limits -- actual data might be rounded up. """Insert rows of data into a stream. data should be an
max_data = 1048576 iterable object that provides ASCII data that matches the
max_time = 30 database layout for path. See stream_insert_context for
end_epsilon = 1e-6 details on the 'start' and 'end' parameters."""
with self.stream_insert_context(path, start, end) as ctx:
ctx.insert_iter(data)
return ctx.last_response
def stream_insert_block(self, path, block, start, end):
def extract_timestamp(line): """Insert an entire block of data into a stream. Like
return float(line.split()[0]) stream_insert, except 'block' contains multiple lines of ASCII
text and is sent in one single chunk."""
def sendit(): params = { "path": path,
# If we have more data after this, use the timestamp of "start": float_to_string(start),
# the next line as the end. Otherwise, use the given "end": float_to_string(end) }
# overall end time, or add end_epsilon to the last data return self.http.put("stream/insert", block, params)
# point.
if nextline:
block_end = extract_timestamp(nextline)
if end and block_end > end:
# This is unexpected, but we'll defer to the server
# to return an error in this case.
block_end = end
elif end:
block_end = end
else:
block_end = extract_timestamp(line) + end_epsilon
# Send it
params["start"] = float_to_string(block_start)
params["end"] = float_to_string(block_end)
return self.http.put("stream/insert", block_data, params)
clock_start = time.time()
block_data = ""
block_start = start
result = None
line = None
nextline = None
for (line, nextline) in nilmdb.utils.misc.pairwise(data):
# If we don't have a starting time, extract it from the first line
if block_start is None:
block_start = extract_timestamp(line)
clock_elapsed = time.time() - clock_start
block_data += line
# If we have enough data, or enough time has elapsed,
# send this block to the server, and empty things out
# for the next block.
if (len(block_data) > max_data) or (clock_elapsed > max_time):
result = sendit()
block_start = None
block_data = ""
clock_start = time.time()
# One last block?
if len(block_data):
result = sendit()
# Return the most recent JSON result we got back, or None if
# we didn't make any requests.
return result
def stream_intervals(self, path, start = None, end = None): def stream_intervals(self, path, start = None, end = None):
""" """
@@ -188,8 +164,8 @@ class Client(object):
lines of ASCII-formatted data that matches the database lines of ASCII-formatted data that matches the database
layout for the given path. layout for the given path.
Specify count=True to just get a count of values rather than Specify count = True to return a count of matching data points
the actual data. rather than the actual data. The output format is unchanged.
""" """
params = { params = {
"path": path, "path": path,
@@ -202,3 +178,202 @@ class Client(object):
params["count"] = 1 params["count"] = 1
return self.http.get_gen("stream/extract", params, retjson = False) return self.http.get_gen("stream/extract", params, retjson = False)
def stream_count(self, path, start = None, end = None):
"""
Return the number of rows of data in the stream that satisfy
the given timestamps.
"""
counts = list(self.stream_extract(path, start, end, count = True))
return int(counts[0])
class StreamInserter(object):
"""Object returned by stream_insert_context() that manages
the insertion of rows of data into a particular path.
The basic data flow is that we are filling a contiguous interval
on the server, with no gaps, that extends from timestamp 'start'
to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
Data is provided by the user one line at a time with
.insert_line() or .insert_iter().
1. The first inserted line begins a new interval that starts at
'start'. If 'start' is not given, it is deduced from the first
line's timestamp.
2. Subsequent lines go into the same contiguous interval. As lines
are inserted, this routine may make multiple insertion requests to
the server, but will structure the timestamps to leave no gaps.
3. The current contiguous interval can be completed by manually
calling .finalize(), which the context manager will also do
automatically. This will send any remaining data to the server,
using the 'end' timestamp to end the interval.
After a .finalize(), inserting new data goes back to step 1.
.update_start() can be called before step 1 to change the start
time for the interval. .update_end() can be called before step 3
to change the end time for the interval.
"""
# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
_max_data = 1048576
_max_time = 30
# Delta to add to the final timestamp, if "end" wasn't given
_end_epsilon = 1e-6
def __init__(self, client, path, start = None, end = None):
"""'http' is the httpclient object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
contiguous interval."""
self.last_response = None
self._client = client
self._path = path
# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end
# Data for the specific block we're building up to send
self._block_data = []
self._block_len = 0
self._block_start = None
# Time of last request
self._last_time = time.time()
# We keep a buffer of the two most recently inserted lines.
# Only the older one actually gets processed; the newer one
# is used to "look-ahead" to the next timestamp if we need
# to internally split an insertion into two requests.
self._line_old = None
self._line_new = None
def insert_iter(self, iter):
"""Insert all lines of ASCII formatted data from the given
iterable. Lines must be terminated with '\\n'."""
for line in iter:
self.insert_line(line)
def insert_line(self, line, allow_intermediate = True):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\\n'."""
if line and (len(line) < 1 or line[-1] != '\n'):
raise ValueError("lines must end in with a newline character")
# Store this new line, but process the previous (old) one.
# This lets us "look ahead" to the next line.
self._line_old = self._line_new
self._line_new = line
if self._line_old is None:
return
# If starting a new block, pull out the timestamp if needed.
if self._block_start is None:
if self._interval_start is not None:
# User provided a start timestamp. Use it once, then
# clear it for the next block.
self._block_start = self._interval_start
self._interval_start = None
else:
# Extract timestamp from the first row
self._block_start = extract_timestamp(self._line_old)
# Save the line
self._block_data.append(self._line_old)
self._block_len += len(self._line_old)
if allow_intermediate:
# Send an intermediate block to the server if needed.
elapsed = time.time() - self._last_time
if (self._block_len > self._max_data) or (elapsed > self._max_time):
self._send_block_intermediate()
def update_start(self, start):
"""Update the start time for the next contiguous interval.
Call this before starting to insert data for a new interval,
for example, after .finalize()"""
self._interval_start = start
def update_end(self, end):
"""Update the end time for the current contiguous interval.
Call this before .finalize()"""
self._interval_end = end
def finalize(self):
"""Stop filling the current contiguous interval.
All outstanding data will be sent, and the interval end
time of the interval will be taken from the 'end' argument
used when initializing this class, or the most recent
value passed to update_end(), or the last timestamp plus
a small epsilon value if no other endpoint was provided.
If more data is inserted after a finalize(), it will become
part of a new interval and there may be a gap left in-between."""
# Special marker tells insert_line that this is the end
self.insert_line(None, allow_intermediate = False)
if self._block_len > 0:
# We have data pending, so send the final block
self._send_block_final()
elif None not in (self._interval_start, self._interval_end):
# We have no data, but enough information to create an
# empty interval.
self._block_start = self._interval_start
self._interval_start = None
self._send_block_final()
else:
# No data, and no timestamps to use to create an empty
# interval.
pass
# Make sure both timestamps are emptied for future intervals.
self._interval_start = None
self._interval_end = None
def _send_block_intermediate(self):
"""Send data, when we still have more data to send.
Use the timestamp from the next line, so that the blocks
are contiguous."""
block_end = extract_timestamp(self._line_new)
if self._interval_end is not None and block_end > self._interval_end:
# Something's fishy -- the timestamp we found is after
# the user's specified end. Limit it here, and the
# server will return an error.
block_end = self._interval_end
self._send_block(block_end)
def _send_block_final(self):
"""Send data, when this is the last block for the interval.
There is no next line, so figure out the actual interval end
using interval_end or end_epsilon."""
if self._interval_end is not None:
# Use the user's specified end timestamp
block_end = self._interval_end
# Clear it in case we send more intervals in the future.
self._interval_end = None
else:
# Add an epsilon to the last timestamp we saw
block_end = extract_timestamp(self._line_old) + self._end_epsilon
self._send_block(block_end)
def _send_block(self, block_end):
"""Send current block to the server"""
self.last_response = self._client.stream_insert_block(
self._path, "".join(self._block_data),
self._block_start, block_end)
# Clear out the block
self._block_data = []
self._block_len = 0
self._block_start = None
# Note when we sent it
self._last_time = time.time()

View File

@@ -33,6 +33,18 @@ class HTTPClient(object):
self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.URL, url)
self.url = url self.url = url
def _check_busy_and_set_upload(self, upload):
"""Sets the pycurl.UPLOAD option, but also raises a more
friendly exception if the client is already serving a request."""
try:
self.curl.setopt(pycurl.UPLOAD, upload)
except pycurl.error as e:
if "is currently running" in str(e):
raise Exception("Client is already performing a request, and "
"nesting calls is not supported.")
else: # pragma: no cover (shouldn't happen)
raise
def _check_error(self, body = None): def _check_error(self, body = None):
code = self.curl.getinfo(pycurl.RESPONSE_CODE) code = self.curl.getinfo(pycurl.RESPONSE_CODE)
if code == 200: if code == 200:
@@ -80,11 +92,11 @@ class HTTPClient(object):
self._status = int(data.split(" ")[1]) self._status = int(data.split(" ")[1])
self._headers += data self._headers += data
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback) self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
def func(callback): def perform(callback):
self.curl.setopt(pycurl.WRITEFUNCTION, callback) self.curl.setopt(pycurl.WRITEFUNCTION, callback)
self.curl.perform() self.curl.perform()
try: try:
with nilmdb.utils.Iteratorizer(func, curl_hack = True) as it: with nilmdb.utils.Iteratorizer(perform, curl_hack = True) as it:
for i in it: for i in it:
if self._status == 200: if self._status == 200:
# If we had a 200 response, yield the data to caller. # If we had a 200 response, yield the data to caller.
@@ -156,12 +168,12 @@ class HTTPClient(object):
def get(self, url, params = None, retjson = True): def get(self, url, params = None, retjson = True):
"""Simple GET""" """Simple GET"""
self.curl.setopt(pycurl.UPLOAD, 0) self._check_busy_and_set_upload(0)
return self._doreq(url, params, retjson) return self._doreq(url, params, retjson)
def put(self, url, postdata, params = None, retjson = True): def put(self, url, postdata, params = None, retjson = True):
"""Simple PUT""" """Simple PUT"""
self.curl.setopt(pycurl.UPLOAD, 1) self._check_busy_and_set_upload(1)
self._setup_url(url, params) self._setup_url(url, params)
data = cStringIO.StringIO(postdata) data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.READFUNCTION, data.read) self.curl.setopt(pycurl.READFUNCTION, data.read)
@@ -184,12 +196,12 @@ class HTTPClient(object):
def get_gen(self, url, params = None, retjson = True): def get_gen(self, url, params = None, retjson = True):
"""Simple GET, returning a generator""" """Simple GET, returning a generator"""
self.curl.setopt(pycurl.UPLOAD, 0) self._check_busy_and_set_upload(0)
return self._doreq_gen(url, params, retjson) return self._doreq_gen(url, params, retjson)
def put_gen(self, url, postdata, params = None, retjson = True): def put_gen(self, url, postdata, params = None, retjson = True):
"""Simple PUT, returning a generator""" """Simple PUT, returning a generator"""
self.curl.setopt(pycurl.UPLOAD, 1) self._check_busy_and_set_upload(1)
self._setup_url(url, params) self._setup_url(url, params)
data = cStringIO.StringIO(postdata) data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.READFUNCTION, data.read) self.curl.setopt(pycurl.READFUNCTION, data.read)

View File

@@ -1,5 +1,6 @@
import nilmdb import nilmdb
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
from nilmdb.utils import human_size
from argparse import ArgumentDefaultsHelpFormatter as def_form from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -17,5 +18,7 @@ def cmd_info(self):
printf("Client version: %s\n", nilmdb.__version__) printf("Client version: %s\n", nilmdb.__version__)
printf("Server version: %s\n", self.client.version()) printf("Server version: %s\n", self.client.version())
printf("Server URL: %s\n", self.client.geturl()) printf("Server URL: %s\n", self.client.geturl())
printf("Server database path: %s\n", self.client.dbpath()) dbinfo = self.client.dbinfo()
printf("Server database size: %s\n", self.client.dbsize()) printf("Server database path: %s\n", dbinfo["path"])
printf("Server database size: %s\n", human_size(dbinfo["size"]))
printf("Server database free space: %s\n", human_size(dbinfo["free"]))

View File

@@ -47,8 +47,8 @@ def cmd_list_verify(self):
self.args.path = self.args.path_positional self.args.path = self.args.path_positional
if self.args.start is not None and self.args.end is not None: if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end: if self.args.start >= self.args.end:
self.parser.error("start is after end") self.parser.error("start must precede end")
def cmd_list(self): def cmd_list(self):
"""List available streams""" """List available streams"""

View File

@@ -8,8 +8,7 @@ def setup(self, sub):
Remove all data from a specified time range within a Remove all data from a specified time range within a
stream. stream.
""") """)
cmd.set_defaults(verify = cmd_remove_verify, cmd.set_defaults(handler = cmd_remove)
handler = cmd_remove)
group = cmd.add_argument_group("Data selection") group = cmd.add_argument_group("Data selection")
group.add_argument("path", group.add_argument("path",
@@ -25,11 +24,6 @@ def setup(self, sub):
group.add_argument("-c", "--count", action="store_true", group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed") help="Output number of data points removed")
def cmd_remove_verify(self):
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
def cmd_remove(self): def cmd_remove(self):
try: try:
count = self.client.stream_remove(self.args.path, count = self.client.stream_remove(self.args.path,

View File

@@ -13,6 +13,16 @@ import struct
import mmap import mmap
import re import re
# If we have the faulthandler module, use it. All of the mmap stuff
# might trigger a SIGSEGV or SIGBUS if we're not careful, and
# faulthandler will give a traceback in that case. (the Python
# interpreter will still die either way).
try: # pragma: no cover
import faulthandler
faulthandler.enable()
except: # pragma: no cover
pass
# Up to 256 open file descriptors at any given time. # Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments. # These variables are global so they can be used in the decorator arguments.
table_cache_size = 16 table_cache_size = 16
@@ -161,6 +171,52 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements) ospath = os.path.join(self.root, *elements)
return Table(ospath) return Table(ospath)
@nilmdb.utils.must_close(wrap_verify = True)
class File(object):
"""Object representing a single file on disk. Data can be appended,
or the self.mmap handle can be used for random reads."""
def __init__(self, root, subdir, filename):
# Create path if it doesn't exist
try:
os.mkdir(os.path.join(root, subdir))
except OSError:
pass
# Open/create file
self._f = open(os.path.join(root, subdir, filename), "a+b", 0)
# Seek to end, and get size
self._f.seek(0, 2)
self.size = self._f.tell()
# Open mmap object
self.mmap = None
self._mmap_reopen()
def _mmap_reopen(self):
if self.size == 0:
# Don't mmap if the file is empty; it would fail
pass
elif self.mmap is None:
# Not opened yet, so open it
self.mmap = mmap.mmap(self._f.fileno(), 0)
else:
# Already opened, so just resize it
self.mmap.resize(self.size)
def close(self):
if self.mmap is not None:
self.mmap.close()
self._f.close()
def append(self, data):
# Write data, flush it, and resize our mmap accordingly
self._f.write(data)
self._f.flush()
self.size += len(data)
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = True) @nilmdb.utils.must_close(wrap_verify = True)
class Table(object): class Table(object):
"""Tools to help access a single table (data at a specific OS path).""" """Tools to help access a single table (data at a specific OS path)."""
@@ -211,7 +267,7 @@ class Table(object):
self.nrows = self._get_nrows() self.nrows = self._get_nrows()
def close(self): def close(self):
self.mmap_open.cache_remove_all() self.file_open.cache_remove_all()
# Internal helpers # Internal helpers
def _get_nrows(self): def _get_nrows(self):
@@ -275,37 +331,11 @@ class Table(object):
# Cache open files # Cache open files
@nilmdb.utils.lru_cache(size = fd_cache_size, @nilmdb.utils.lru_cache(size = fd_cache_size,
keys = slice(0, 3), # exclude newsize onremove = lambda f: f.close())
onremove = lambda x: x.close()) def file_open(self, subdir, filename):
def mmap_open(self, subdir, filename, newsize = None):
"""Open and map a given 'subdir/filename' (relative to self.root). """Open and map a given 'subdir/filename' (relative to self.root).
Will be automatically closed when evicted from the cache. Will be automatically closed when evicted from the cache."""
return File(self.root, subdir, filename)
If 'newsize' is provided, the file is truncated to the given
size before the mapping is returned. (Note that the LRU cache
on this function means the truncate will only happen if the
object isn't already cached; mmap.resize should be used too.)"""
try:
os.mkdir(os.path.join(self.root, subdir))
except OSError:
pass
f = open(os.path.join(self.root, subdir, filename), "a+", 0)
if newsize is not None:
# mmap can't map a zero-length file, so this allows the
# caller to set the filesize between file creation and
# mmap.
f.truncate(newsize)
mm = mmap.mmap(f.fileno(), 0)
return mm
def mmap_open_resize(self, subdir, filename, newsize):
"""Open and map a given 'subdir/filename' (relative to self.root).
The file is resized to the given size."""
# Pass new size to mmap_open
mm = self.mmap_open(subdir, filename, newsize)
# In case we got a cached copy, need to call mm.resize too.
mm.resize(newsize)
return mm
def append(self, data): def append(self, data):
"""Append the data and flush it to disk. """Append the data and flush it to disk.
@@ -317,14 +347,13 @@ class Table(object):
(subdir, fname, offset, count) = self._offset_from_row(self.nrows) (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
if count > remaining: if count > remaining:
count = remaining count = remaining
newsize = offset + count * self.packer.size
mm = self.mmap_open_resize(subdir, fname, newsize) f = self.file_open(subdir, fname)
mm.seek(offset)
# Write the data # Write the data
for i in xrange(count): for i in xrange(count):
row = dataiter.next() row = dataiter.next()
mm.write(self.packer.pack(*row)) f.append(self.packer.pack(*row))
remaining -= count remaining -= count
self.nrows += count self.nrows += count
@@ -351,7 +380,7 @@ class Table(object):
(subdir, filename, offset, count) = self._offset_from_row(row) (subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining: if count > remaining:
count = remaining count = remaining
mm = self.mmap_open(subdir, filename) mm = self.file_open(subdir, filename).mmap
for i in xrange(count): for i in xrange(count):
ret.append(list(self.packer.unpack_from(mm, offset))) ret.append(list(self.packer.unpack_from(mm, offset)))
offset += self.packer.size offset += self.packer.size
@@ -363,7 +392,7 @@ class Table(object):
if key < 0 or key >= self.nrows: if key < 0 or key >= self.nrows:
raise IndexError("Index out of range") raise IndexError("Index out of range")
(subdir, filename, offset, count) = self._offset_from_row(key) (subdir, filename, offset, count) = self._offset_from_row(key)
mm = self.mmap_open(subdir, filename) mm = self.file_open(subdir, filename).mmap
# unpack_from ignores the mmap object's current seek position # unpack_from ignores the mmap object's current seek position
return list(self.packer.unpack_from(mm, offset)) return list(self.packer.unpack_from(mm, offset))
@@ -410,8 +439,8 @@ class Table(object):
# are generally easier if we don't have to special-case that. # are generally easier if we don't have to special-case that.
if (len(merged) == 1 and if (len(merged) == 1 and
merged[0][0] == 0 and merged[0][1] == self.rows_per_file): merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
# Close potentially open file in mmap_open LRU cache # Close potentially open file in file_open LRU cache
self.mmap_open.cache_remove(self, subdir, filename) self.file_open.cache_remove(self, subdir, filename)
# Delete files # Delete files
os.remove(datafile) os.remove(datafile)

View File

@@ -36,7 +36,7 @@ cdef class Interval:
""" """
'start' and 'end' are arbitrary floats that represent time 'start' and 'end' are arbitrary floats that represent time
""" """
if start > end: if start >= end:
# Explicitly disallow zero-width intervals (since they're half-open) # Explicitly disallow zero-width intervals (since they're half-open)
raise IntervalError("start %s must precede end %s" % (start, end)) raise IntervalError("start %s must precede end %s" % (start, end))
self.start = float(start) self.start = float(start)

View File

@@ -142,6 +142,15 @@ class NilmDB(object):
with self.con: with self.con:
cur.execute("PRAGMA user_version = {v:d}".format(v=version)) cur.execute("PRAGMA user_version = {v:d}".format(v=version))
def _check_user_times(self, start, end):
if start is None:
start = -1e12
if end is None:
end = 1e12
if start >= end:
raise NilmDBError("start must precede end")
return (start, end)
@nilmdb.utils.lru_cache(size = 16) @nilmdb.utils.lru_cache(size = 16)
def _get_intervals(self, stream_id): def _get_intervals(self, stream_id):
""" """
@@ -303,7 +312,8 @@ class NilmDB(object):
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12) (start, end) = self._check_user_times(start, end)
requested = Interval(start, end)
result = [] result = []
for n, i in enumerate(intervals.intersection(requested)): for n, i in enumerate(intervals.intersection(requested)):
if n >= self.max_results: if n >= self.max_results:
@@ -396,7 +406,7 @@ class NilmDB(object):
path: Path at which to add the data path: Path at which to add the data
start: Starting timestamp start: Starting timestamp
end: Ending timestamp end: Ending timestamp
data: Rows of data, to be passed to PyTable's table.append data: Rows of data, to be passed to bulkdata table.append
method. E.g. nilmdb.layout.Parser.data method. E.g. nilmdb.layout.Parser.data
""" """
# First check for basic overlap using timestamp info given. # First check for basic overlap using timestamp info given.
@@ -417,7 +427,7 @@ class NilmDB(object):
self._add_interval(stream_id, interval, row_start, row_end) self._add_interval(stream_id, interval, row_start, row_end)
# And that's all # And that's all
return "ok" return
def _find_start(self, table, dbinterval): def _find_start(self, table, dbinterval):
""" """
@@ -475,7 +485,8 @@ class NilmDB(object):
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12) (start, end) = self._check_user_times(start, end)
requested = Interval(start, end)
result = [] result = []
matched = 0 matched = 0
remaining = self.max_results remaining = self.max_results
@@ -521,12 +532,10 @@ class NilmDB(object):
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
to_remove = Interval(start or 0, end or 1e12) (start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end)
removed = 0 removed = 0
if start == end:
return 0
# Can't remove intervals from within the iterator, so we need to # Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now. # remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True)) all_candidates = list(intervals.intersection(to_remove, orig = True))

View File

@@ -13,6 +13,7 @@ import os
import simplejson as json import simplejson as json
import decorator import decorator
import traceback import traceback
import psutil
try: try:
cherrypy.tools.json_out cherrypy.tools.json_out
@@ -99,17 +100,16 @@ class Root(NilmApp):
def version(self): def version(self):
return nilmdb.__version__ return nilmdb.__version__
# /dbpath # /dbinfo
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
def dbpath(self): def dbinfo(self):
return self.db.get_basepath() """Return a dictionary with the database path,
size of the database in bytes, and free disk space in bytes"""
# /dbsize path = self.db.get_basepath()
@cherrypy.expose return { "path": path,
@cherrypy.tools.json_out() "size": nilmdb.utils.du(path),
def dbsize(self): "free": psutil.disk_usage(path).free }
return nilmdb.utils.du(self.db.get_basepath())
class Stream(NilmApp): class Stream(NilmApp):
"""Stream-specific operations""" """Stream-specific operations"""
@@ -177,7 +177,6 @@ class Stream(NilmApp):
dictionary""" dictionary"""
data_dict = json.loads(data) data_dict = json.loads(data)
self.db.stream_set_metadata(path, data_dict) self.db.stream_set_metadata(path, data_dict)
return "ok"
# /stream/update_metadata?path=/newton/prep&data=<json> # /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose @cherrypy.expose
@@ -188,7 +187,6 @@ class Stream(NilmApp):
should be a json-encoded dictionary""" should be a json-encoded dictionary"""
data_dict = json.loads(data) data_dict = json.loads(data)
self.db.stream_update_metadata(path, data_dict) self.db.stream_update_metadata(path, data_dict)
return "ok"
# /stream/insert?path=/newton/prep # /stream/insert?path=/newton/prep
@cherrypy.expose @cherrypy.expose
@@ -223,19 +221,17 @@ class Stream(NilmApp):
"error parsing input data: " + "error parsing input data: " +
e.message) e.message)
if (not parser.min_timestamp or not parser.max_timestamp or
not len(parser.data)):
raise cherrypy.HTTPError("400 Bad Request",
"no data provided")
# Check limits # Check limits
start = float(start) start = float(start)
end = float(end) end = float(end)
if parser.min_timestamp < start: if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
if parser.min_timestamp is not None and parser.min_timestamp < start:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " + raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.min_timestamp) + repr(parser.min_timestamp) +
" < start time " + repr(start)) " < start time " + repr(start))
if parser.max_timestamp >= end: if parser.max_timestamp is not None and parser.max_timestamp >= end:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " + raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.max_timestamp) + repr(parser.max_timestamp) +
" >= end time " + repr(end)) " >= end time " + repr(end))
@@ -247,7 +243,7 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("400 Bad Request", e.message) raise cherrypy.HTTPError("400 Bad Request", e.message)
# Done # Done
return "ok" return
# /stream/remove?path=/newton/prep # /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -265,9 +261,9 @@ class Stream(NilmApp):
if end is not None: if end is not None:
end = float(end) end = float(end)
if start is not None and end is not None: if start is not None and end is not None:
if end < start: if start >= end:
raise cherrypy.HTTPError("400 Bad Request", raise cherrypy.HTTPError("400 Bad Request",
"end before start") "start must precede end")
return self.db.stream_remove(path, start, end) return self.db.stream_remove(path, start, end)
# /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep
@@ -292,9 +288,9 @@ class Stream(NilmApp):
end = float(end) end = float(end)
if start is not None and end is not None: if start is not None and end is not None:
if end < start: if start >= end:
raise cherrypy.HTTPError("400 Bad Request", raise cherrypy.HTTPError("400 Bad Request",
"end before start") "start must precede end")
streams = self.db.stream_list(path = path) streams = self.db.stream_list(path = path)
if len(streams) != 1: if len(streams) != 1:
@@ -332,9 +328,9 @@ class Stream(NilmApp):
# Check parameters # Check parameters
if start is not None and end is not None: if start is not None and end is not None:
if end < start: if start >= end:
raise cherrypy.HTTPError("400 Bad Request", raise cherrypy.HTTPError("400 Bad Request",
"end before start") "start must precede end")
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) streams = self.db.stream_list(path = path)

View File

@@ -4,8 +4,7 @@ from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import Serializer from nilmdb.utils.serializer import Serializer
from nilmdb.utils.lrucache import lru_cache from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close from nilmdb.utils.mustclose import must_close
from nilmdb.utils.urllib import urlencode from nilmdb.utils.urllib import urlencode
from nilmdb.utils import misc
from nilmdb.utils import atomic from nilmdb.utils import atomic

View File

@@ -1,7 +1,7 @@
import os import os
from math import log from math import log
def sizeof_fmt(num): def human_size(num):
"""Human friendly file size""" """Human friendly file size"""
unit_list = zip(['bytes', 'kiB', 'MiB', 'GiB', 'TiB'], [0, 0, 1, 2, 2]) unit_list = zip(['bytes', 'kiB', 'MiB', 'GiB', 'TiB'], [0, 0, 1, 2, 2])
if num > 1: if num > 1:
@@ -15,15 +15,11 @@ def sizeof_fmt(num):
if num == 1: # pragma: no cover if num == 1: # pragma: no cover
return '1 byte' return '1 byte'
def du_bytes(path): def du(path):
"""Like du -sb, returns total size of path in bytes.""" """Like du -sb, returns total size of path in bytes."""
size = os.path.getsize(path) size = os.path.getsize(path)
if os.path.isdir(path): if os.path.isdir(path):
for thisfile in os.listdir(path): for thisfile in os.listdir(path):
filepath = os.path.join(path, thisfile) filepath = os.path.join(path, thisfile)
size += du_bytes(filepath) size += du(filepath)
return size return size
def du(path):
"""Like du -sh, returns total size of path as a human-readable string."""
return sizeof_fmt(du_bytes(path))

View File

@@ -1,8 +0,0 @@
import itertools
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), ..., (sn,None)"
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip_longest(a, b)

View File

@@ -38,7 +38,8 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
@wrap_class_method @wrap_class_method
def close(orig, self, *args, **kwargs): def close(orig, self, *args, **kwargs):
del self._must_close if "_must_close" in self.__dict__:
del self._must_close
return orig(self, *args, **kwargs) return orig(self, *args, **kwargs)
# Optionally wrap all other functions # Optionally wrap all other functions

View File

@@ -114,6 +114,7 @@ setup(name='nilmdb',
'pycurl', 'pycurl',
'python-dateutil', 'python-dateutil',
'pytz', 'pytz',
'psutil >= 0.3.0',
], ],
packages = [ 'nilmdb', packages = [ 'nilmdb',
'nilmdb.utils', 'nilmdb.utils',

View File

@@ -18,10 +18,12 @@ import simplejson as json
import unittest import unittest
import warnings import warnings
import resource import resource
import time
from testutil.helpers import * from testutil.helpers import *
testdb = "tests/client-testdb" testdb = "tests/client-testdb"
testurl = "http://localhost:12380/"
def setup_module(): def setup_module():
global test_server, test_db global test_server, test_db
@@ -44,28 +46,32 @@ def teardown_module():
class TestClient(object): class TestClient(object):
def test_client_1_basic(self): def test_client_01_basic(self):
# Test a fake host # Test a fake host
client = nilmdb.Client(url = "http://localhost:1/") client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError): with assert_raises(nilmdb.client.ServerError):
client.version() client.version()
client.close()
# Trigger same error with a PUT request # Trigger same error with a PUT request
client = nilmdb.Client(url = "http://localhost:1/") client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError): with assert_raises(nilmdb.client.ServerError):
client.version() client.version()
client.close()
# Then a fake URL on a real host # Then a fake URL on a real host
client = nilmdb.Client(url = "http://localhost:12380/fake/") client = nilmdb.Client(url = "http://localhost:12380/fake/")
with assert_raises(nilmdb.client.ClientError): with assert_raises(nilmdb.client.ClientError):
client.version() client.version()
client.close()
# Now a real URL with no http:// prefix # Now a real URL with no http:// prefix
client = nilmdb.Client(url = "localhost:12380") client = nilmdb.Client(url = "localhost:12380")
version = client.version() version = client.version()
client.close()
# Now use the real URL # Now use the real URL
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
version = client.version() version = client.version()
eq_(distutils.version.LooseVersion(version), eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(test_server.version)) distutils.version.LooseVersion(test_server.version))
@@ -73,10 +79,11 @@ class TestClient(object):
# Bad URLs should give 404, not 500 # Bad URLs should give 404, not 500
with assert_raises(ClientError): with assert_raises(ClientError):
client.http.get("/stream/create") client.http.get("/stream/create")
client.close()
def test_client_2_createlist(self): def test_client_02_createlist(self):
# Basic stream tests, like those in test_nilmdb:test_stream # Basic stream tests, like those in test_nilmdb:test_stream
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
# Database starts empty # Database starts empty
eq_(client.stream_list(), []) eq_(client.stream_list(), [])
@@ -101,8 +108,10 @@ class TestClient(object):
["/newton/zzz/rawnotch", "RawNotchedData"] ["/newton/zzz/rawnotch", "RawNotchedData"]
]) ])
# Match just one type or one path # Match just one type or one path
eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ]) eq_(client.stream_list(layout="RawData"),
eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ]) [ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(path="/newton/raw"),
[ ["/newton/raw", "RawData"] ])
# Try messing with resource limits to trigger errors and get # Try messing with resource limits to trigger errors and get
# more coverage. Here, make it so we can only create files 1 # more coverage. Here, make it so we can only create files 1
@@ -114,9 +123,10 @@ class TestClient(object):
client.stream_create("/newton/hello", "RawData") client.stream_create("/newton/hello", "RawData")
resource.setrlimit(resource.RLIMIT_FSIZE, limit) resource.setrlimit(resource.RLIMIT_FSIZE, limit)
client.close()
def test_client_3_metadata(self): def test_client_03_metadata(self):
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
# Set / get metadata # Set / get metadata
eq_(client.stream_get_metadata("/newton/prep"), {}) eq_(client.stream_get_metadata("/newton/prep"), {})
@@ -131,9 +141,10 @@ class TestClient(object):
client.stream_update_metadata("/newton/raw", meta3) client.stream_update_metadata("/newton/raw", meta3)
eq_(client.stream_get_metadata("/newton/prep"), meta1) eq_(client.stream_get_metadata("/newton/prep"), meta1)
eq_(client.stream_get_metadata("/newton/raw"), meta1) eq_(client.stream_get_metadata("/newton/raw"), meta1)
eq_(client.stream_get_metadata("/newton/raw", [ "description" ] ), meta2) eq_(client.stream_get_metadata("/newton/raw",
eq_(client.stream_get_metadata("/newton/raw", [ "description", [ "description" ] ), meta2)
"v_scale" ] ), meta1) eq_(client.stream_get_metadata("/newton/raw",
[ "description", "v_scale" ] ), meta1)
# missing key # missing key
eq_(client.stream_get_metadata("/newton/raw", "descr"), eq_(client.stream_get_metadata("/newton/raw", "descr"),
@@ -146,9 +157,10 @@ class TestClient(object):
client.stream_set_metadata("/newton/prep", [1,2,3]) client.stream_set_metadata("/newton/prep", [1,2,3])
with assert_raises(ClientError): with assert_raises(ClientError):
client.stream_update_metadata("/newton/prep", [1,2,3]) client.stream_update_metadata("/newton/prep", [1,2,3])
client.close()
def test_client_4_insert(self): def test_client_04_insert(self):
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
datetime_tz.localtz_set("America/New_York") datetime_tz.localtz_set("America/New_York")
@@ -177,12 +189,33 @@ class TestClient(object):
result = client.stream_insert("/newton/prep", data) result = client.stream_insert("/newton/prep", data)
eq_(result, None) eq_(result, None)
# Try forcing a server request with empty data # It's OK to insert an empty interval
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 1, "end": 2 })
eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Timestamps can be negative too
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": -2, "end": -1 })
eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Intervals that end at zero shouldn't be any different
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": -1, "end": 0 })
eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Try forcing a server request with equal start and end
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
client.http.put("stream/insert", "", { "path": "/newton/prep", client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 0, "end": 0 }) "start": 0, "end": 0 })
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception)) in_("start must precede end", str(e.exception))
# Specify start/end (starts too late) # Specify start/end (starts too late)
data = timestamper.TimestamperRate(testfile, start, 120) data = timestamper.TimestamperRate(testfile, start, 120)
@@ -208,7 +241,6 @@ class TestClient(object):
data = timestamper.TimestamperRate(testfile, start, 120) data = timestamper.TimestamperRate(testfile, start, 120)
result = client.stream_insert("/newton/prep", data, result = client.stream_insert("/newton/prep", data,
start, start + 119.999777) start, start + 119.999777)
eq_(result, "ok")
# Verify the intervals. Should be just one, even if the data # Verify the intervals. Should be just one, even if the data
# was inserted in chunks, due to nilmdb interval concatenation. # was inserted in chunks, due to nilmdb interval concatenation.
@@ -222,20 +254,33 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("verlap", str(e.exception)) in_("verlap", str(e.exception))
def test_client_5_extractremove(self): client.close()
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = "http://localhost:12380/")
for x in client.stream_extract("/newton/prep", 123, 123): def test_client_05_extractremove(self):
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = testurl)
for x in client.stream_extract("/newton/prep", 999123, 999124):
raise AssertionError("shouldn't be any data for this request") raise AssertionError("shouldn't be any data for this request")
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120) client.stream_remove("/newton/prep", 123, 120)
def test_client_6_generators(self): # Test the exception we get if we nest requests
with assert_raises(Exception) as e:
for data in client.stream_extract("/newton/prep"):
x = client.stream_intervals("/newton/prep")
in_("nesting calls is not supported", str(e.exception))
# Test count
eq_(client.stream_count("/newton/prep"), 14400)
client.close()
def test_client_06_generators(self):
# A lot of the client functionality is already tested by test_cmdline, # A lot of the client functionality is already tested by test_cmdline,
# but this gets a bit more coverage that cmdline misses. # but this gets a bit more coverage that cmdline misses.
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
# Trigger a client error in generator # Trigger a client error in generator
start = datetime_tz.datetime_tz.smartparse("20120323T2000") start = datetime_tz.datetime_tz.smartparse("20120323T2000")
@@ -246,7 +291,7 @@ class TestClient(object):
start.totimestamp(), start.totimestamp(),
end.totimestamp()).next() end.totimestamp()).next()
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("end before start", str(e.exception)) in_("start must precede end", str(e.exception))
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
@@ -272,7 +317,7 @@ class TestClient(object):
{ "path": "/newton/prep", { "path": "/newton/prep",
"start": 0, "end": 0 }).next() "start": 0, "end": 0 }).next()
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception)) in_("start must precede end", str(e.exception))
# Check 404 for missing streams # Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]: for function in [ client.stream_intervals, client.stream_extract ]:
@@ -281,13 +326,15 @@ class TestClient(object):
in_("404 Not Found", str(e.exception)) in_("404 Not Found", str(e.exception))
in_("No such stream", str(e.exception)) in_("No such stream", str(e.exception))
def test_client_7_headers(self): client.close()
def test_client_07_headers(self):
# Make sure that /stream/intervals and /stream/extract # Make sure that /stream/intervals and /stream/extract
# properly return streaming, chunked, text/plain response. # properly return streaming, chunked, text/plain response.
# Pokes around in client.http internals a bit to look at the # Pokes around in client.http internals a bit to look at the
# response headers. # response headers.
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
http = client.http http = client.http
# Use a warning rather than returning a test failure, so that we can # Use a warning rather than returning a test failure, so that we can
@@ -307,7 +354,7 @@ class TestClient(object):
x = http.get("stream/extract", x = http.get("stream/extract",
{ "path": "/newton/prep", { "path": "/newton/prep",
"start": "123", "start": "123",
"end": "123" }, retjson=False) "end": "124" }, retjson=False)
if "Transfer-Encoding: chunked" not in http._headers: if "Transfer-Encoding: chunked" not in http._headers:
warnings.warn("Non-chunked HTTP response for /stream/extract") warnings.warn("Non-chunked HTTP response for /stream/extract")
if "Content-Type: text/plain;charset=utf-8" not in http._headers: if "Content-Type: text/plain;charset=utf-8" not in http._headers:
@@ -320,9 +367,11 @@ class TestClient(object):
"header in /stream/extract response:\n" + "header in /stream/extract response:\n" +
http._headers) http._headers)
def test_client_8_unicode(self): client.close()
def test_client_08_unicode(self):
# Basic Unicode tests # Basic Unicode tests
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = testurl)
# Delete streams that exist # Delete streams that exist
for stream in client.stream_list(): for stream in client.stream_list():
@@ -356,3 +405,174 @@ class TestClient(object):
eq_(client.stream_get_metadata(raw[0]), meta1) eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2) eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1) eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
client.close()
def test_client_09_closing(self):
# Make sure we actually close sockets correctly. New
# connections will block for a while if they're not, since the
# server will stop accepting new connections.
for test in [1, 2]:
start = time.time()
for i in range(50):
if time.time() - start > 15:
raise AssertionError("Connections seem to be blocking... "
"probably not closing properly.")
if test == 1:
# explicit close
client = nilmdb.Client(url = testurl)
with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)
client.close() # remove this to see the failure
elif test == 2:
# use the context manager
with nilmdb.Client(url = testurl) as c:
with assert_raises(ClientError) as e:
c.stream_remove("/newton/prep", 123, 120)
def test_client_10_context(self):
# Test using the client's stream insertion context manager to
# insert data.
client = nilmdb.Client(testurl)
client.stream_create("/context/test", "uint16_1")
with client.stream_insert_context("/context/test") as ctx:
# override _max_data to trigger frequent server updates
ctx._max_data = 15
with assert_raises(ValueError):
ctx.insert_line("100 1")
ctx.insert_line("100 1\n")
ctx.insert_iter([ "101 1\n",
"102 1\n",
"103 1\n" ])
ctx.insert_line("104 1\n")
ctx.insert_line("105 1\n")
ctx.finalize()
ctx.insert_line("106 1\n")
ctx.update_end(106.5)
ctx.finalize()
ctx.update_start(106.8)
ctx.insert_line("107 1\n")
ctx.insert_line("108 1\n")
ctx.insert_line("109 1\n")
ctx.insert_line("110 1\n")
ctx.insert_line("111 1\n")
ctx.update_end(113)
ctx.insert_line("112 1\n")
ctx.update_end(114)
ctx.insert_line("113 1\n")
ctx.update_end(115)
ctx.insert_line("114 1\n")
ctx.finalize()
with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 100, 200) as ctx:
ctx.insert_line("115 1\n")
with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 200, 300) as ctx:
ctx.insert_line("115 1\n")
with client.stream_insert_context("/context/test", 200, 300) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_data, 15)
ctx.insert_line("225 1\n")
ctx.finalize()
eq_(list(client.stream_intervals("/context/test")),
[ [ 100, 105.000001 ],
[ 106, 106.5 ],
[ 106.8, 115 ],
[ 200, 300 ] ])
client.stream_destroy("/context/test")
client.close()
def test_client_11_emptyintervals(self):
# Empty intervals are ok! If recording detection events
# by inserting rows into the database, we want to be able to
# have an interval where no events occurred. Test them here.
client = nilmdb.Client(testurl)
client.stream_create("/empty/test", "uint16_1")
def info():
result = []
for interval in list(client.stream_intervals("/empty/test")):
result.append((client.stream_count("/empty/test", *interval),
interval))
return result
eq_(info(), [])
# Insert a region with just a few points
with client.stream_insert_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert_line("140 1\n")
ctx.insert_line("150 1\n")
ctx.insert_line("160 1\n")
ctx.update_end(200)
ctx.finalize()
eq_(info(), [(3, [100, 200])])
# Delete chunk, which will leave one data point and two intervals
client.stream_remove("/empty/test", 145, 175)
eq_(info(), [(1, [100, 145]),
(0, [175, 200])])
# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert_block("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_context("/empty/test", 500, 550):
pass
# If enough timestamps aren't provided, empty streams won't be created.
client.stream_insert("/empty/test", [])
with client.stream_insert_context("/empty/test"):
pass
client.stream_insert("/empty/test", [], start = 600)
with client.stream_insert_context("/empty/test", start = 700):
pass
client.stream_insert("/empty/test", [], end = 850)
with client.stream_insert_context("/empty/test", end = 950):
pass
# Try various things that might cause problems
with client.stream_insert_context("/empty/test", 1000, 1050):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert_line("1100 1\n")
ctx.finalize() # inserts [1100, 1100.000001]
ctx.update_start(1199)
ctx.insert_line("1200 1\n")
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)
ctx.finalize() # nothing
ctx.update_end(1350)
ctx.finalize() # nothing
ctx.update_start(1400)
ctx.update_end(1450)
ctx.finalize()
# implicit last finalize inserts [1400, 1450]
# Check everything
eq_(info(), [(1, [100, 145]),
(0, [175, 200]),
(0, [300, 350]),
(0, [400, 450]),
(0, [500, 550]),
(0, [1000, 1050]),
(1, [1100, 1100.000001]),
(1, [1199, 1250]),
(0, [1400, 1450]),
])
# Clean up
client.stream_destroy("/empty/test")
client.close()

View File

@@ -194,9 +194,11 @@ class TestCmdline(object):
def test_02_info(self): def test_02_info(self):
self.ok("info") self.ok("info")
self.contain("Server URL: http://localhost:12380/") self.contain("Server URL: http://localhost:12380/")
self.contain("Client version: " + nilmdb.__version__)
self.contain("Server version: " + test_server.version) self.contain("Server version: " + test_server.version)
self.contain("Server database path") self.contain("Server database path")
self.contain("Server database size") self.contain("Server database size")
self.contain("Server database free space")
def test_03_createlist(self): def test_03_createlist(self):
# Basic stream tests, like those in test_client. # Basic stream tests, like those in test_client.
@@ -272,7 +274,7 @@ class TestCmdline(object):
# reversed range # reversed range
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01") self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end") self.contain("start must precede end")
def test_04_metadata(self): def test_04_metadata(self):
# Set / get metadata # Set / get metadata
@@ -442,7 +444,7 @@ class TestCmdline(object):
self.contain("no intervals") self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'" self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.50'") + " --end='23 Mar 2012 10:05:15.51'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("10:05:15.500") self.contain("10:05:15.500")
@@ -471,29 +473,29 @@ class TestCmdline(object):
# empty ranges return error 2 # empty ranges return error 2
self.fail("extract -a /newton/prep " + self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30' " + "--start '23 Mar 2012 20:00:30' " +
"--end '23 Mar 2012 10:00:30'", "--end '23 Mar 2012 20:00:31'",
exitcode = 2, require_error = False) exitcode = 2, require_error = False)
self.contain("no data") self.contain("no data")
self.fail("extract -a /newton/prep " + self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " + "--start '23 Mar 2012 20:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'", "--end '23 Mar 2012 20:00:30.000002'",
exitcode = 2, require_error = False) exitcode = 2, require_error = False)
self.contain("no data") self.contain("no data")
self.fail("extract -a /newton/prep " + self.fail("extract -a /newton/prep " +
"--start '23 Mar 2022 10:00:30' " + "--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'", "--end '23 Mar 2022 10:00:31'",
exitcode = 2, require_error = False) exitcode = 2, require_error = False)
self.contain("no data") self.contain("no data")
# but are ok if we're just counting results # but are ok if we're just counting results
self.ok("extract --count /newton/prep " + self.ok("extract --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " + "--start '23 Mar 2012 20:00:30' " +
"--end '23 Mar 2012 10:00:30'") "--end '23 Mar 2012 20:00:31'")
self.match("0\n") self.match("0\n")
self.ok("extract -c /newton/prep " + self.ok("extract -c /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " + "--start '23 Mar 2012 20:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'") "--end '23 Mar 2012 20:00:30.000002'")
self.match("0\n") self.match("0\n")
# Check various dumps against stored copies of how they should appear # Check various dumps against stored copies of how they should appear
@@ -540,31 +542,31 @@ class TestCmdline(object):
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01") self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("No stream at path") self.contain("No stream at path")
# empty or backward ranges return errors
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01") self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end") self.contain("start must precede end")
# empty ranges return success, backwards ranges return error self.fail("remove /newton/prep " +
self.ok("remove /newton/prep " + "--start '23 Mar 2012 10:00:30' " +
"--start '23 Mar 2012 10:00:30' " + "--end '23 Mar 2012 10:00:30'")
"--end '23 Mar 2012 10:00:30'") self.contain("start must precede end")
self.match("") self.fail("remove /newton/prep " +
self.ok("remove /newton/prep " + "--start '23 Mar 2012 10:00:30.000001' " +
"--start '23 Mar 2012 10:00:30.000001' " + "--end '23 Mar 2012 10:00:30.000001'")
"--end '23 Mar 2012 10:00:30.000001'") self.contain("start must precede end")
self.match("") self.fail("remove /newton/prep " +
self.ok("remove /newton/prep " + "--start '23 Mar 2022 10:00:30' " +
"--start '23 Mar 2022 10:00:30' " + "--end '23 Mar 2022 10:00:30'")
"--end '23 Mar 2022 10:00:30'") self.contain("start must precede end")
self.match("")
# Verbose # Verbose
self.ok("remove -c /newton/prep " + self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:30' " + "--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2012 10:00:30'") "--end '23 Mar 2022 20:00:31'")
self.match("0\n") self.match("0\n")
self.ok("remove --count /newton/prep " + self.ok("remove --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " + "--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2012 10:00:30'") "--end '23 Mar 2022 20:00:31'")
self.match("0\n") self.match("0\n")
# Make sure we have the data we expect # Make sure we have the data we expect
@@ -765,7 +767,7 @@ class TestCmdline(object):
"tests/data/prep-20120323T1000") "tests/data/prep-20120323T1000")
# Should take up about 2.8 MB here (including directory entries) # Should take up about 2.8 MB here (including directory entries)
du_before = nilmdb.utils.diskusage.du_bytes(testdb) du_before = nilmdb.utils.diskusage.du(testdb)
# Make sure we have the data we expect # Make sure we have the data we expect
self.ok("list --detail") self.ok("list --detail")
@@ -815,7 +817,7 @@ class TestCmdline(object):
# We have 1/8 of the data that we had before, so the file size # We have 1/8 of the data that we had before, so the file size
# should have dropped below 1/4 of what it used to be # should have dropped below 1/4 of what it used to be
du_after = nilmdb.utils.diskusage.du_bytes(testdb) du_after = nilmdb.utils.diskusage.du(testdb)
lt_(du_after, (du_before / 4)) lt_(du_after, (du_before / 4))
# Remove anything that came from the 10:02 data file # Remove anything that came from the 10:02 data file

View File

@@ -55,7 +55,7 @@ class TestInterval:
for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ] for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]
# basic construction # basic construction
i = Interval(d1, d1) i = Interval(d1, d2)
i = Interval(d1, d3) i = Interval(d1, d3)
eq_(i.start, d1) eq_(i.start, d1)
eq_(i.end, d3) eq_(i.end, d3)
@@ -77,8 +77,8 @@ class TestInterval:
assert(Interval(d1, d3) > Interval(d1, d2)) assert(Interval(d1, d3) > Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d2, d3)) assert(Interval(d1, d2) < Interval(d2, d3))
assert(Interval(d1, d3) < Interval(d2, d3)) assert(Interval(d1, d3) < Interval(d2, d3))
assert(Interval(d2, d2) > Interval(d1, d3)) assert(Interval(d2, d2+0.01) > Interval(d1, d3))
assert(Interval(d3, d3) == Interval(d3, d3)) assert(Interval(d3, d3+0.01) == Interval(d3, d3+0.01))
#with assert_raises(TypeError): # was AttributeError, that's wrong #with assert_raises(TypeError): # was AttributeError, that's wrong
# x = (i == 123) # x = (i == 123)
@@ -293,7 +293,7 @@ class TestIntervalDB:
# actual start, end can be a subset # actual start, end can be a subset
a = DBInterval(150, 200, 100, 200, 10000, 20000) a = DBInterval(150, 200, 100, 200, 10000, 20000)
b = DBInterval(100, 150, 100, 200, 10000, 20000) b = DBInterval(100, 150, 100, 200, 10000, 20000)
c = DBInterval(150, 150, 100, 200, 10000, 20000) c = DBInterval(150, 160, 100, 200, 10000, 20000)
# Make a set of DBIntervals # Make a set of DBIntervals
iseta = IntervalSet([a, b]) iseta = IntervalSet([a, b])

View File

@@ -93,6 +93,13 @@ class Test00Nilmdb(object): # named 00 so it runs first
eq_(db.stream_get_metadata("/newton/prep"), meta1) eq_(db.stream_get_metadata("/newton/prep"), meta1)
eq_(db.stream_get_metadata("/newton/raw"), meta1) eq_(db.stream_get_metadata("/newton/raw"), meta1)
# fill in some test coverage for start >= end
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 0, 0)
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 1, 0)
db.stream_remove("/newton/prep", 0, 1)
db.close() db.close()
class TestBlockingServer(object): class TestBlockingServer(object):