Compare commits

...

24 Commits

Author SHA1 Message Date
5dce851bef Merge branch 'client-insert-context' 2013-02-23 14:37:59 -05:00
1431e41d16 Allow inserting empty intervals in the database, and add tests for it.
Previously, we could get empty intervals anyway by having a non-empty
interval and removing a smaller interval around each piece of data.
Turns out that empty intervals are OK and needed in some situations,
so explicitly allow and test for it.
2013-02-21 14:07:35 -05:00
a49c655816 Strictly enforce (start < end) for all intervals.
Previously, we allowed start == end, but this doesn't make sense with
half-open intervals.
2013-02-21 14:06:40 -05:00
30e3ffc0e9 Fix check for interval ends to be None, so that zero doesn't confuse it 2013-02-21 12:42:33 -05:00
db7211c3a9 Have server verify that start <= end before creating intervals
Also rename _fill_in_limits to _check_user_times
2013-02-21 12:38:51 -05:00
c6d57cf5c3 Fix errors with calculating limits when start==end==None
This also has the effect of now handling negative timestamps
correctly.
2013-02-19 19:27:06 -05:00
ca5253ddee Fix and test stream_count 2013-02-19 18:26:44 -05:00
e19da84b2e server: always return None instead of sometimes returning "ok"
Previously some functions returned the string "ok".
2013-02-19 18:26:44 -05:00
3e8e3542fd Test for detecting nested HTTP requests 2013-02-19 18:26:44 -05:00
2f7365412d client: detect and give a more clear error when HTTP requests are nested 2013-02-19 17:20:07 -05:00
bba9ad131e Add test for client.stream_insert_context 2013-02-19 17:19:45 -05:00
ee24380d1f Replace duplicated URL in tests with a variable 2013-02-19 15:27:51 -05:00
bfcd91acf8 client tests: renumber 2013-02-19 15:25:34 -05:00
d97291d4d3 client: Use .stream_insert_block from within .stream_insert_context
Avoids duplicating code.
2013-02-19 15:25:01 -05:00
a61fbbcf45 Big rework of client stream_insert_context
Now supports these operations:
  ctx.insert_line()
  ctx.insert_iter()
  ctx.finalize() (end the current contiguous interval, so a new one
                  can be started with a gap)
  ctx.update_end() (update ending timestamp before finalizing interval)
  ctx.update_start() (update starting timestamp for new interval)
2013-02-18 18:06:03 -05:00
5adc8fd0a7 Remove nilmdb.utils.misc.pairwise, as it's no longer used. 2013-02-18 18:06:03 -05:00
251a486c28 client.py: Significant speedup in stream_insert_context
block_data += "string" is fast with local variables, but slow with
variables inside some namespace.  Instead, build a list of strings and
join them once at the end.  This fixes the slowdown that resulted from
the stream_insert_context cleanup.
2013-02-18 18:06:03 -05:00
1edb96a0bd Add client.stream_insert_context, convert everything to use it. Slow.
Not sure why this is so painfully slow.  Need more testing;
might have to scratch the idea.
2013-02-18 18:06:03 -05:00
52e674a192 Fix warning in mustclose decorator 2013-02-18 18:05:45 -05:00
e241c13bf1 Remove must_close decorator from client
It still should be closed, but warning each time was mostly for
debugging and it's kind of annoying when writing one-off programs
where it's OK to just let things get torn down as they're completed.
Not closing is not fatal in terms of data integrity etc.
2013-02-18 18:02:05 -05:00
b53ff31212 client: Add must_close() decorator to nilmdb.Client, and fix tests
Test suite wasn't closing connections correctly.
2013-02-16 18:55:23 -05:00
2045e89f24 client: Add context manager functionality, test closing 2013-02-16 18:55:20 -05:00
841b2dab5c server: Replace /dbpath and /dbsize with a more generic /dbinfo
Update tests accordingly.  This isn't backwards compatible, but
existing clients don't rely on it.
2013-02-14 16:57:33 -05:00
d634f7d3cf bulkdata: Use file writes instead of writing to the mmap.
Extending and then writing to the mmap file has a problem: if the disk
fills up, the mapping becomes invalid, and the Python interpreter will
get a SIGBUS, killing it.  It's difficult to catch this gracefully;
there's no way to do that with existing modules.  Instead, switch to
only using mmap when reading, and normal file writes when writing.
Since we only ever append, it should have similar performance.
2013-02-13 20:30:39 -05:00
19 changed files with 687 additions and 251 deletions

View File

@@ -7,7 +7,7 @@ Prerequisites:
sudo apt-get install python2.7 python2.7-dev python-setuptools cython
# Base NilmDB dependencies
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz python-psutil
# Tools for running tests
sudo apt-get install python-nose python-coverage

View File

@@ -8,22 +8,35 @@ import nilmdb.client.httpclient
import time
import simplejson as json
import contextlib
def float_to_string(f):
"""Use repr to maintain full precision in the string output."""
return repr(float(f))
def extract_timestamp(line):
"""Extract just the timestamp from a line of data text"""
return float(line.split()[0])
class Client(object):
"""Main client interface to the Nilm database."""
def __init__(self, url):
self.http = nilmdb.client.httpclient.HTTPClient(url)
# __enter__/__exit__ allow this class to be a context manager
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _json_param(self, data):
"""Return compact json-encoded version of parameter"""
return json.dumps(data, separators=(',',':'))
def close(self):
"""Close the connection; safe to call multiple times"""
self.http.close()
def geturl(self):
@@ -34,13 +47,10 @@ class Client(object):
"""Return server version"""
return self.http.get("version")
def dbpath(self):
"""Return server database path"""
return self.http.get("dbpath")
def dbsize(self):
"""Return server database size as human readable string"""
return self.http.get("dbsize")
def dbinfo(self):
"""Return server database info (path, size, free space)
as a dictionary."""
return self.http.get("dbinfo")
def stream_list(self, path = None, layout = None):
params = {}
@@ -95,79 +105,45 @@ class Client(object):
params["end"] = float_to_string(end)
return self.http.get("stream/remove", params)
def stream_insert(self, path, data, start = None, end = None):
"""Insert data into a stream. data should be a file-like object
that provides ASCII data that matches the database layout for path.
@contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is be provided
as single lines, and is aggregated and sent to the server in larger
chunks as necessary. Data lines must match the database layout for
the given path, and end with a newline.
start and end are the starting and ending timestamp of this
stream; all timestamps t in the data must satisfy 'start <= t
< end'. If left unspecified, 'start' is the timestamp of the
first line of data, and 'end' is the timestamp on the last line
of data, plus a small delta of 1μs.
Example:
with client.stream_insert_context('/path', start, end) as ctx:
ctx.insert_line('1234567890.0 1 2 3 4\\n')
ctx.insert_line('1234567891.0 1 2 3 4\\n')
For more details, see help for nilmdb.client.client.StreamInserter
This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
params = { "path": path }
ctx = StreamInserter(self, path, start, end)
yield ctx
ctx.finalize()
# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
max_data = 1048576
max_time = 30
end_epsilon = 1e-6
def stream_insert(self, path, data, start = None, end = None):
"""Insert rows of data into a stream. data should be an
iterable object that provides ASCII data that matches the
database layout for path. See stream_insert_context for
details on the 'start' and 'end' parameters."""
with self.stream_insert_context(path, start, end) as ctx:
ctx.insert_iter(data)
return ctx.last_response
def extract_timestamp(line):
return float(line.split()[0])
def sendit():
# If we have more data after this, use the timestamp of
# the next line as the end. Otherwise, use the given
# overall end time, or add end_epsilon to the last data
# point.
if nextline:
block_end = extract_timestamp(nextline)
if end and block_end > end:
# This is unexpected, but we'll defer to the server
# to return an error in this case.
block_end = end
elif end:
block_end = end
else:
block_end = extract_timestamp(line) + end_epsilon
# Send it
params["start"] = float_to_string(block_start)
params["end"] = float_to_string(block_end)
return self.http.put("stream/insert", block_data, params)
clock_start = time.time()
block_data = ""
block_start = start
result = None
line = None
nextline = None
for (line, nextline) in nilmdb.utils.misc.pairwise(data):
# If we don't have a starting time, extract it from the first line
if block_start is None:
block_start = extract_timestamp(line)
clock_elapsed = time.time() - clock_start
block_data += line
# If we have enough data, or enough time has elapsed,
# send this block to the server, and empty things out
# for the next block.
if (len(block_data) > max_data) or (clock_elapsed > max_time):
result = sendit()
block_start = None
block_data = ""
clock_start = time.time()
# One last block?
if len(block_data):
result = sendit()
# Return the most recent JSON result we got back, or None if
# we didn't make any requests.
return result
def stream_insert_block(self, path, block, start, end):
"""Insert an entire block of data into a stream. Like
stream_insert, except 'block' contains multiple lines of ASCII
text and is sent in one single chunk."""
params = { "path": path,
"start": float_to_string(start),
"end": float_to_string(end) }
return self.http.put("stream/insert", block, params)
def stream_intervals(self, path, start = None, end = None):
"""
@@ -188,8 +164,8 @@ class Client(object):
lines of ASCII-formatted data that matches the database
layout for the given path.
Specify count=True to just get a count of values rather than
the actual data.
Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged.
"""
params = {
"path": path,
@@ -202,3 +178,202 @@ class Client(object):
params["count"] = 1
return self.http.get_gen("stream/extract", params, retjson = False)
def stream_count(self, path, start = None, end = None):
"""
Return the number of rows of data in the stream that satisfy
the given timestamps.
"""
counts = list(self.stream_extract(path, start, end, count = True))
return int(counts[0])
class StreamInserter(object):
"""Object returned by stream_insert_context() that manages
the insertion of rows of data into a particular path.
The basic data flow is that we are filling a contiguous interval
on the server, with no gaps, that extends from timestamp 'start'
to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
Data is provided by the user one line at a time with
.insert_line() or .insert_iter().
1. The first inserted line begins a new interval that starts at
'start'. If 'start' is not given, it is deduced from the first
line's timestamp.
2. Subsequent lines go into the same contiguous interval. As lines
are inserted, this routine may make multiple insertion requests to
the server, but will structure the timestamps to leave no gaps.
3. The current contiguous interval can be completed by manually
calling .finalize(), which the context manager will also do
automatically. This will send any remaining data to the server,
using the 'end' timestamp to end the interval.
After a .finalize(), inserting new data goes back to step 1.
.update_start() can be called before step 1 to change the start
time for the interval. .update_end() can be called before step 3
to change the end time for the interval.
"""
# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
_max_data = 1048576
_max_time = 30
# Delta to add to the final timestamp, if "end" wasn't given
_end_epsilon = 1e-6
def __init__(self, client, path, start = None, end = None):
"""'http' is the httpclient object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
contiguous interval."""
self.last_response = None
self._client = client
self._path = path
# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end
# Data for the specific block we're building up to send
self._block_data = []
self._block_len = 0
self._block_start = None
# Time of last request
self._last_time = time.time()
# We keep a buffer of the two most recently inserted lines.
# Only the older one actually gets processed; the newer one
# is used to "look-ahead" to the next timestamp if we need
# to internally split an insertion into two requests.
self._line_old = None
self._line_new = None
def insert_iter(self, iter):
"""Insert all lines of ASCII formatted data from the given
iterable. Lines must be terminated with '\\n'."""
for line in iter:
self.insert_line(line)
def insert_line(self, line, allow_intermediate = True):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\\n'."""
if line and (len(line) < 1 or line[-1] != '\n'):
raise ValueError("lines must end in with a newline character")
# Store this new line, but process the previous (old) one.
# This lets us "look ahead" to the next line.
self._line_old = self._line_new
self._line_new = line
if self._line_old is None:
return
# If starting a new block, pull out the timestamp if needed.
if self._block_start is None:
if self._interval_start is not None:
# User provided a start timestamp. Use it once, then
# clear it for the next block.
self._block_start = self._interval_start
self._interval_start = None
else:
# Extract timestamp from the first row
self._block_start = extract_timestamp(self._line_old)
# Save the line
self._block_data.append(self._line_old)
self._block_len += len(self._line_old)
if allow_intermediate:
# Send an intermediate block to the server if needed.
elapsed = time.time() - self._last_time
if (self._block_len > self._max_data) or (elapsed > self._max_time):
self._send_block_intermediate()
def update_start(self, start):
"""Update the start time for the next contiguous interval.
Call this before starting to insert data for a new interval,
for example, after .finalize()"""
self._interval_start = start
def update_end(self, end):
"""Update the end time for the current contiguous interval.
Call this before .finalize()"""
self._interval_end = end
def finalize(self):
"""Stop filling the current contiguous interval.
All outstanding data will be sent, and the interval end
time of the interval will be taken from the 'end' argument
used when initializing this class, or the most recent
value passed to update_end(), or the last timestamp plus
a small epsilon value if no other endpoint was provided.
If more data is inserted after a finalize(), it will become
part of a new interval and there may be a gap left in-between."""
# Special marker tells insert_line that this is the end
self.insert_line(None, allow_intermediate = False)
if self._block_len > 0:
# We have data pending, so send the final block
self._send_block_final()
elif None not in (self._interval_start, self._interval_end):
# We have no data, but enough information to create an
# empty interval.
self._block_start = self._interval_start
self._interval_start = None
self._send_block_final()
else:
# No data, and no timestamps to use to create an empty
# interval.
pass
# Make sure both timestamps are emptied for future intervals.
self._interval_start = None
self._interval_end = None
def _send_block_intermediate(self):
"""Send data, when we still have more data to send.
Use the timestamp from the next line, so that the blocks
are contiguous."""
block_end = extract_timestamp(self._line_new)
if self._interval_end is not None and block_end > self._interval_end:
# Something's fishy -- the timestamp we found is after
# the user's specified end. Limit it here, and the
# server will return an error.
block_end = self._interval_end
self._send_block(block_end)
def _send_block_final(self):
"""Send data, when this is the last block for the interval.
There is no next line, so figure out the actual interval end
using interval_end or end_epsilon."""
if self._interval_end is not None:
# Use the user's specified end timestamp
block_end = self._interval_end
# Clear it in case we send more intervals in the future.
self._interval_end = None
else:
# Add an epsilon to the last timestamp we saw
block_end = extract_timestamp(self._line_old) + self._end_epsilon
self._send_block(block_end)
def _send_block(self, block_end):
"""Send current block to the server"""
self.last_response = self._client.stream_insert_block(
self._path, "".join(self._block_data),
self._block_start, block_end)
# Clear out the block
self._block_data = []
self._block_len = 0
self._block_start = None
# Note when we sent it
self._last_time = time.time()

View File

@@ -33,6 +33,18 @@ class HTTPClient(object):
self.curl.setopt(pycurl.URL, url)
self.url = url
def _check_busy_and_set_upload(self, upload):
"""Sets the pycurl.UPLOAD option, but also raises a more
friendly exception if the client is already serving a request."""
try:
self.curl.setopt(pycurl.UPLOAD, upload)
except pycurl.error as e:
if "is currently running" in str(e):
raise Exception("Client is already performing a request, and "
"nesting calls is not supported.")
else: # pragma: no cover (shouldn't happen)
raise
def _check_error(self, body = None):
code = self.curl.getinfo(pycurl.RESPONSE_CODE)
if code == 200:
@@ -80,11 +92,11 @@ class HTTPClient(object):
self._status = int(data.split(" ")[1])
self._headers += data
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
def func(callback):
def perform(callback):
self.curl.setopt(pycurl.WRITEFUNCTION, callback)
self.curl.perform()
try:
with nilmdb.utils.Iteratorizer(func, curl_hack = True) as it:
with nilmdb.utils.Iteratorizer(perform, curl_hack = True) as it:
for i in it:
if self._status == 200:
# If we had a 200 response, yield the data to caller.
@@ -156,12 +168,12 @@ class HTTPClient(object):
def get(self, url, params = None, retjson = True):
"""Simple GET"""
self.curl.setopt(pycurl.UPLOAD, 0)
self._check_busy_and_set_upload(0)
return self._doreq(url, params, retjson)
def put(self, url, postdata, params = None, retjson = True):
"""Simple PUT"""
self.curl.setopt(pycurl.UPLOAD, 1)
self._check_busy_and_set_upload(1)
self._setup_url(url, params)
data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.READFUNCTION, data.read)
@@ -184,12 +196,12 @@ class HTTPClient(object):
def get_gen(self, url, params = None, retjson = True):
"""Simple GET, returning a generator"""
self.curl.setopt(pycurl.UPLOAD, 0)
self._check_busy_and_set_upload(0)
return self._doreq_gen(url, params, retjson)
def put_gen(self, url, postdata, params = None, retjson = True):
"""Simple PUT, returning a generator"""
self.curl.setopt(pycurl.UPLOAD, 1)
self._check_busy_and_set_upload(1)
self._setup_url(url, params)
data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.READFUNCTION, data.read)

View File

@@ -1,5 +1,6 @@
import nilmdb
from nilmdb.utils.printf import *
from nilmdb.utils import human_size
from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -17,5 +18,7 @@ def cmd_info(self):
printf("Client version: %s\n", nilmdb.__version__)
printf("Server version: %s\n", self.client.version())
printf("Server URL: %s\n", self.client.geturl())
printf("Server database path: %s\n", self.client.dbpath())
printf("Server database size: %s\n", self.client.dbsize())
dbinfo = self.client.dbinfo()
printf("Server database path: %s\n", dbinfo["path"])
printf("Server database size: %s\n", human_size(dbinfo["size"]))
printf("Server database free space: %s\n", human_size(dbinfo["free"]))

View File

@@ -47,8 +47,8 @@ def cmd_list_verify(self):
self.args.path = self.args.path_positional
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
if self.args.start >= self.args.end:
self.parser.error("start must precede end")
def cmd_list(self):
"""List available streams"""

View File

@@ -8,8 +8,7 @@ def setup(self, sub):
Remove all data from a specified time range within a
stream.
""")
cmd.set_defaults(verify = cmd_remove_verify,
handler = cmd_remove)
cmd.set_defaults(handler = cmd_remove)
group = cmd.add_argument_group("Data selection")
group.add_argument("path",
@@ -25,11 +24,6 @@ def setup(self, sub):
group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed")
def cmd_remove_verify(self):
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
def cmd_remove(self):
try:
count = self.client.stream_remove(self.args.path,

View File

@@ -13,6 +13,16 @@ import struct
import mmap
import re
# If we have the faulthandler module, use it. All of the mmap stuff
# might trigger a SIGSEGV or SIGBUS if we're not careful, and
# faulthandler will give a traceback in that case. (the Python
# interpreter will still die either way).
try: # pragma: no cover
import faulthandler
faulthandler.enable()
except: # pragma: no cover
pass
# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
@@ -161,6 +171,52 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements)
return Table(ospath)
@nilmdb.utils.must_close(wrap_verify = True)
class File(object):
"""Object representing a single file on disk. Data can be appended,
or the self.mmap handle can be used for random reads."""
def __init__(self, root, subdir, filename):
# Create path if it doesn't exist
try:
os.mkdir(os.path.join(root, subdir))
except OSError:
pass
# Open/create file
self._f = open(os.path.join(root, subdir, filename), "a+b", 0)
# Seek to end, and get size
self._f.seek(0, 2)
self.size = self._f.tell()
# Open mmap object
self.mmap = None
self._mmap_reopen()
def _mmap_reopen(self):
if self.size == 0:
# Don't mmap if the file is empty; it would fail
pass
elif self.mmap is None:
# Not opened yet, so open it
self.mmap = mmap.mmap(self._f.fileno(), 0)
else:
# Already opened, so just resize it
self.mmap.resize(self.size)
def close(self):
if self.mmap is not None:
self.mmap.close()
self._f.close()
def append(self, data):
# Write data, flush it, and resize our mmap accordingly
self._f.write(data)
self._f.flush()
self.size += len(data)
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = True)
class Table(object):
"""Tools to help access a single table (data at a specific OS path)."""
@@ -211,7 +267,7 @@ class Table(object):
self.nrows = self._get_nrows()
def close(self):
self.mmap_open.cache_remove_all()
self.file_open.cache_remove_all()
# Internal helpers
def _get_nrows(self):
@@ -275,37 +331,11 @@ class Table(object):
# Cache open files
@nilmdb.utils.lru_cache(size = fd_cache_size,
keys = slice(0, 3), # exclude newsize
onremove = lambda x: x.close())
def mmap_open(self, subdir, filename, newsize = None):
onremove = lambda f: f.close())
def file_open(self, subdir, filename):
"""Open and map a given 'subdir/filename' (relative to self.root).
Will be automatically closed when evicted from the cache.
If 'newsize' is provided, the file is truncated to the given
size before the mapping is returned. (Note that the LRU cache
on this function means the truncate will only happen if the
object isn't already cached; mmap.resize should be used too.)"""
try:
os.mkdir(os.path.join(self.root, subdir))
except OSError:
pass
f = open(os.path.join(self.root, subdir, filename), "a+", 0)
if newsize is not None:
# mmap can't map a zero-length file, so this allows the
# caller to set the filesize between file creation and
# mmap.
f.truncate(newsize)
mm = mmap.mmap(f.fileno(), 0)
return mm
def mmap_open_resize(self, subdir, filename, newsize):
"""Open and map a given 'subdir/filename' (relative to self.root).
The file is resized to the given size."""
# Pass new size to mmap_open
mm = self.mmap_open(subdir, filename, newsize)
# In case we got a cached copy, need to call mm.resize too.
mm.resize(newsize)
return mm
Will be automatically closed when evicted from the cache."""
return File(self.root, subdir, filename)
def append(self, data):
"""Append the data and flush it to disk.
@@ -317,14 +347,13 @@ class Table(object):
(subdir, fname, offset, count) = self._offset_from_row(self.nrows)
if count > remaining:
count = remaining
newsize = offset + count * self.packer.size
mm = self.mmap_open_resize(subdir, fname, newsize)
mm.seek(offset)
f = self.file_open(subdir, fname)
# Write the data
for i in xrange(count):
row = dataiter.next()
mm.write(self.packer.pack(*row))
f.append(self.packer.pack(*row))
remaining -= count
self.nrows += count
@@ -351,7 +380,7 @@ class Table(object):
(subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining:
count = remaining
mm = self.mmap_open(subdir, filename)
mm = self.file_open(subdir, filename).mmap
for i in xrange(count):
ret.append(list(self.packer.unpack_from(mm, offset)))
offset += self.packer.size
@@ -363,7 +392,7 @@ class Table(object):
if key < 0 or key >= self.nrows:
raise IndexError("Index out of range")
(subdir, filename, offset, count) = self._offset_from_row(key)
mm = self.mmap_open(subdir, filename)
mm = self.file_open(subdir, filename).mmap
# unpack_from ignores the mmap object's current seek position
return list(self.packer.unpack_from(mm, offset))
@@ -410,8 +439,8 @@ class Table(object):
# are generally easier if we don't have to special-case that.
if (len(merged) == 1 and
merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
# Close potentially open file in mmap_open LRU cache
self.mmap_open.cache_remove(self, subdir, filename)
# Close potentially open file in file_open LRU cache
self.file_open.cache_remove(self, subdir, filename)
# Delete files
os.remove(datafile)

View File

@@ -36,7 +36,7 @@ cdef class Interval:
"""
'start' and 'end' are arbitrary floats that represent time
"""
if start > end:
if start >= end:
# Explicitly disallow zero-width intervals (since they're half-open)
raise IntervalError("start %s must precede end %s" % (start, end))
self.start = float(start)

View File

@@ -142,6 +142,15 @@ class NilmDB(object):
with self.con:
cur.execute("PRAGMA user_version = {v:d}".format(v=version))
def _check_user_times(self, start, end):
if start is None:
start = -1e12
if end is None:
end = 1e12
if start >= end:
raise NilmDBError("start must precede end")
return (start, end)
@nilmdb.utils.lru_cache(size = 16)
def _get_intervals(self, stream_id):
"""
@@ -303,7 +312,8 @@ class NilmDB(object):
"""
stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12)
(start, end) = self._check_user_times(start, end)
requested = Interval(start, end)
result = []
for n, i in enumerate(intervals.intersection(requested)):
if n >= self.max_results:
@@ -396,7 +406,7 @@ class NilmDB(object):
path: Path at which to add the data
start: Starting timestamp
end: Ending timestamp
data: Rows of data, to be passed to PyTable's table.append
data: Rows of data, to be passed to bulkdata table.append
method. E.g. nilmdb.layout.Parser.data
"""
# First check for basic overlap using timestamp info given.
@@ -417,7 +427,7 @@ class NilmDB(object):
self._add_interval(stream_id, interval, row_start, row_end)
# And that's all
return "ok"
return
def _find_start(self, table, dbinterval):
"""
@@ -475,7 +485,8 @@ class NilmDB(object):
stream_id = self._stream_id(path)
table = self.data.getnode(path)
intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12)
(start, end) = self._check_user_times(start, end)
requested = Interval(start, end)
result = []
matched = 0
remaining = self.max_results
@@ -521,12 +532,10 @@ class NilmDB(object):
stream_id = self._stream_id(path)
table = self.data.getnode(path)
intervals = self._get_intervals(stream_id)
to_remove = Interval(start or 0, end or 1e12)
(start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end)
removed = 0
if start == end:
return 0
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True))

View File

@@ -13,6 +13,7 @@ import os
import simplejson as json
import decorator
import traceback
import psutil
try:
cherrypy.tools.json_out
@@ -99,17 +100,16 @@ class Root(NilmApp):
def version(self):
return nilmdb.__version__
# /dbpath
# /dbinfo
@cherrypy.expose
@cherrypy.tools.json_out()
def dbpath(self):
return self.db.get_basepath()
# /dbsize
@cherrypy.expose
@cherrypy.tools.json_out()
def dbsize(self):
return nilmdb.utils.du(self.db.get_basepath())
def dbinfo(self):
"""Return a dictionary with the database path,
size of the database in bytes, and free disk space in bytes"""
path = self.db.get_basepath()
return { "path": path,
"size": nilmdb.utils.du(path),
"free": psutil.disk_usage(path).free }
class Stream(NilmApp):
"""Stream-specific operations"""
@@ -177,7 +177,6 @@ class Stream(NilmApp):
dictionary"""
data_dict = json.loads(data)
self.db.stream_set_metadata(path, data_dict)
return "ok"
# /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose
@@ -188,7 +187,6 @@ class Stream(NilmApp):
should be a json-encoded dictionary"""
data_dict = json.loads(data)
self.db.stream_update_metadata(path, data_dict)
return "ok"
# /stream/insert?path=/newton/prep
@cherrypy.expose
@@ -223,19 +221,17 @@ class Stream(NilmApp):
"error parsing input data: " +
e.message)
if (not parser.min_timestamp or not parser.max_timestamp or
not len(parser.data)):
raise cherrypy.HTTPError("400 Bad Request",
"no data provided")
# Check limits
start = float(start)
end = float(end)
if parser.min_timestamp < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
if parser.min_timestamp is not None and parser.min_timestamp < start:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.min_timestamp) +
" < start time " + repr(start))
if parser.max_timestamp >= end:
if parser.max_timestamp is not None and parser.max_timestamp >= end:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.max_timestamp) +
" >= end time " + repr(end))
@@ -247,7 +243,7 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("400 Bad Request", e.message)
# Done
return "ok"
return
# /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -265,9 +261,9 @@ class Stream(NilmApp):
if end is not None:
end = float(end)
if start is not None and end is not None:
if end < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
"start must precede end")
return self.db.stream_remove(path, start, end)
# /stream/intervals?path=/newton/prep
@@ -292,9 +288,9 @@ class Stream(NilmApp):
end = float(end)
if start is not None and end is not None:
if end < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
"start must precede end")
streams = self.db.stream_list(path = path)
if len(streams) != 1:
@@ -332,9 +328,9 @@ class Stream(NilmApp):
# Check parameters
if start is not None and end is not None:
if end < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
"start must precede end")
# Check path and get layout
streams = self.db.stream_list(path = path)

View File

@@ -4,8 +4,7 @@ from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import Serializer
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du
from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close
from nilmdb.utils.urllib import urlencode
from nilmdb.utils import misc
from nilmdb.utils import atomic

View File

@@ -1,7 +1,7 @@
import os
from math import log
def sizeof_fmt(num):
def human_size(num):
"""Human friendly file size"""
unit_list = zip(['bytes', 'kiB', 'MiB', 'GiB', 'TiB'], [0, 0, 1, 2, 2])
if num > 1:
@@ -15,15 +15,11 @@ def sizeof_fmt(num):
if num == 1: # pragma: no cover
return '1 byte'
def du_bytes(path):
def du(path):
"""Like du -sb, returns total size of path in bytes."""
size = os.path.getsize(path)
if os.path.isdir(path):
for thisfile in os.listdir(path):
filepath = os.path.join(path, thisfile)
size += du_bytes(filepath)
size += du(filepath)
return size
def du(path):
"""Like du -sh, returns total size of path as a human-readable string."""
return sizeof_fmt(du_bytes(path))

View File

@@ -1,8 +0,0 @@
import itertools
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), ..., (sn,None)"
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip_longest(a, b)

View File

@@ -38,6 +38,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
@wrap_class_method
def close(orig, self, *args, **kwargs):
if "_must_close" in self.__dict__:
del self._must_close
return orig(self, *args, **kwargs)

View File

@@ -114,6 +114,7 @@ setup(name='nilmdb',
'pycurl',
'python-dateutil',
'pytz',
'psutil >= 0.3.0',
],
packages = [ 'nilmdb',
'nilmdb.utils',

View File

@@ -18,10 +18,12 @@ import simplejson as json
import unittest
import warnings
import resource
import time
from testutil.helpers import *
testdb = "tests/client-testdb"
testurl = "http://localhost:12380/"
def setup_module():
global test_server, test_db
@@ -44,28 +46,32 @@ def teardown_module():
class TestClient(object):
def test_client_1_basic(self):
def test_client_01_basic(self):
# Test a fake host
client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError):
client.version()
client.close()
# Trigger same error with a PUT request
client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError):
client.version()
client.close()
# Then a fake URL on a real host
client = nilmdb.Client(url = "http://localhost:12380/fake/")
with assert_raises(nilmdb.client.ClientError):
client.version()
client.close()
# Now a real URL with no http:// prefix
client = nilmdb.Client(url = "localhost:12380")
version = client.version()
client.close()
# Now use the real URL
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
version = client.version()
eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(test_server.version))
@@ -73,10 +79,11 @@ class TestClient(object):
# Bad URLs should give 404, not 500
with assert_raises(ClientError):
client.http.get("/stream/create")
client.close()
def test_client_2_createlist(self):
def test_client_02_createlist(self):
# Basic stream tests, like those in test_nilmdb:test_stream
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
# Database starts empty
eq_(client.stream_list(), [])
@@ -101,8 +108,10 @@ class TestClient(object):
["/newton/zzz/rawnotch", "RawNotchedData"]
])
# Match just one type or one path
eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(layout="RawData"),
[ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(path="/newton/raw"),
[ ["/newton/raw", "RawData"] ])
# Try messing with resource limits to trigger errors and get
# more coverage. Here, make it so we can only create files 1
@@ -114,9 +123,10 @@ class TestClient(object):
client.stream_create("/newton/hello", "RawData")
resource.setrlimit(resource.RLIMIT_FSIZE, limit)
client.close()
def test_client_3_metadata(self):
client = nilmdb.Client(url = "http://localhost:12380/")
def test_client_03_metadata(self):
client = nilmdb.Client(url = testurl)
# Set / get metadata
eq_(client.stream_get_metadata("/newton/prep"), {})
@@ -131,9 +141,10 @@ class TestClient(object):
client.stream_update_metadata("/newton/raw", meta3)
eq_(client.stream_get_metadata("/newton/prep"), meta1)
eq_(client.stream_get_metadata("/newton/raw"), meta1)
eq_(client.stream_get_metadata("/newton/raw", [ "description" ] ), meta2)
eq_(client.stream_get_metadata("/newton/raw", [ "description",
"v_scale" ] ), meta1)
eq_(client.stream_get_metadata("/newton/raw",
[ "description" ] ), meta2)
eq_(client.stream_get_metadata("/newton/raw",
[ "description", "v_scale" ] ), meta1)
# missing key
eq_(client.stream_get_metadata("/newton/raw", "descr"),
@@ -146,9 +157,10 @@ class TestClient(object):
client.stream_set_metadata("/newton/prep", [1,2,3])
with assert_raises(ClientError):
client.stream_update_metadata("/newton/prep", [1,2,3])
client.close()
def test_client_4_insert(self):
client = nilmdb.Client(url = "http://localhost:12380/")
def test_client_04_insert(self):
client = nilmdb.Client(url = testurl)
datetime_tz.localtz_set("America/New_York")
@@ -177,12 +189,33 @@ class TestClient(object):
result = client.stream_insert("/newton/prep", data)
eq_(result, None)
# Try forcing a server request with empty data
# It's OK to insert an empty interval
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 1, "end": 2 })
eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Timestamps can be negative too
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": -2, "end": -1 })
eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Intervals that end at zero shouldn't be any different
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": -1, "end": 0 })
eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Try forcing a server request with equal start and end
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 0, "end": 0 })
in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception))
in_("start must precede end", str(e.exception))
# Specify start/end (starts too late)
data = timestamper.TimestamperRate(testfile, start, 120)
@@ -208,7 +241,6 @@ class TestClient(object):
data = timestamper.TimestamperRate(testfile, start, 120)
result = client.stream_insert("/newton/prep", data,
start, start + 119.999777)
eq_(result, "ok")
# Verify the intervals. Should be just one, even if the data
# was inserted in chunks, due to nilmdb interval concatenation.
@@ -222,20 +254,33 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("verlap", str(e.exception))
def test_client_5_extractremove(self):
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = "http://localhost:12380/")
client.close()
for x in client.stream_extract("/newton/prep", 123, 123):
def test_client_05_extractremove(self):
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = testurl)
for x in client.stream_extract("/newton/prep", 999123, 999124):
raise AssertionError("shouldn't be any data for this request")
with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)
def test_client_6_generators(self):
# Test the exception we get if we nest requests
with assert_raises(Exception) as e:
for data in client.stream_extract("/newton/prep"):
x = client.stream_intervals("/newton/prep")
in_("nesting calls is not supported", str(e.exception))
# Test count
eq_(client.stream_count("/newton/prep"), 14400)
client.close()
def test_client_06_generators(self):
# A lot of the client functionality is already tested by test_cmdline,
# but this gets a bit more coverage that cmdline misses.
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
# Trigger a client error in generator
start = datetime_tz.datetime_tz.smartparse("20120323T2000")
@@ -246,7 +291,7 @@ class TestClient(object):
start.totimestamp(),
end.totimestamp()).next()
in_("400 Bad Request", str(e.exception))
in_("end before start", str(e.exception))
in_("start must precede end", str(e.exception))
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
@@ -272,7 +317,7 @@ class TestClient(object):
{ "path": "/newton/prep",
"start": 0, "end": 0 }).next()
in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception))
in_("start must precede end", str(e.exception))
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
@@ -281,13 +326,15 @@ class TestClient(object):
in_("404 Not Found", str(e.exception))
in_("No such stream", str(e.exception))
def test_client_7_headers(self):
client.close()
def test_client_07_headers(self):
# Make sure that /stream/intervals and /stream/extract
# properly return streaming, chunked, text/plain response.
# Pokes around in client.http internals a bit to look at the
# response headers.
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
http = client.http
# Use a warning rather than returning a test failure, so that we can
@@ -307,7 +354,7 @@ class TestClient(object):
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "123" }, retjson=False)
"end": "124" }, retjson=False)
if "Transfer-Encoding: chunked" not in http._headers:
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "Content-Type: text/plain;charset=utf-8" not in http._headers:
@@ -320,9 +367,11 @@ class TestClient(object):
"header in /stream/extract response:\n" +
http._headers)
def test_client_8_unicode(self):
client.close()
def test_client_08_unicode(self):
# Basic Unicode tests
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
# Delete streams that exist
for stream in client.stream_list():
@@ -356,3 +405,174 @@ class TestClient(object):
eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
client.close()
def test_client_09_closing(self):
# Make sure we actually close sockets correctly. New
# connections will block for a while if they're not, since the
# server will stop accepting new connections.
for test in [1, 2]:
start = time.time()
for i in range(50):
if time.time() - start > 15:
raise AssertionError("Connections seem to be blocking... "
"probably not closing properly.")
if test == 1:
# explicit close
client = nilmdb.Client(url = testurl)
with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)
client.close() # remove this to see the failure
elif test == 2:
# use the context manager
with nilmdb.Client(url = testurl) as c:
with assert_raises(ClientError) as e:
c.stream_remove("/newton/prep", 123, 120)
def test_client_10_context(self):
# Test using the client's stream insertion context manager to
# insert data.
client = nilmdb.Client(testurl)
client.stream_create("/context/test", "uint16_1")
with client.stream_insert_context("/context/test") as ctx:
# override _max_data to trigger frequent server updates
ctx._max_data = 15
with assert_raises(ValueError):
ctx.insert_line("100 1")
ctx.insert_line("100 1\n")
ctx.insert_iter([ "101 1\n",
"102 1\n",
"103 1\n" ])
ctx.insert_line("104 1\n")
ctx.insert_line("105 1\n")
ctx.finalize()
ctx.insert_line("106 1\n")
ctx.update_end(106.5)
ctx.finalize()
ctx.update_start(106.8)
ctx.insert_line("107 1\n")
ctx.insert_line("108 1\n")
ctx.insert_line("109 1\n")
ctx.insert_line("110 1\n")
ctx.insert_line("111 1\n")
ctx.update_end(113)
ctx.insert_line("112 1\n")
ctx.update_end(114)
ctx.insert_line("113 1\n")
ctx.update_end(115)
ctx.insert_line("114 1\n")
ctx.finalize()
with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 100, 200) as ctx:
ctx.insert_line("115 1\n")
with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 200, 300) as ctx:
ctx.insert_line("115 1\n")
with client.stream_insert_context("/context/test", 200, 300) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_data, 15)
ctx.insert_line("225 1\n")
ctx.finalize()
eq_(list(client.stream_intervals("/context/test")),
[ [ 100, 105.000001 ],
[ 106, 106.5 ],
[ 106.8, 115 ],
[ 200, 300 ] ])
client.stream_destroy("/context/test")
client.close()
def test_client_11_emptyintervals(self):
# Empty intervals are ok! If recording detection events
# by inserting rows into the database, we want to be able to
# have an interval where no events occurred. Test them here.
client = nilmdb.Client(testurl)
client.stream_create("/empty/test", "uint16_1")
def info():
result = []
for interval in list(client.stream_intervals("/empty/test")):
result.append((client.stream_count("/empty/test", *interval),
interval))
return result
eq_(info(), [])
# Insert a region with just a few points
with client.stream_insert_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert_line("140 1\n")
ctx.insert_line("150 1\n")
ctx.insert_line("160 1\n")
ctx.update_end(200)
ctx.finalize()
eq_(info(), [(3, [100, 200])])
# Delete chunk, which will leave one data point and two intervals
client.stream_remove("/empty/test", 145, 175)
eq_(info(), [(1, [100, 145]),
(0, [175, 200])])
# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert_block("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_context("/empty/test", 500, 550):
pass
# If enough timestamps aren't provided, empty streams won't be created.
client.stream_insert("/empty/test", [])
with client.stream_insert_context("/empty/test"):
pass
client.stream_insert("/empty/test", [], start = 600)
with client.stream_insert_context("/empty/test", start = 700):
pass
client.stream_insert("/empty/test", [], end = 850)
with client.stream_insert_context("/empty/test", end = 950):
pass
# Try various things that might cause problems
with client.stream_insert_context("/empty/test", 1000, 1050):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert_line("1100 1\n")
ctx.finalize() # inserts [1100, 1100.000001]
ctx.update_start(1199)
ctx.insert_line("1200 1\n")
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)
ctx.finalize() # nothing
ctx.update_end(1350)
ctx.finalize() # nothing
ctx.update_start(1400)
ctx.update_end(1450)
ctx.finalize()
# implicit last finalize inserts [1400, 1450]
# Check everything
eq_(info(), [(1, [100, 145]),
(0, [175, 200]),
(0, [300, 350]),
(0, [400, 450]),
(0, [500, 550]),
(0, [1000, 1050]),
(1, [1100, 1100.000001]),
(1, [1199, 1250]),
(0, [1400, 1450]),
])
# Clean up
client.stream_destroy("/empty/test")
client.close()

View File

@@ -194,9 +194,11 @@ class TestCmdline(object):
def test_02_info(self):
self.ok("info")
self.contain("Server URL: http://localhost:12380/")
self.contain("Client version: " + nilmdb.__version__)
self.contain("Server version: " + test_server.version)
self.contain("Server database path")
self.contain("Server database size")
self.contain("Server database free space")
def test_03_createlist(self):
# Basic stream tests, like those in test_client.
@@ -272,7 +274,7 @@ class TestCmdline(object):
# reversed range
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
self.contain("start must precede end")
def test_04_metadata(self):
# Set / get metadata
@@ -442,7 +444,7 @@ class TestCmdline(object):
self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.50'")
+ " --end='23 Mar 2012 10:05:15.51'")
lines_(self.captured, 2)
self.contain("10:05:15.500")
@@ -471,29 +473,29 @@ class TestCmdline(object):
# empty ranges return error 2
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'",
"--start '23 Mar 2012 20:00:30' " +
"--end '23 Mar 2012 20:00:31'",
exitcode = 2, require_error = False)
self.contain("no data")
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'",
"--start '23 Mar 2012 20:00:30.000001' " +
"--end '23 Mar 2012 20:00:30.000002'",
exitcode = 2, require_error = False)
self.contain("no data")
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'",
"--end '23 Mar 2022 10:00:31'",
exitcode = 2, require_error = False)
self.contain("no data")
# but are ok if we're just counting results
self.ok("extract --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
"--start '23 Mar 2012 20:00:30' " +
"--end '23 Mar 2012 20:00:31'")
self.match("0\n")
self.ok("extract -c /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
"--start '23 Mar 2012 20:00:30.000001' " +
"--end '23 Mar 2012 20:00:30.000002'")
self.match("0\n")
# Check various dumps against stored copies of how they should appear
@@ -540,31 +542,31 @@ class TestCmdline(object):
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("No stream at path")
# empty or backward ranges return errors
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
self.contain("start must precede end")
# empty ranges return success, backwards ranges return error
self.ok("remove /newton/prep " +
self.fail("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.match("")
self.ok("remove /newton/prep " +
self.contain("start must precede end")
self.fail("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
self.match("")
self.ok("remove /newton/prep " +
self.contain("start must precede end")
self.fail("remove /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'")
self.match("")
self.contain("start must precede end")
# Verbose
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("0\n")
self.ok("remove --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("0\n")
# Make sure we have the data we expect
@@ -765,7 +767,7 @@ class TestCmdline(object):
"tests/data/prep-20120323T1000")
# Should take up about 2.8 MB here (including directory entries)
du_before = nilmdb.utils.diskusage.du_bytes(testdb)
du_before = nilmdb.utils.diskusage.du(testdb)
# Make sure we have the data we expect
self.ok("list --detail")
@@ -815,7 +817,7 @@ class TestCmdline(object):
# We have 1/8 of the data that we had before, so the file size
# should have dropped below 1/4 of what it used to be
du_after = nilmdb.utils.diskusage.du_bytes(testdb)
du_after = nilmdb.utils.diskusage.du(testdb)
lt_(du_after, (du_before / 4))
# Remove anything that came from the 10:02 data file

View File

@@ -55,7 +55,7 @@ class TestInterval:
for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]
# basic construction
i = Interval(d1, d1)
i = Interval(d1, d2)
i = Interval(d1, d3)
eq_(i.start, d1)
eq_(i.end, d3)
@@ -77,8 +77,8 @@ class TestInterval:
assert(Interval(d1, d3) > Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d2, d3))
assert(Interval(d1, d3) < Interval(d2, d3))
assert(Interval(d2, d2) > Interval(d1, d3))
assert(Interval(d3, d3) == Interval(d3, d3))
assert(Interval(d2, d2+0.01) > Interval(d1, d3))
assert(Interval(d3, d3+0.01) == Interval(d3, d3+0.01))
#with assert_raises(TypeError): # was AttributeError, that's wrong
# x = (i == 123)
@@ -293,7 +293,7 @@ class TestIntervalDB:
# actual start, end can be a subset
a = DBInterval(150, 200, 100, 200, 10000, 20000)
b = DBInterval(100, 150, 100, 200, 10000, 20000)
c = DBInterval(150, 150, 100, 200, 10000, 20000)
c = DBInterval(150, 160, 100, 200, 10000, 20000)
# Make a set of DBIntervals
iseta = IntervalSet([a, b])

View File

@@ -93,6 +93,13 @@ class Test00Nilmdb(object): # named 00 so it runs first
eq_(db.stream_get_metadata("/newton/prep"), meta1)
eq_(db.stream_get_metadata("/newton/raw"), meta1)
# fill in some test coverage for start >= end
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 0, 0)
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 1, 0)
db.stream_remove("/newton/prep", 0, 1)
db.close()
class TestBlockingServer(object):