Compare commits: nilmdb-1.0 ... nilmdb-1.1
24 Commits
Author | SHA1 | Date
---|---|---
 | 5dce851bef |
 | 1431e41d16 |
 | a49c655816 |
 | 30e3ffc0e9 |
 | db7211c3a9 |
 | c6d57cf5c3 |
 | ca5253ddee |
 | e19da84b2e |
 | 3e8e3542fd |
 | 2f7365412d |
 | bba9ad131e |
 | ee24380d1f |
 | bfcd91acf8 |
 | d97291d4d3 |
 | a61fbbcf45 |
 | 5adc8fd0a7 |
 | 251a486c28 |
 | 1edb96a0bd |
 | 52e674a192 |
 | e241c13bf1 |
 | b53ff31212 |
 | 2045e89f24 |
 | 841b2dab5c |
 | d634f7d3cf |
@@ -7,7 +7,7 @@ Prerequisites:

sudo apt-get install python2.7 python2.7-dev python-setuptools cython

# Base NilmDB dependencies
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz python-psutil

# Tools for running tests
sudo apt-get install python-nose python-coverage
@@ -8,22 +8,35 @@ import nilmdb.client.httpclient

import time
import simplejson as json
import contextlib

def float_to_string(f):
    """Use repr to maintain full precision in the string output."""
    return repr(float(f))

def extract_timestamp(line):
    """Extract just the timestamp from a line of data text"""
    return float(line.split()[0])

class Client(object):
    """Main client interface to the Nilm database."""

    def __init__(self, url):
        self.http = nilmdb.client.httpclient.HTTPClient(url)

    # __enter__/__exit__ allow this class to be a context manager
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _json_param(self, data):
        """Return compact json-encoded version of parameter"""
        return json.dumps(data, separators=(',',':'))

    def close(self):
        """Close the connection; safe to call multiple times"""
        self.http.close()

    def geturl(self):
@@ -34,13 +47,10 @@ class Client(object):
        """Return server version"""
        return self.http.get("version")

    def dbpath(self):
        """Return server database path"""
        return self.http.get("dbpath")

    def dbsize(self):
        """Return server database size as human readable string"""
        return self.http.get("dbsize")
    def dbinfo(self):
        """Return server database info (path, size, free space)
        as a dictionary."""
        return self.http.get("dbinfo")

    def stream_list(self, path = None, layout = None):
        params = {}
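The dbpath() and dbsize() calls above collapse into a single dbinfo() round trip. A minimal sketch of the new call from a caller's side, assuming a reachable server (the URL is a placeholder):

    import nilmdb
    from nilmdb.utils import human_size

    # One request now returns path, size, and free space together.
    with nilmdb.Client("http://localhost:12380/") as client:
        info = client.dbinfo()
        print "path:", info["path"]
        print "size:", human_size(info["size"])
        print "free:", human_size(info["free"])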
@@ -95,79 +105,45 @@ class Client(object):
            params["end"] = float_to_string(end)
        return self.http.get("stream/remove", params)

    def stream_insert(self, path, data, start = None, end = None):
        """Insert data into a stream. data should be a file-like object
        that provides ASCII data that matches the database layout for path.
    @contextlib.contextmanager
    def stream_insert_context(self, path, start = None, end = None):
        """Return a context manager that allows data to be efficiently
        inserted into a stream in a piecewise manner. Data is provided
        as single lines, and is aggregated and sent to the server in larger
        chunks as necessary. Data lines must match the database layout for
        the given path, and end with a newline.

        start and end are the starting and ending timestamp of this
        stream; all timestamps t in the data must satisfy 'start <= t
        < end'. If left unspecified, 'start' is the timestamp of the
        first line of data, and 'end' is the timestamp on the last line
        of data, plus a small delta of 1μs.
        Example:
            with client.stream_insert_context('/path', start, end) as ctx:
                ctx.insert_line('1234567890.0 1 2 3 4\\n')
                ctx.insert_line('1234567891.0 1 2 3 4\\n')

        For more details, see help for nilmdb.client.client.StreamInserter

        This may make multiple requests to the server, if the data is
        large enough or enough time has passed between insertions.
        """
        params = { "path": path }
        ctx = StreamInserter(self, path, start, end)
        yield ctx
        ctx.finalize()

        # See design.md for a discussion of how much data to send.
        # These are soft limits -- actual data might be rounded up.
        max_data = 1048576
        max_time = 30
        end_epsilon = 1e-6
    def stream_insert(self, path, data, start = None, end = None):
        """Insert rows of data into a stream. data should be an
        iterable object that provides ASCII data that matches the
        database layout for path. See stream_insert_context for
        details on the 'start' and 'end' parameters."""
        with self.stream_insert_context(path, start, end) as ctx:
            ctx.insert_iter(data)
        return ctx.last_response


        def extract_timestamp(line):
            return float(line.split()[0])

        def sendit():
            # If we have more data after this, use the timestamp of
            # the next line as the end. Otherwise, use the given
            # overall end time, or add end_epsilon to the last data
            # point.
            if nextline:
                block_end = extract_timestamp(nextline)
                if end and block_end > end:
                    # This is unexpected, but we'll defer to the server
                    # to return an error in this case.
                    block_end = end
            elif end:
                block_end = end
            else:
                block_end = extract_timestamp(line) + end_epsilon

            # Send it
            params["start"] = float_to_string(block_start)
            params["end"] = float_to_string(block_end)
            return self.http.put("stream/insert", block_data, params)

        clock_start = time.time()
        block_data = ""
        block_start = start
        result = None
        line = None
        nextline = None
        for (line, nextline) in nilmdb.utils.misc.pairwise(data):
            # If we don't have a starting time, extract it from the first line
            if block_start is None:
                block_start = extract_timestamp(line)

            clock_elapsed = time.time() - clock_start
            block_data += line

            # If we have enough data, or enough time has elapsed,
            # send this block to the server, and empty things out
            # for the next block.
            if (len(block_data) > max_data) or (clock_elapsed > max_time):
                result = sendit()
                block_start = None
                block_data = ""
                clock_start = time.time()

        # One last block?
        if len(block_data):
            result = sendit()

        # Return the most recent JSON result we got back, or None if
        # we didn't make any requests.
        return result
    def stream_insert_block(self, path, block, start, end):
        """Insert an entire block of data into a stream. Like
        stream_insert, except 'block' contains multiple lines of ASCII
        text and is sent in one single chunk."""
        params = { "path": path,
                   "start": float_to_string(start),
                   "end": float_to_string(end) }
        return self.http.put("stream/insert", block, params)

    def stream_intervals(self, path, start = None, end = None):
        """
@@ -188,8 +164,8 @@ class Client(object):
        lines of ASCII-formatted data that matches the database
        layout for the given path.

        Specify count=True to just get a count of values rather than
        the actual data.
        Specify count = True to return a count of matching data points
        rather than the actual data. The output format is unchanged.
        """
        params = {
            "path": path,
@@ -202,3 +178,202 @@ class Client(object):
            params["count"] = 1

        return self.http.get_gen("stream/extract", params, retjson = False)

    def stream_count(self, path, start = None, end = None):
        """
        Return the number of rows of data in the stream that satisfy
        the given timestamps.
        """
        counts = list(self.stream_extract(path, start, end, count = True))
        return int(counts[0])

class StreamInserter(object):
    """Object returned by stream_insert_context() that manages
    the insertion of rows of data into a particular path.

    The basic data flow is that we are filling a contiguous interval
    on the server, with no gaps, that extends from timestamp 'start'
    to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
    Data is provided by the user one line at a time with
    .insert_line() or .insert_iter().

    1. The first inserted line begins a new interval that starts at
    'start'. If 'start' is not given, it is deduced from the first
    line's timestamp.

    2. Subsequent lines go into the same contiguous interval. As lines
    are inserted, this routine may make multiple insertion requests to
    the server, but will structure the timestamps to leave no gaps.

    3. The current contiguous interval can be completed by manually
    calling .finalize(), which the context manager will also do
    automatically. This will send any remaining data to the server,
    using the 'end' timestamp to end the interval.

    After a .finalize(), inserting new data goes back to step 1.

    .update_start() can be called before step 1 to change the start
    time for the interval. .update_end() can be called before step 3
    to change the end time for the interval.
    """

    # See design.md for a discussion of how much data to send.
    # These are soft limits -- actual data might be rounded up.
    # We send when we have a certain amount of data queued, or
    # when a certain amount of time has passed since the last send.
    _max_data = 1048576
    _max_time = 30

    # Delta to add to the final timestamp, if "end" wasn't given
    _end_epsilon = 1e-6

    def __init__(self, client, path, start = None, end = None):
        """'http' is the httpclient object. 'path' is the database
        path to insert to. 'start' and 'end' are used for the first
        contiguous interval."""
        self.last_response = None

        self._client = client
        self._path = path

        # Start and end for the overall contiguous interval we're
        # filling
        self._interval_start = start
        self._interval_end = end

        # Data for the specific block we're building up to send
        self._block_data = []
        self._block_len = 0
        self._block_start = None

        # Time of last request
        self._last_time = time.time()

        # We keep a buffer of the two most recently inserted lines.
        # Only the older one actually gets processed; the newer one
        # is used to "look-ahead" to the next timestamp if we need
        # to internally split an insertion into two requests.
        self._line_old = None
        self._line_new = None

    def insert_iter(self, iter):
        """Insert all lines of ASCII formatted data from the given
        iterable. Lines must be terminated with '\\n'."""
        for line in iter:
            self.insert_line(line)

    def insert_line(self, line, allow_intermediate = True):
        """Insert a single line of ASCII formatted data. Line
        must be terminated with '\\n'."""
        if line and (len(line) < 1 or line[-1] != '\n'):
            raise ValueError("lines must end with a newline character")

        # Store this new line, but process the previous (old) one.
        # This lets us "look ahead" to the next line.
        self._line_old = self._line_new
        self._line_new = line
        if self._line_old is None:
            return

        # If starting a new block, pull out the timestamp if needed.
        if self._block_start is None:
            if self._interval_start is not None:
                # User provided a start timestamp. Use it once, then
                # clear it for the next block.
                self._block_start = self._interval_start
                self._interval_start = None
            else:
                # Extract timestamp from the first row
                self._block_start = extract_timestamp(self._line_old)

        # Save the line
        self._block_data.append(self._line_old)
        self._block_len += len(self._line_old)

        if allow_intermediate:
            # Send an intermediate block to the server if needed.
            elapsed = time.time() - self._last_time
            if (self._block_len > self._max_data) or (elapsed > self._max_time):
                self._send_block_intermediate()

    def update_start(self, start):
        """Update the start time for the next contiguous interval.
        Call this before starting to insert data for a new interval,
        for example, after .finalize()"""
        self._interval_start = start

    def update_end(self, end):
        """Update the end time for the current contiguous interval.
        Call this before .finalize()"""
        self._interval_end = end

    def finalize(self):
        """Stop filling the current contiguous interval.
        All outstanding data will be sent, and the interval end
        time of the interval will be taken from the 'end' argument
        used when initializing this class, or the most recent
        value passed to update_end(), or the last timestamp plus
        a small epsilon value if no other endpoint was provided.

        If more data is inserted after a finalize(), it will become
        part of a new interval and there may be a gap left in-between."""
        # Special marker tells insert_line that this is the end
        self.insert_line(None, allow_intermediate = False)

        if self._block_len > 0:
            # We have data pending, so send the final block
            self._send_block_final()
        elif None not in (self._interval_start, self._interval_end):
            # We have no data, but enough information to create an
            # empty interval.
            self._block_start = self._interval_start
            self._interval_start = None
            self._send_block_final()
        else:
            # No data, and no timestamps to use to create an empty
            # interval.
            pass

        # Make sure both timestamps are emptied for future intervals.
        self._interval_start = None
        self._interval_end = None

    def _send_block_intermediate(self):
        """Send data, when we still have more data to send.
        Use the timestamp from the next line, so that the blocks
        are contiguous."""
        block_end = extract_timestamp(self._line_new)
        if self._interval_end is not None and block_end > self._interval_end:
            # Something's fishy -- the timestamp we found is after
            # the user's specified end. Limit it here, and the
            # server will return an error.
            block_end = self._interval_end
        self._send_block(block_end)

    def _send_block_final(self):
        """Send data, when this is the last block for the interval.
        There is no next line, so figure out the actual interval end
        using interval_end or end_epsilon."""
        if self._interval_end is not None:
            # Use the user's specified end timestamp
            block_end = self._interval_end
            # Clear it in case we send more intervals in the future.
            self._interval_end = None
        else:
            # Add an epsilon to the last timestamp we saw
            block_end = extract_timestamp(self._line_old) + self._end_epsilon
        self._send_block(block_end)

    def _send_block(self, block_end):
        """Send current block to the server"""
        self.last_response = self._client.stream_insert_block(
            self._path, "".join(self._block_data),
            self._block_start, block_end)

        # Clear out the block
        self._block_data = []
        self._block_len = 0
        self._block_start = None

        # Note when we sent it
        self._last_time = time.time()
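From the caller's side, the pieces above fit together as below; a sketch assuming a running server and a freshly created uint16_1 stream (URL, path, and timestamps are illustrative):

    import nilmdb

    client = nilmdb.Client("http://localhost:12380/")   # placeholder URL
    client.stream_create("/demo/stream", "uint16_1")

    # insert_line() only buffers; a block goes to the server once
    # _max_data bytes or _max_time seconds accumulate.
    with client.stream_insert_context("/demo/stream") as ctx:
        ctx.insert_line("100 1\n")
        ctx.insert_line("101 1\n")
        ctx.finalize()            # closes interval [100, 101.000001]
        ctx.update_start(200)     # begin a new, disjoint interval
        ctx.insert_line("250 1\n")
        ctx.update_end(300)       # context exit runs finalize() for [200, 300]

    client.close()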
@@ -33,6 +33,18 @@ class HTTPClient(object):
        self.curl.setopt(pycurl.URL, url)
        self.url = url

    def _check_busy_and_set_upload(self, upload):
        """Sets the pycurl.UPLOAD option, but also raises a more
        friendly exception if the client is already serving a request."""
        try:
            self.curl.setopt(pycurl.UPLOAD, upload)
        except pycurl.error as e:
            if "is currently running" in str(e):
                raise Exception("Client is already performing a request, and "
                                "nesting calls is not supported.")
            else: # pragma: no cover (shouldn't happen)
                raise

    def _check_error(self, body = None):
        code = self.curl.getinfo(pycurl.RESPONSE_CODE)
        if code == 200:
@@ -80,11 +92,11 @@ class HTTPClient(object):
                self._status = int(data.split(" ")[1])
            self._headers += data
        self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
        def func(callback):
        def perform(callback):
            self.curl.setopt(pycurl.WRITEFUNCTION, callback)
            self.curl.perform()
        try:
            with nilmdb.utils.Iteratorizer(func, curl_hack = True) as it:
            with nilmdb.utils.Iteratorizer(perform, curl_hack = True) as it:
                for i in it:
                    if self._status == 200:
                        # If we had a 200 response, yield the data to caller.
@@ -156,12 +168,12 @@ class HTTPClient(object):

    def get(self, url, params = None, retjson = True):
        """Simple GET"""
        self.curl.setopt(pycurl.UPLOAD, 0)
        self._check_busy_and_set_upload(0)
        return self._doreq(url, params, retjson)

    def put(self, url, postdata, params = None, retjson = True):
        """Simple PUT"""
        self.curl.setopt(pycurl.UPLOAD, 1)
        self._check_busy_and_set_upload(1)
        self._setup_url(url, params)
        data = cStringIO.StringIO(postdata)
        self.curl.setopt(pycurl.READFUNCTION, data.read)
@@ -184,12 +196,12 @@ class HTTPClient(object):

    def get_gen(self, url, params = None, retjson = True):
        """Simple GET, returning a generator"""
        self.curl.setopt(pycurl.UPLOAD, 0)
        self._check_busy_and_set_upload(0)
        return self._doreq_gen(url, params, retjson)

    def put_gen(self, url, postdata, params = None, retjson = True):
        """Simple PUT, returning a generator"""
        self.curl.setopt(pycurl.UPLOAD, 1)
        self._check_busy_and_set_upload(1)
        self._setup_url(url, params)
        data = cStringIO.StringIO(postdata)
        self.curl.setopt(pycurl.READFUNCTION, data.read)

@@ -1,5 +1,6 @@
import nilmdb
from nilmdb.utils.printf import *
from nilmdb.utils import human_size

from argparse import ArgumentDefaultsHelpFormatter as def_form

@@ -17,5 +18,7 @@ def cmd_info(self):
    printf("Client version: %s\n", nilmdb.__version__)
    printf("Server version: %s\n", self.client.version())
    printf("Server URL: %s\n", self.client.geturl())
    printf("Server database path: %s\n", self.client.dbpath())
    printf("Server database size: %s\n", self.client.dbsize())
    dbinfo = self.client.dbinfo()
    printf("Server database path: %s\n", dbinfo["path"])
    printf("Server database size: %s\n", human_size(dbinfo["size"]))
    printf("Server database free space: %s\n", human_size(dbinfo["free"]))

@@ -47,8 +47,8 @@ def cmd_list_verify(self):
        self.args.path = self.args.path_positional

    if self.args.start is not None and self.args.end is not None:
        if self.args.start > self.args.end:
            self.parser.error("start is after end")
        if self.args.start >= self.args.end:
            self.parser.error("start must precede end")

def cmd_list(self):
    """List available streams"""

@@ -8,8 +8,7 @@ def setup(self, sub):
                               Remove all data from a specified time range within a
                               stream.
                               """)
    cmd.set_defaults(verify = cmd_remove_verify,
                     handler = cmd_remove)
    cmd.set_defaults(handler = cmd_remove)

    group = cmd.add_argument_group("Data selection")
    group.add_argument("path",
@@ -25,11 +24,6 @@ def setup(self, sub):
    group.add_argument("-c", "--count", action="store_true",
                       help="Output number of data points removed")

def cmd_remove_verify(self):
    if self.args.start is not None and self.args.end is not None:
        if self.args.start > self.args.end:
            self.parser.error("start is after end")

def cmd_remove(self):
    try:
        count = self.client.stream_remove(self.args.path,

@@ -13,6 +13,16 @@ import struct
import mmap
import re

# If we have the faulthandler module, use it. All of the mmap stuff
# might trigger a SIGSEGV or SIGBUS if we're not careful, and
# faulthandler will give a traceback in that case. (the Python
# interpreter will still die either way).
try: # pragma: no cover
    import faulthandler
    faulthandler.enable()
except: # pragma: no cover
    pass

# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
@@ -161,6 +171,52 @@ class BulkData(object):
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)

@nilmdb.utils.must_close(wrap_verify = True)
class File(object):
    """Object representing a single file on disk. Data can be appended,
    or the self.mmap handle can be used for random reads."""

    def __init__(self, root, subdir, filename):
        # Create path if it doesn't exist
        try:
            os.mkdir(os.path.join(root, subdir))
        except OSError:
            pass

        # Open/create file
        self._f = open(os.path.join(root, subdir, filename), "a+b", 0)

        # Seek to end, and get size
        self._f.seek(0, 2)
        self.size = self._f.tell()

        # Open mmap object
        self.mmap = None
        self._mmap_reopen()

    def _mmap_reopen(self):
        if self.size == 0:
            # Don't mmap if the file is empty; it would fail
            pass
        elif self.mmap is None:
            # Not opened yet, so open it
            self.mmap = mmap.mmap(self._f.fileno(), 0)
        else:
            # Already opened, so just resize it
            self.mmap.resize(self.size)

    def close(self):
        if self.mmap is not None:
            self.mmap.close()
        self._f.close()

    def append(self, data):
        # Write data, flush it, and resize our mmap accordingly
        self._f.write(data)
        self._f.flush()
        self.size += len(data)
        self._mmap_reopen()

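A short sketch of how the File wrapper above is meant to be driven (the root/subdir/filename arguments are illustrative, and the root directory is assumed to exist):

    # Appends go through append(), which writes, flushes, and keeps the
    # mmap sized to match; readers use the .mmap handle directly.
    f = File("/tmp/bulkdata-root", "0000", "0000")
    f.append("\x00\x01\x02\x03")
    first = f.mmap[0:4]    # random-access read of what was just appended
    f.close()              # required; the class is wrapped by must_close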
@nilmdb.utils.must_close(wrap_verify = True)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)."""
@@ -211,7 +267,7 @@ class Table(object):
        self.nrows = self._get_nrows()

    def close(self):
        self.mmap_open.cache_remove_all()
        self.file_open.cache_remove_all()

    # Internal helpers
    def _get_nrows(self):
@@ -275,37 +331,11 @@ class Table(object):

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            keys = slice(0, 3), # exclude newsize
                            onremove = lambda x: x.close())
    def mmap_open(self, subdir, filename, newsize = None):
                            onremove = lambda f: f.close())
    def file_open(self, subdir, filename):
        """Open and map a given 'subdir/filename' (relative to self.root).
        Will be automatically closed when evicted from the cache.

        If 'newsize' is provided, the file is truncated to the given
        size before the mapping is returned. (Note that the LRU cache
        on this function means the truncate will only happen if the
        object isn't already cached; mmap.resize should be used too.)"""
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        f = open(os.path.join(self.root, subdir, filename), "a+", 0)
        if newsize is not None:
            # mmap can't map a zero-length file, so this allows the
            # caller to set the filesize between file creation and
            # mmap.
            f.truncate(newsize)
        mm = mmap.mmap(f.fileno(), 0)
        return mm

    def mmap_open_resize(self, subdir, filename, newsize):
        """Open and map a given 'subdir/filename' (relative to self.root).
        The file is resized to the given size."""
        # Pass new size to mmap_open
        mm = self.mmap_open(subdir, filename, newsize)
        # In case we got a cached copy, need to call mm.resize too.
        mm.resize(newsize)
        return mm
        Will be automatically closed when evicted from the cache."""
        return File(self.root, subdir, filename)

    def append(self, data):
        """Append the data and flush it to disk.
@@ -317,14 +347,13 @@ class Table(object):
            (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
            if count > remaining:
                count = remaining
            newsize = offset + count * self.packer.size
            mm = self.mmap_open_resize(subdir, fname, newsize)
            mm.seek(offset)

            f = self.file_open(subdir, fname)

            # Write the data
            for i in xrange(count):
                row = dataiter.next()
                mm.write(self.packer.pack(*row))
                f.append(self.packer.pack(*row))
            remaining -= count
            self.nrows += count

@@ -351,7 +380,7 @@ class Table(object):
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            mm = self.mmap_open(subdir, filename)
            mm = self.file_open(subdir, filename).mmap
            for i in xrange(count):
                ret.append(list(self.packer.unpack_from(mm, offset)))
                offset += self.packer.size
@@ -363,7 +392,7 @@ class Table(object):
        if key < 0 or key >= self.nrows:
            raise IndexError("Index out of range")
        (subdir, filename, offset, count) = self._offset_from_row(key)
        mm = self.mmap_open(subdir, filename)
        mm = self.file_open(subdir, filename).mmap
        # unpack_from ignores the mmap object's current seek position
        return list(self.packer.unpack_from(mm, offset))

@@ -410,8 +439,8 @@ class Table(object):
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Close potentially open file in mmap_open LRU cache
            self.mmap_open.cache_remove(self, subdir, filename)
            # Close potentially open file in file_open LRU cache
            self.file_open.cache_remove(self, subdir, filename)

            # Delete files
            os.remove(datafile)

@@ -36,7 +36,7 @@ cdef class Interval:
        """
        'start' and 'end' are arbitrary floats that represent time
        """
        if start > end:
        if start >= end:
            # Explicitly disallow zero-width intervals (since they're half-open)
            raise IntervalError("start %s must precede end %s" % (start, end))
        self.start = float(start)

@@ -142,6 +142,15 @@ class NilmDB(object):
        with self.con:
            cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def _check_user_times(self, start, end):
        if start is None:
            start = -1e12
        if end is None:
            end = 1e12
        if start >= end:
            raise NilmDBError("start must precede end")
        return (start, end)

    @nilmdb.utils.lru_cache(size = 16)
    def _get_intervals(self, stream_id):
        """
@@ -303,7 +312,8 @@ class NilmDB(object):
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        requested = Interval(start or 0, end or 1e12)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        for n, i in enumerate(intervals.intersection(requested)):
            if n >= self.max_results:
@@ -396,7 +406,7 @@ class NilmDB(object):
        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Rows of data, to be passed to PyTable's table.append
        data: Rows of data, to be passed to bulkdata table.append
              method. E.g. nilmdb.layout.Parser.data
        """
        # First check for basic overlap using timestamp info given.
@@ -417,7 +427,7 @@ class NilmDB(object):
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return "ok"
        return

    def _find_start(self, table, dbinterval):
        """
@@ -475,7 +485,8 @@ class NilmDB(object):
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        requested = Interval(start or 0, end or 1e12)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        matched = 0
        remaining = self.max_results
@@ -521,12 +532,10 @@ class NilmDB(object):
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        to_remove = Interval(start or 0, end or 1e12)
        (start, end) = self._check_user_times(start, end)
        to_remove = Interval(start, end)
        removed = 0

        if start == end:
            return 0

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection now.
        all_candidates = list(intervals.intersection(to_remove, orig = True))

@@ -13,6 +13,7 @@ import os
import simplejson as json
import decorator
import traceback
import psutil

try:
    cherrypy.tools.json_out
@@ -99,17 +100,16 @@ class Root(NilmApp):
    def version(self):
        return nilmdb.__version__

    # /dbpath
    # /dbinfo
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def dbpath(self):
        return self.db.get_basepath()

    # /dbsize
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def dbsize(self):
        return nilmdb.utils.du(self.db.get_basepath())
    def dbinfo(self):
        """Return a dictionary with the database path,
        size of the database in bytes, and free disk space in bytes"""
        path = self.db.get_basepath()
        return { "path": path,
                 "size": nilmdb.utils.du(path),
                 "free": psutil.disk_usage(path).free }

class Stream(NilmApp):
    """Stream-specific operations"""
@@ -177,7 +177,6 @@ class Stream(NilmApp):
        dictionary"""
        data_dict = json.loads(data)
        self.db.stream_set_metadata(path, data_dict)
        return "ok"

    # /stream/update_metadata?path=/newton/prep&data=<json>
    @cherrypy.expose
@@ -188,7 +187,6 @@ class Stream(NilmApp):
        should be a json-encoded dictionary"""
        data_dict = json.loads(data)
        self.db.stream_update_metadata(path, data_dict)
        return "ok"

    # /stream/insert?path=/newton/prep
    @cherrypy.expose
@@ -223,19 +221,17 @@ class Stream(NilmApp):
                                     "error parsing input data: " +
                                     e.message)

        if (not parser.min_timestamp or not parser.max_timestamp or
            not len(parser.data)):
            raise cherrypy.HTTPError("400 Bad Request",
                                     "no data provided")

        # Check limits
        start = float(start)
        end = float(end)
        if parser.min_timestamp < start:
        if start >= end:
            raise cherrypy.HTTPError("400 Bad Request",
                                     "start must precede end")
        if parser.min_timestamp is not None and parser.min_timestamp < start:
            raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
                                     repr(parser.min_timestamp) +
                                     " < start time " + repr(start))
        if parser.max_timestamp >= end:
        if parser.max_timestamp is not None and parser.max_timestamp >= end:
            raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
                                     repr(parser.max_timestamp) +
                                     " >= end time " + repr(end))
@@ -247,7 +243,7 @@ class Stream(NilmApp):
            raise cherrypy.HTTPError("400 Bad Request", e.message)

        # Done
        return "ok"
        return

    # /stream/remove?path=/newton/prep
    # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -265,9 +261,9 @@ class Stream(NilmApp):
        if end is not None:
            end = float(end)
        if start is not None and end is not None:
            if end < start:
            if start >= end:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "end before start")
                                         "start must precede end")
        return self.db.stream_remove(path, start, end)

    # /stream/intervals?path=/newton/prep
@@ -292,9 +288,9 @@ class Stream(NilmApp):
            end = float(end)

        if start is not None and end is not None:
            if end < start:
            if start >= end:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "end before start")
                                         "start must precede end")

        streams = self.db.stream_list(path = path)
        if len(streams) != 1:
@@ -332,9 +328,9 @@ class Stream(NilmApp):

        # Check parameters
        if start is not None and end is not None:
            if end < start:
            if start >= end:
                raise cherrypy.HTTPError("400 Bad Request",
                                         "end before start")
                                         "start must precede end")

        # Check path and get layout
        streams = self.db.stream_list(path = path)

@@ -4,8 +4,7 @@ from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import Serializer
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du
from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close
from nilmdb.utils.urllib import urlencode
from nilmdb.utils import misc
from nilmdb.utils import atomic

@@ -1,7 +1,7 @@
import os
from math import log

def sizeof_fmt(num):
def human_size(num):
    """Human friendly file size"""
    unit_list = zip(['bytes', 'kiB', 'MiB', 'GiB', 'TiB'], [0, 0, 1, 2, 2])
    if num > 1:
@@ -15,15 +15,11 @@ def sizeof_fmt(num):
    if num == 1: # pragma: no cover
        return '1 byte'

def du_bytes(path):
def du(path):
    """Like du -sb, returns total size of path in bytes."""
    size = os.path.getsize(path)
    if os.path.isdir(path):
        for thisfile in os.listdir(path):
            filepath = os.path.join(path, thisfile)
            size += du_bytes(filepath)
            size += du(filepath)
    return size

def du(path):
    """Like du -sh, returns total size of path as a human-readable string."""
    return sizeof_fmt(du_bytes(path))
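With this split, du() now returns a raw byte count and formatting is an explicit second step; a small sketch:

    from nilmdb.utils.diskusage import du, human_size

    nbytes = du("/tmp")        # integer total, like `du -sb`
    print human_size(nbytes)   # formatted string, e.g. "3.4 MiB"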
@@ -1,8 +0,0 @@
import itertools

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), ..., (sn,None)"
    a, b = itertools.tee(iterable)
    next(b, None)
    return itertools.izip_longest(a, b)
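For reference, the deleted helper paired each line with its successor, padding the last pair with None; that look-ahead is what StreamInserter now does internally with _line_old/_line_new. Its behavior, given the definition above:

    list(pairwise(["a\n", "b\n", "c\n"]))
    # -> [("a\n", "b\n"), ("b\n", "c\n"), ("c\n", None)]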
@@ -38,6 +38,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):

    @wrap_class_method
    def close(orig, self, *args, **kwargs):
        if "_must_close" in self.__dict__:
            del self._must_close
        return orig(self, *args, **kwargs)
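A sketch of the decorator contract this change extends: calling close() now clears the _must_close flag, so the "never closed" complaint is disarmed once cleanup has run (the class below is purely illustrative):

    import nilmdb.utils

    @nilmdb.utils.must_close()
    class Resource(object):
        def close(self):
            pass            # wrapped close() also deletes self._must_close

    r = Resource()
    r.close()               # no warning later at garbage-collection time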
setup.py
@@ -114,6 +114,7 @@ setup(name='nilmdb',
                         'pycurl',
                         'python-dateutil',
                         'pytz',
                         'psutil >= 0.3.0',
                         ],
      packages = [ 'nilmdb',
                   'nilmdb.utils',

@@ -18,10 +18,12 @@ import simplejson as json
import unittest
import warnings
import resource
import time

from testutil.helpers import *

testdb = "tests/client-testdb"
testurl = "http://localhost:12380/"

def setup_module():
    global test_server, test_db
@@ -44,28 +46,32 @@ def teardown_module():

class TestClient(object):

    def test_client_1_basic(self):
    def test_client_01_basic(self):
        # Test a fake host
        client = nilmdb.Client(url = "http://localhost:1/")
        with assert_raises(nilmdb.client.ServerError):
            client.version()
        client.close()

        # Trigger same error with a PUT request
        client = nilmdb.Client(url = "http://localhost:1/")
        with assert_raises(nilmdb.client.ServerError):
            client.version()
        client.close()

        # Then a fake URL on a real host
        client = nilmdb.Client(url = "http://localhost:12380/fake/")
        with assert_raises(nilmdb.client.ClientError):
            client.version()
        client.close()

        # Now a real URL with no http:// prefix
        client = nilmdb.Client(url = "localhost:12380")
        version = client.version()
        client.close()

        # Now use the real URL
        client = nilmdb.Client(url = "http://localhost:12380/")
        client = nilmdb.Client(url = testurl)
        version = client.version()
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(test_server.version))
@@ -73,10 +79,11 @@ class TestClient(object):
        # Bad URLs should give 404, not 500
        with assert_raises(ClientError):
            client.http.get("/stream/create")
        client.close()

    def test_client_2_createlist(self):
    def test_client_02_createlist(self):
        # Basic stream tests, like those in test_nilmdb:test_stream
        client = nilmdb.Client(url = "http://localhost:12380/")
        client = nilmdb.Client(url = testurl)

        # Database starts empty
        eq_(client.stream_list(), [])
@@ -101,8 +108,10 @@ class TestClient(object):
                                      ["/newton/zzz/rawnotch", "RawNotchedData"]
                                      ])
        # Match just one type or one path
        eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
        eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
        eq_(client.stream_list(layout="RawData"),
            [ ["/newton/raw", "RawData"] ])
        eq_(client.stream_list(path="/newton/raw"),
            [ ["/newton/raw", "RawData"] ])

        # Try messing with resource limits to trigger errors and get
        # more coverage. Here, make it so we can only create files 1
@@ -114,9 +123,10 @@ class TestClient(object):
            client.stream_create("/newton/hello", "RawData")
        resource.setrlimit(resource.RLIMIT_FSIZE, limit)

        client.close()

    def test_client_3_metadata(self):
        client = nilmdb.Client(url = "http://localhost:12380/")
    def test_client_03_metadata(self):
        client = nilmdb.Client(url = testurl)

        # Set / get metadata
        eq_(client.stream_get_metadata("/newton/prep"), {})
@@ -131,9 +141,10 @@ class TestClient(object):
        client.stream_update_metadata("/newton/raw", meta3)
        eq_(client.stream_get_metadata("/newton/prep"), meta1)
        eq_(client.stream_get_metadata("/newton/raw"), meta1)
        eq_(client.stream_get_metadata("/newton/raw", [ "description" ] ), meta2)
        eq_(client.stream_get_metadata("/newton/raw", [ "description",
                                                        "v_scale" ] ), meta1)
        eq_(client.stream_get_metadata("/newton/raw",
                                       [ "description" ] ), meta2)
        eq_(client.stream_get_metadata("/newton/raw",
                                       [ "description", "v_scale" ] ), meta1)

        # missing key
        eq_(client.stream_get_metadata("/newton/raw", "descr"),
@@ -146,9 +157,10 @@ class TestClient(object):
            client.stream_set_metadata("/newton/prep", [1,2,3])
        with assert_raises(ClientError):
            client.stream_update_metadata("/newton/prep", [1,2,3])
        client.close()

    def test_client_4_insert(self):
        client = nilmdb.Client(url = "http://localhost:12380/")
    def test_client_04_insert(self):
        client = nilmdb.Client(url = testurl)

        datetime_tz.localtz_set("America/New_York")

@@ -177,12 +189,33 @@ class TestClient(object):
        result = client.stream_insert("/newton/prep", data)
        eq_(result, None)

        # Try forcing a server request with empty data
        # It's OK to insert an empty interval
        client.http.put("stream/insert", "", { "path": "/newton/prep",
                                               "start": 1, "end": 2 })
        eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
        client.stream_remove("/newton/prep")
        eq_(list(client.stream_intervals("/newton/prep")), [])

        # Timestamps can be negative too
        client.http.put("stream/insert", "", { "path": "/newton/prep",
                                               "start": -2, "end": -1 })
        eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
        client.stream_remove("/newton/prep")
        eq_(list(client.stream_intervals("/newton/prep")), [])

        # Intervals that end at zero shouldn't be any different
        client.http.put("stream/insert", "", { "path": "/newton/prep",
                                               "start": -1, "end": 0 })
        eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
        client.stream_remove("/newton/prep")
        eq_(list(client.stream_intervals("/newton/prep")), [])

        # Try forcing a server request with equal start and end
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": 0, "end": 0 })
        in_("400 Bad Request", str(e.exception))
        in_("no data provided", str(e.exception))
        in_("start must precede end", str(e.exception))

        # Specify start/end (starts too late)
        data = timestamper.TimestamperRate(testfile, start, 120)
@@ -208,7 +241,6 @@ class TestClient(object):
        data = timestamper.TimestamperRate(testfile, start, 120)
        result = client.stream_insert("/newton/prep", data,
                                      start, start + 119.999777)
        eq_(result, "ok")

        # Verify the intervals. Should be just one, even if the data
        # was inserted in chunks, due to nilmdb interval concatenation.
@@ -222,20 +254,33 @@ class TestClient(object):
        in_("400 Bad Request", str(e.exception))
        in_("verlap", str(e.exception))

    def test_client_5_extractremove(self):
        # Misc tests for extract and remove. Most of them are in test_cmdline.
        client = nilmdb.Client(url = "http://localhost:12380/")
        client.close()

        for x in client.stream_extract("/newton/prep", 123, 123):
    def test_client_05_extractremove(self):
        # Misc tests for extract and remove. Most of them are in test_cmdline.
        client = nilmdb.Client(url = testurl)

        for x in client.stream_extract("/newton/prep", 999123, 999124):
            raise AssertionError("shouldn't be any data for this request")

        with assert_raises(ClientError) as e:
            client.stream_remove("/newton/prep", 123, 120)

    def test_client_6_generators(self):
        # Test the exception we get if we nest requests
        with assert_raises(Exception) as e:
            for data in client.stream_extract("/newton/prep"):
                x = client.stream_intervals("/newton/prep")
        in_("nesting calls is not supported", str(e.exception))

        # Test count
        eq_(client.stream_count("/newton/prep"), 14400)

        client.close()

    def test_client_06_generators(self):
        # A lot of the client functionality is already tested by test_cmdline,
        # but this gets a bit more coverage that cmdline misses.
        client = nilmdb.Client(url = "http://localhost:12380/")
        client = nilmdb.Client(url = testurl)

        # Trigger a client error in generator
        start = datetime_tz.datetime_tz.smartparse("20120323T2000")
@@ -246,7 +291,7 @@ class TestClient(object):
                                    start.totimestamp(),
                                    end.totimestamp()).next()
        in_("400 Bad Request", str(e.exception))
        in_("end before start", str(e.exception))
        in_("start must precede end", str(e.exception))

        # Trigger a curl error in generator
        with assert_raises(ServerError) as e:
@@ -272,7 +317,7 @@ class TestClient(object):
                               { "path": "/newton/prep",
                                 "start": 0, "end": 0 }).next()
        in_("400 Bad Request", str(e.exception))
        in_("no data provided", str(e.exception))
        in_("start must precede end", str(e.exception))

        # Check 404 for missing streams
        for function in [ client.stream_intervals, client.stream_extract ]:
@@ -281,13 +326,15 @@ class TestClient(object):
            in_("404 Not Found", str(e.exception))
            in_("No such stream", str(e.exception))

    def test_client_7_headers(self):
        client.close()

    def test_client_07_headers(self):
        # Make sure that /stream/intervals and /stream/extract
        # properly return streaming, chunked, text/plain response.
        # Pokes around in client.http internals a bit to look at the
        # response headers.

        client = nilmdb.Client(url = "http://localhost:12380/")
        client = nilmdb.Client(url = testurl)
        http = client.http

        # Use a warning rather than returning a test failure, so that we can
@@ -307,7 +354,7 @@ class TestClient(object):
        x = http.get("stream/extract",
                     { "path": "/newton/prep",
                       "start": "123",
                       "end": "123" }, retjson=False)
                       "end": "124" }, retjson=False)
        if "Transfer-Encoding: chunked" not in http._headers:
            warnings.warn("Non-chunked HTTP response for /stream/extract")
        if "Content-Type: text/plain;charset=utf-8" not in http._headers:
@@ -320,9 +367,11 @@ class TestClient(object):
                          "header in /stream/extract response:\n" +
                          http._headers)

    def test_client_8_unicode(self):
        client.close()

    def test_client_08_unicode(self):
        # Basic Unicode tests
        client = nilmdb.Client(url = "http://localhost:12380/")
        client = nilmdb.Client(url = testurl)

        # Delete streams that exist
        for stream in client.stream_list():
@@ -356,3 +405,174 @@ class TestClient(object):
        eq_(client.stream_get_metadata(raw[0]), meta1)
        eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
        eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)

        client.close()

    def test_client_09_closing(self):
        # Make sure we actually close sockets correctly. New
        # connections will block for a while if they're not, since the
        # server will stop accepting new connections.
        for test in [1, 2]:
            start = time.time()
            for i in range(50):
                if time.time() - start > 15:
                    raise AssertionError("Connections seem to be blocking... "
                                         "probably not closing properly.")
                if test == 1:
                    # explicit close
                    client = nilmdb.Client(url = testurl)
                    with assert_raises(ClientError) as e:
                        client.stream_remove("/newton/prep", 123, 120)
                    client.close() # remove this to see the failure
                elif test == 2:
                    # use the context manager
                    with nilmdb.Client(url = testurl) as c:
                        with assert_raises(ClientError) as e:
                            c.stream_remove("/newton/prep", 123, 120)

    def test_client_10_context(self):
        # Test using the client's stream insertion context manager to
        # insert data.
        client = nilmdb.Client(testurl)

        client.stream_create("/context/test", "uint16_1")
        with client.stream_insert_context("/context/test") as ctx:
            # override _max_data to trigger frequent server updates
            ctx._max_data = 15

            with assert_raises(ValueError):
                ctx.insert_line("100 1")

            ctx.insert_line("100 1\n")
            ctx.insert_iter([ "101 1\n",
                              "102 1\n",
                              "103 1\n" ])
            ctx.insert_line("104 1\n")
            ctx.insert_line("105 1\n")
            ctx.finalize()

            ctx.insert_line("106 1\n")
            ctx.update_end(106.5)
            ctx.finalize()
            ctx.update_start(106.8)
            ctx.insert_line("107 1\n")
            ctx.insert_line("108 1\n")
            ctx.insert_line("109 1\n")
            ctx.insert_line("110 1\n")
            ctx.insert_line("111 1\n")
            ctx.update_end(113)
            ctx.insert_line("112 1\n")
            ctx.update_end(114)
            ctx.insert_line("113 1\n")
            ctx.update_end(115)
            ctx.insert_line("114 1\n")
            ctx.finalize()

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test", 100, 200) as ctx:
                ctx.insert_line("115 1\n")

        with assert_raises(ClientError):
            with client.stream_insert_context("/context/test", 200, 300) as ctx:
                ctx.insert_line("115 1\n")

        with client.stream_insert_context("/context/test", 200, 300) as ctx:
            # make sure our override wasn't permanent
            ne_(ctx._max_data, 15)
            ctx.insert_line("225 1\n")
            ctx.finalize()

        eq_(list(client.stream_intervals("/context/test")),
            [ [ 100, 105.000001 ],
              [ 106, 106.5 ],
              [ 106.8, 115 ],
              [ 200, 300 ] ])

        client.stream_destroy("/context/test")
        client.close()

    def test_client_11_emptyintervals(self):
        # Empty intervals are ok! If recording detection events
        # by inserting rows into the database, we want to be able to
        # have an interval where no events occurred. Test them here.
        client = nilmdb.Client(testurl)
        client.stream_create("/empty/test", "uint16_1")

        def info():
            result = []
            for interval in list(client.stream_intervals("/empty/test")):
                result.append((client.stream_count("/empty/test", *interval),
                               interval))
            return result

        eq_(info(), [])

        # Insert a region with just a few points
        with client.stream_insert_context("/empty/test") as ctx:
            ctx.update_start(100)
            ctx.insert_line("140 1\n")
            ctx.insert_line("150 1\n")
            ctx.insert_line("160 1\n")
            ctx.update_end(200)
            ctx.finalize()

        eq_(info(), [(3, [100, 200])])

        # Delete chunk, which will leave one data point and two intervals
        client.stream_remove("/empty/test", 145, 175)
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200])])

        # Try also creating a completely empty interval from scratch,
        # in a few different ways.
        client.stream_insert_block("/empty/test", "", 300, 350)
        client.stream_insert("/empty/test", [], 400, 450)
        with client.stream_insert_context("/empty/test", 500, 550):
            pass

        # If enough timestamps aren't provided, empty streams won't be created.
        client.stream_insert("/empty/test", [])
        with client.stream_insert_context("/empty/test"):
            pass
        client.stream_insert("/empty/test", [], start = 600)
        with client.stream_insert_context("/empty/test", start = 700):
            pass
        client.stream_insert("/empty/test", [], end = 850)
        with client.stream_insert_context("/empty/test", end = 950):
            pass

        # Try various things that might cause problems
        with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
            ctx.finalize() # inserts [1000, 1050]
            ctx.finalize() # nothing
            ctx.finalize() # nothing
            ctx.insert_line("1100 1\n")
            ctx.finalize() # inserts [1100, 1100.000001]
            ctx.update_start(1199)
            ctx.insert_line("1200 1\n")
            ctx.update_end(1250)
            ctx.finalize() # inserts [1199, 1250]
            ctx.update_start(1299)
            ctx.finalize() # nothing
            ctx.update_end(1350)
            ctx.finalize() # nothing
            ctx.update_start(1400)
            ctx.update_end(1450)
            ctx.finalize()
            # implicit last finalize inserts [1400, 1450]

        # Check everything
        eq_(info(), [(1, [100, 145]),
                     (0, [175, 200]),
                     (0, [300, 350]),
                     (0, [400, 450]),
                     (0, [500, 550]),
                     (0, [1000, 1050]),
                     (1, [1100, 1100.000001]),
                     (1, [1199, 1250]),
                     (0, [1400, 1450]),
                     ])

        # Clean up
        client.stream_destroy("/empty/test")
        client.close()
@@ -194,9 +194,11 @@ class TestCmdline(object):
    def test_02_info(self):
        self.ok("info")
        self.contain("Server URL: http://localhost:12380/")
        self.contain("Client version: " + nilmdb.__version__)
        self.contain("Server version: " + test_server.version)
        self.contain("Server database path")
        self.contain("Server database size")
        self.contain("Server database free space")

    def test_03_createlist(self):
        # Basic stream tests, like those in test_client.
@@ -272,7 +274,7 @@ class TestCmdline(object):

        # reversed range
        self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
        self.contain("start is after end")
        self.contain("start must precede end")

    def test_04_metadata(self):
        # Set / get metadata
@@ -442,7 +444,7 @@ class TestCmdline(object):
        self.contain("no intervals")

        self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
                + " --end='23 Mar 2012 10:05:15.50'")
                + " --end='23 Mar 2012 10:05:15.51'")
        lines_(self.captured, 2)
        self.contain("10:05:15.500")

@@ -471,29 +473,29 @@ class TestCmdline(object):

        # empty ranges return error 2
        self.fail("extract -a /newton/prep " +
                  "--start '23 Mar 2012 10:00:30' " +
                  "--end '23 Mar 2012 10:00:30'",
                  "--start '23 Mar 2012 20:00:30' " +
                  "--end '23 Mar 2012 20:00:31'",
                  exitcode = 2, require_error = False)
        self.contain("no data")
        self.fail("extract -a /newton/prep " +
                  "--start '23 Mar 2012 10:00:30.000001' " +
                  "--end '23 Mar 2012 10:00:30.000001'",
                  "--start '23 Mar 2012 20:00:30.000001' " +
                  "--end '23 Mar 2012 20:00:30.000002'",
                  exitcode = 2, require_error = False)
        self.contain("no data")
        self.fail("extract -a /newton/prep " +
                  "--start '23 Mar 2022 10:00:30' " +
                  "--end '23 Mar 2022 10:00:30'",
                  "--end '23 Mar 2022 10:00:31'",
                  exitcode = 2, require_error = False)
        self.contain("no data")

        # but are ok if we're just counting results
        self.ok("extract --count /newton/prep " +
                "--start '23 Mar 2012 10:00:30' " +
                "--end '23 Mar 2012 10:00:30'")
                "--start '23 Mar 2012 20:00:30' " +
                "--end '23 Mar 2012 20:00:31'")
        self.match("0\n")
        self.ok("extract -c /newton/prep " +
                "--start '23 Mar 2012 10:00:30.000001' " +
                "--end '23 Mar 2012 10:00:30.000001'")
                "--start '23 Mar 2012 20:00:30.000001' " +
                "--end '23 Mar 2012 20:00:30.000002'")
        self.match("0\n")

        # Check various dumps against stored copies of how they should appear
@@ -540,31 +542,31 @@ class TestCmdline(object):
        self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
        self.contain("No stream at path")

        # empty or backward ranges return errors
        self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
        self.contain("start is after end")
        self.contain("start must precede end")

        # empty ranges return success, backwards ranges return error
        self.ok("remove /newton/prep " +
        self.fail("remove /newton/prep " +
                  "--start '23 Mar 2012 10:00:30' " +
                  "--end '23 Mar 2012 10:00:30'")
        self.match("")
        self.ok("remove /newton/prep " +
        self.contain("start must precede end")
        self.fail("remove /newton/prep " +
                  "--start '23 Mar 2012 10:00:30.000001' " +
                  "--end '23 Mar 2012 10:00:30.000001'")
        self.match("")
        self.ok("remove /newton/prep " +
        self.contain("start must precede end")
        self.fail("remove /newton/prep " +
                  "--start '23 Mar 2022 10:00:30' " +
                  "--end '23 Mar 2022 10:00:30'")
        self.match("")
        self.contain("start must precede end")

        # Verbose
        self.ok("remove -c /newton/prep " +
                "--start '23 Mar 2012 10:00:30' " +
                "--end '23 Mar 2012 10:00:30'")
                "--start '23 Mar 2022 20:00:30' " +
                "--end '23 Mar 2022 20:00:31'")
        self.match("0\n")
        self.ok("remove --count /newton/prep " +
                "--start '23 Mar 2012 10:00:30' " +
                "--end '23 Mar 2012 10:00:30'")
                "--start '23 Mar 2022 20:00:30' " +
                "--end '23 Mar 2022 20:00:31'")
        self.match("0\n")

        # Make sure we have the data we expect
@@ -765,7 +767,7 @@ class TestCmdline(object):
                "tests/data/prep-20120323T1000")

        # Should take up about 2.8 MB here (including directory entries)
        du_before = nilmdb.utils.diskusage.du_bytes(testdb)
        du_before = nilmdb.utils.diskusage.du(testdb)

        # Make sure we have the data we expect
        self.ok("list --detail")
@@ -815,7 +817,7 @@ class TestCmdline(object):

        # We have 1/8 of the data that we had before, so the file size
        # should have dropped below 1/4 of what it used to be
        du_after = nilmdb.utils.diskusage.du_bytes(testdb)
        du_after = nilmdb.utils.diskusage.du(testdb)
        lt_(du_after, (du_before / 4))

        # Remove anything that came from the 10:02 data file

@@ -55,7 +55,7 @@ class TestInterval:
                        for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]

        # basic construction
        i = Interval(d1, d1)
        i = Interval(d1, d2)
        i = Interval(d1, d3)
        eq_(i.start, d1)
        eq_(i.end, d3)
@@ -77,8 +77,8 @@ class TestInterval:
        assert(Interval(d1, d3) > Interval(d1, d2))
        assert(Interval(d1, d2) < Interval(d2, d3))
        assert(Interval(d1, d3) < Interval(d2, d3))
        assert(Interval(d2, d2) > Interval(d1, d3))
        assert(Interval(d3, d3) == Interval(d3, d3))
        assert(Interval(d2, d2+0.01) > Interval(d1, d3))
        assert(Interval(d3, d3+0.01) == Interval(d3, d3+0.01))
        #with assert_raises(TypeError): # was AttributeError, that's wrong
        #    x = (i == 123)

@@ -293,7 +293,7 @@ class TestIntervalDB:
        # actual start, end can be a subset
        a = DBInterval(150, 200, 100, 200, 10000, 20000)
        b = DBInterval(100, 150, 100, 200, 10000, 20000)
        c = DBInterval(150, 150, 100, 200, 10000, 20000)
        c = DBInterval(150, 160, 100, 200, 10000, 20000)

        # Make a set of DBIntervals
        iseta = IntervalSet([a, b])

@@ -93,6 +93,13 @@ class Test00Nilmdb(object): # named 00 so it runs first
        eq_(db.stream_get_metadata("/newton/prep"), meta1)
        eq_(db.stream_get_metadata("/newton/raw"), meta1)

        # fill in some test coverage for start >= end
        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_remove("/newton/prep", 0, 0)
        with assert_raises(nilmdb.server.NilmDBError):
            db.stream_remove("/newton/prep", 1, 0)
        db.stream_remove("/newton/prep", 0, 1)

        db.close()

class TestBlockingServer(object):