Compare commits

...

72 Commits

Author SHA1 Message Date
204a6ecb15 Optimize bulkdata.append() by postponing flushes & mmap resize
Rather than flushing and resizing after each row is written to the
file, have the file object iterate by itself and do all of the
writes.  Only flush and resize the mmap after finishing.  This should
be pretty safe to do, especially since nothing is concurrent at the
moment.
2013-03-01 16:30:49 -05:00
5db3b186a4 Make test_mustclose more complete 2013-03-01 16:30:22 -05:00
fe640cf421 Remove must_close verification wrappers on bulkdata
At this point we know that the close() behavior is correct, so it's
not worth slowing everything down for these checks.
2013-03-01 16:11:44 -05:00
ca67c79fe4 Improve test_layout_speed 2013-03-01 16:04:10 -05:00
8917bcd4bf Fix test case failures due to increased client chunk size 2013-03-01 16:04:00 -05:00
a75ec98673 Slight speed improvements in layout.pyx 2013-03-01 16:03:38 -05:00
e476338d61 Remove outdated numpy dependency 2013-03-01 16:03:19 -05:00
d752b882f2 Bump up block sizes in client
This will help amortize the sqlite synchronization costs.
2013-02-28 21:11:57 -05:00
ade27773e6 Add --nosync option to nilmdb-server script 2013-02-28 20:45:08 -05:00
0c1a1d2388 Fix nilmdb-server script 2013-02-28 18:53:06 -05:00
e3f335dfe5 Move time parsing from cmdline into nilmdb.utils.time 2013-02-28 17:09:26 -05:00
7a191c0ebb Fix versioneer to update versions on install 2013-02-28 14:50:53 -05:00
55bf11e393 Fix error when pyximport is too old 2013-02-26 22:21:23 -05:00
e90dcd10f3 Update README and setup.py with python-requests dependency 2013-02-26 22:00:42 -05:00
7d44f4eaa0 Cleanup Makefile; make tests run through setup.py when outside emacs 2013-02-26 22:00:42 -05:00
f541432d44 Merge branch 'requests' 2013-02-26 21:59:15 -05:00
aa4e32f78a Merge branch 'curl-multi' 2013-02-26 21:59:03 -05:00
2bc1416c00 Merge branch 'fixups' 2013-02-26 21:58:55 -05:00
68bbbf757d Remove nilmdb.utils.urllib
python-requests seems to handle UTF-8 just fine.
2013-02-26 19:46:22 -05:00
3df96fdfdd Reorder code 2013-02-26 19:41:55 -05:00
740ab76eaf Re-add persistent connection test for Requests based httpclient 2013-02-26 19:41:27 -05:00
ce13a47fea Save full response object for tests 2013-02-26 17:45:41 -05:00
50a4a60786 Replace pyCurl with Requests
Only tested with v1.1.0.  It's not clear how well older versions will
work.
2013-02-26 17:45:40 -05:00
14afa02db6 Temporarily remove curl-specific keepalive tests 2013-02-26 17:45:40 -05:00
cc990d6ce4 Test persistent connections 2013-02-26 13:41:40 -05:00
0f5162e0c0 Always use the curl multi interface
.. even for non-generator requests
2013-02-26 13:39:33 -05:00
b26cd52f8c Work around curl multi bug 2013-02-26 13:38:42 -05:00
236d925a1d Make sure we use POST when requested, even if the body is empty 2013-02-25 21:05:01 -05:00
a4a4bc61ba Switch to using pycurl.Multi instead of Iteratorizer 2013-02-25 21:05:01 -05:00
3d82888580 Enforce method types, and require POST for actions that change things.
This is a pretty big change that will render existing clients unable
to modify the database, but it's important that we use POST or PUT
instead of GET for anything that may change state, in case this
is ever put behind a cache.
2013-02-25 21:05:01 -05:00
749b878904 Add an explicit lock to httpclient's public methods
This is to prevent possible reentrancy problems.
2013-02-25 18:06:00 -05:00
f396e3934c Remove cherrypy version check
Dependencies should be handled by installation, not at runtime.
2013-02-25 16:50:19 -05:00
dd7594b5fa Fix issue where PUT responses were being dropped
PUTs generate a "HTTP/1.1 100 Continue" response before the
"HTTP/1.1 200 OK" response, and so we were mistakenly picking up
the 100 status code and not returning any data.  Improve the
header callback to correctly process any number of status codes.
2013-02-23 17:51:59 -05:00
4ac1beee6d layout: allow zero and negative timestamps in parser 2013-02-23 16:58:49 -05:00
8c0ce736d8 Disable use of signals in Curl
Various places suggest that this is needed for better thread-safety,
and the only drawback is that some systems cannot timeout properly on
DNS lookups.
2013-02-23 16:15:28 -05:00
8858c9426f Fix error message text in nilmdb.server.Server 2013-02-23 16:13:47 -05:00
9123ccb583 Merge branch 'decorator-work' 2013-02-23 14:38:36 -05:00
5dce851bef Merge branch 'client-insert-context' 2013-02-23 14:37:59 -05:00
5b0441de6b Give serializer and iteratorizer threads names 2013-02-23 14:28:37 -05:00
317c53ab6f Improve serializer_proxy and verify_thread_proxy
These functions can now take an object or a type (class).

If given an object, they will wrap subsequent calls to that object.
If given a type, they will return an object that can be instantiated
to create a new object, and all calls including __init__ will be
covered by the serialization or thread verification.
2013-02-23 14:28:37 -05:00
7db4411462 Cleanup nilmdb.utils.must_close a bit 2013-02-23 11:28:03 -05:00
422317850e Replace threadsafety class decorator version, add explicit proxy version
Like the serializer changes, the class decorator was too fragile.
2013-02-23 11:25:40 -05:00
965537d8cb Implement verify_thread_safety to check for unsafe access patterns
Occasional segfaults may be the result of performing thread-unsafe
operations.  This class decorator verifies that all of its methods
are called in a thread-safe manner.

It can separately warn about:
- two threads calling methods in a function (the kind of thing sqlite
  doesn't like)
- recursion
- concurrency (two different threads functions at the same time)
2013-02-23 11:25:02 -05:00
0dcdec5949 Turn on sqlite thread safety checks -- serializer should fully protect it 2013-02-23 11:25:01 -05:00
7fce305a1d Make server check that the db object has been wrapped in a serializer
It's only the server that calls it in multiple threads.
2013-02-23 11:25:01 -05:00
dfbbe23512 Switch to explicitly wrapping nilmdb objects in a serializer_proxy
This is quite a bit simpler than the class decorator method, so it
may be more reliable.
2013-02-23 11:23:54 -05:00
7761a91242 Remove class decorator version of the serializer; it's too fragile 2013-02-23 11:23:54 -05:00
9b06e46bf1 Add back a proxy version of the Serializer, which is much simpler. 2013-02-23 11:23:54 -05:00
171e6f1871 Replace "serializer" function with a "serialized" decorator
This decorator makes a class always be serialized, including its
instantiation, in a separate thread.  This is an improvement over
the old Serializer() object wrapper, which didn't put the
instantiation into the new thread.
2013-02-23 11:23:54 -05:00
1431e41d16 Allow inserting empty intervals in the database, and add tests for it.
Previously, we could get empty intervals anyway by having a non-empty
interval and removing a smaller interval around each piece of data.
Turns out that empty intervals are OK and needed in some situations,
so explicitly allow and test for it.
2013-02-21 14:07:35 -05:00
a49c655816 Strictly enforce (start < end) for all intervals.
Previously, we allowed start == end, but this doesn't make sense with
half-open intervals.
2013-02-21 14:06:40 -05:00
30e3ffc0e9 Fix check for interval ends to be None, so that zero doesn't confuse it 2013-02-21 12:42:33 -05:00
db7211c3a9 Have server verify that start <= end before creating intervals
Also rename _fill_in_limits to _check_user_times
2013-02-21 12:38:51 -05:00
c6d57cf5c3 Fix errors with calculating limits when start==end==None
This also has the effect of now handling negative timestamps
correctly.
2013-02-19 19:27:06 -05:00
ca5253ddee Fix and test stream_count 2013-02-19 18:26:44 -05:00
e19da84b2e server: always return None instead of sometimes returning "ok"
Previously some functions returned the string "ok".
2013-02-19 18:26:44 -05:00
3e8e3542fd Test for detecting nested HTTP requests 2013-02-19 18:26:44 -05:00
2f7365412d client: detect and give a more clear error when HTTP requests are nested 2013-02-19 17:20:07 -05:00
bba9ad131e Add test for client.stream_insert_context 2013-02-19 17:19:45 -05:00
ee24380d1f Replace duplicated URL in tests with a variable 2013-02-19 15:27:51 -05:00
bfcd91acf8 client tests: renumber 2013-02-19 15:25:34 -05:00
d97291d4d3 client: Use .stream_insert_block from within .stream_insert_context
Avoids duplicating code.
2013-02-19 15:25:01 -05:00
a61fbbcf45 Big rework of client stream_insert_context
Now supports these operations:
  ctx.insert_line()
  ctx.insert_iter()
  ctx.finalize() (end the current contiguous interval, so a new one
                  can be started with a gap)
  ctx.update_end() (update ending timestamp before finalizing interval)
  ctx.update_start() (update starting timestamp for new interval)
2013-02-18 18:06:03 -05:00
5adc8fd0a7 Remove nilmdb.utils.misc.pairwise, as it's no longer used. 2013-02-18 18:06:03 -05:00
251a486c28 client.py: Significant speedup in stream_insert_context
block_data += "string" is fast with local variables, but slow with
variables inside some namespace.  Instead, build a list of strings and
join them once at the end.  This fixes the slowdown that resulted from
the stream_insert_context cleanup.
2013-02-18 18:06:03 -05:00
1edb96a0bd Add client.stream_insert_context, convert everything to use it. Slow.
Not sure why this is so painfully slow.  Need more testing;
might have to scratch the idea.
2013-02-18 18:06:03 -05:00
52e674a192 Fix warning in mustclose decorator 2013-02-18 18:05:45 -05:00
e241c13bf1 Remove must_close decorator from client
It still should be closed, but warning each time was mostly for
debugging and it's kind of annoying when writing one-off programs
where it's OK to just let things get torn down as they're completed.
Not closing is not fatal in terms of data integrity etc.
2013-02-18 18:02:05 -05:00
b53ff31212 client: Add must_close() decorator to nilmdb.Client, and fix tests
Test suite wasn't closing connections correctly.
2013-02-16 18:55:23 -05:00
2045e89f24 client: Add context manager functionality, test closing 2013-02-16 18:55:20 -05:00
841b2dab5c server: Replace /dbpath and /dbsize with a more generic /dbinfo
Update tests accordingly.  This isn't backwards compatible, but
existing clients don't rely on it.
2013-02-14 16:57:33 -05:00
d634f7d3cf bulkdata: Use file writes instead of writing to the mmap.
Extending and then writing to the mmap file has a problem: if the disk
fills up, the mapping becomes invalid, and the Python interpreter will
get a SIGBUS, killing it.  It's difficult to catch this gracefully;
there's no way to do that with existing modules.  Instead, switch to
only using mmap when reading, and normal file writes when writing.
Since we only ever append, it should have similar performance.
2013-02-13 20:30:39 -05:00
39 changed files with 1331 additions and 696 deletions

View File

@@ -21,7 +21,13 @@ lint:
pylint --rcfile=.pylintrc nilmdb
test:
ifeq ($(INSIDE_EMACS), t)
# Use the slightly more flexible script
python tests/runtests.py
else
# Let setup.py check dependencies, build stuff, and run the test
python setup.py nosetests
endif
clean::
find . -name '*pyc' | xargs rm -f
@@ -33,4 +39,4 @@ clean::
gitclean::
git clean -dXf
.PHONY: all build dist sdist install docs lint test clean
.PHONY: all version build dist sdist install docs lint test clean

View File

@@ -7,11 +7,15 @@ Prerequisites:
sudo apt-get install python2.7 python2.7-dev python-setuptools cython
# Base NilmDB dependencies
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz
sudo apt-get install python-cherrypy3 python-decorator python-simplejson
sudo apt-get install python-requests python-dateutil python-tz python-psutil
# Tools for running tests
sudo apt-get install python-nose python-coverage
Test:
python setup.py nosetests
Install:
python setup.py install

View File

@@ -8,22 +8,35 @@ import nilmdb.client.httpclient
import time
import simplejson as json
import contextlib
def float_to_string(f):
"""Use repr to maintain full precision in the string output."""
return repr(float(f))
def extract_timestamp(line):
"""Extract just the timestamp from a line of data text"""
return float(line.split()[0])
class Client(object):
"""Main client interface to the Nilm database."""
def __init__(self, url):
self.http = nilmdb.client.httpclient.HTTPClient(url)
# __enter__/__exit__ allow this class to be a context manager
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _json_param(self, data):
"""Return compact json-encoded version of parameter"""
return json.dumps(data, separators=(',',':'))
def close(self):
"""Close the connection; safe to call multiple times"""
self.http.close()
def geturl(self):
@@ -34,13 +47,10 @@ class Client(object):
"""Return server version"""
return self.http.get("version")
def dbpath(self):
"""Return server database path"""
return self.http.get("dbpath")
def dbsize(self):
"""Return server database size as human readable string"""
return self.http.get("dbsize")
def dbinfo(self):
"""Return server database info (path, size, free space)
as a dictionary."""
return self.http.get("dbinfo")
def stream_list(self, path = None, layout = None):
params = {}
@@ -63,7 +73,7 @@ class Client(object):
"path": path,
"data": self._json_param(data)
}
return self.http.get("stream/set_metadata", params)
return self.http.post("stream/set_metadata", params)
def stream_update_metadata(self, path, data):
"""Update stream metadata from a dictionary"""
@@ -71,18 +81,18 @@ class Client(object):
"path": path,
"data": self._json_param(data)
}
return self.http.get("stream/update_metadata", params)
return self.http.post("stream/update_metadata", params)
def stream_create(self, path, layout):
"""Create a new stream"""
params = { "path": path,
"layout" : layout }
return self.http.get("stream/create", params)
return self.http.post("stream/create", params)
def stream_destroy(self, path):
"""Delete stream and its contents"""
params = { "path": path }
return self.http.get("stream/destroy", params)
return self.http.post("stream/destroy", params)
def stream_remove(self, path, start = None, end = None):
"""Remove data from the specified time range"""
@@ -93,81 +103,47 @@ class Client(object):
params["start"] = float_to_string(start)
if end is not None:
params["end"] = float_to_string(end)
return self.http.get("stream/remove", params)
return self.http.post("stream/remove", params)
@contextlib.contextmanager
def stream_insert_context(self, path, start = None, end = None):
"""Return a context manager that allows data to be efficiently
inserted into a stream in a piecewise manner. Data is be provided
as single lines, and is aggregated and sent to the server in larger
chunks as necessary. Data lines must match the database layout for
the given path, and end with a newline.
Example:
with client.stream_insert_context('/path', start, end) as ctx:
ctx.insert_line('1234567890.0 1 2 3 4\\n')
ctx.insert_line('1234567891.0 1 2 3 4\\n')
For more details, see help for nilmdb.client.client.StreamInserter
This may make multiple requests to the server, if the data is
large enough or enough time has passed between insertions.
"""
ctx = StreamInserter(self, path, start, end)
yield ctx
ctx.finalize()
def stream_insert(self, path, data, start = None, end = None):
"""Insert data into a stream. data should be a file-like object
that provides ASCII data that matches the database layout for path.
"""Insert rows of data into a stream. data should be an
iterable object that provides ASCII data that matches the
database layout for path. See stream_insert_context for
details on the 'start' and 'end' parameters."""
with self.stream_insert_context(path, start, end) as ctx:
ctx.insert_iter(data)
return ctx.last_response
start and end are the starting and ending timestamp of this
stream; all timestamps t in the data must satisfy 'start <= t
< end'. If left unspecified, 'start' is the timestamp of the
first line of data, and 'end' is the timestamp on the last line
of data, plus a small delta of 1μs.
"""
params = { "path": path }
# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
max_data = 1048576
max_time = 30
end_epsilon = 1e-6
def extract_timestamp(line):
return float(line.split()[0])
def sendit():
# If we have more data after this, use the timestamp of
# the next line as the end. Otherwise, use the given
# overall end time, or add end_epsilon to the last data
# point.
if nextline:
block_end = extract_timestamp(nextline)
if end and block_end > end:
# This is unexpected, but we'll defer to the server
# to return an error in this case.
block_end = end
elif end:
block_end = end
else:
block_end = extract_timestamp(line) + end_epsilon
# Send it
params["start"] = float_to_string(block_start)
params["end"] = float_to_string(block_end)
return self.http.put("stream/insert", block_data, params)
clock_start = time.time()
block_data = ""
block_start = start
result = None
line = None
nextline = None
for (line, nextline) in nilmdb.utils.misc.pairwise(data):
# If we don't have a starting time, extract it from the first line
if block_start is None:
block_start = extract_timestamp(line)
clock_elapsed = time.time() - clock_start
block_data += line
# If we have enough data, or enough time has elapsed,
# send this block to the server, and empty things out
# for the next block.
if (len(block_data) > max_data) or (clock_elapsed > max_time):
result = sendit()
block_start = None
block_data = ""
clock_start = time.time()
# One last block?
if len(block_data):
result = sendit()
# Return the most recent JSON result we got back, or None if
# we didn't make any requests.
return result
def stream_insert_block(self, path, block, start, end):
"""Insert an entire block of data into a stream. Like
stream_insert, except 'block' contains multiple lines of ASCII
text and is sent in one single chunk."""
params = { "path": path,
"start": float_to_string(start),
"end": float_to_string(end) }
return self.http.put("stream/insert", block, params)
def stream_intervals(self, path, start = None, end = None):
"""
@@ -180,7 +156,7 @@ class Client(object):
params["start"] = float_to_string(start)
if end is not None:
params["end"] = float_to_string(end)
return self.http.get_gen("stream/intervals", params, retjson = True)
return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, count = False):
"""
@@ -188,8 +164,8 @@ class Client(object):
lines of ASCII-formatted data that matches the database
layout for the given path.
Specify count=True to just get a count of values rather than
the actual data.
Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged.
"""
params = {
"path": path,
@@ -200,5 +176,203 @@ class Client(object):
params["end"] = float_to_string(end)
if count:
params["count"] = 1
return self.http.get_gen("stream/extract", params)
return self.http.get_gen("stream/extract", params, retjson = False)
def stream_count(self, path, start = None, end = None):
"""
Return the number of rows of data in the stream that satisfy
the given timestamps.
"""
counts = list(self.stream_extract(path, start, end, count = True))
return int(counts[0])
class StreamInserter(object):
"""Object returned by stream_insert_context() that manages
the insertion of rows of data into a particular path.
The basic data flow is that we are filling a contiguous interval
on the server, with no gaps, that extends from timestamp 'start'
to timestamp 'end'. Data timestamps satisfy 'start <= t < end'.
Data is provided by the user one line at a time with
.insert_line() or .insert_iter().
1. The first inserted line begins a new interval that starts at
'start'. If 'start' is not given, it is deduced from the first
line's timestamp.
2. Subsequent lines go into the same contiguous interval. As lines
are inserted, this routine may make multiple insertion requests to
the server, but will structure the timestamps to leave no gaps.
3. The current contiguous interval can be completed by manually
calling .finalize(), which the context manager will also do
automatically. This will send any remaining data to the server,
using the 'end' timestamp to end the interval.
After a .finalize(), inserting new data goes back to step 1.
.update_start() can be called before step 1 to change the start
time for the interval. .update_end() can be called before step 3
to change the end time for the interval.
"""
# See design.md for a discussion of how much data to send.
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
_max_data = 2 * 1024 * 1024
_max_time = 30
# Delta to add to the final timestamp, if "end" wasn't given
_end_epsilon = 1e-6
def __init__(self, client, path, start = None, end = None):
"""'http' is the httpclient object. 'path' is the database
path to insert to. 'start' and 'end' are used for the first
contiguous interval."""
self.last_response = None
self._client = client
self._path = path
# Start and end for the overall contiguous interval we're
# filling
self._interval_start = start
self._interval_end = end
# Data for the specific block we're building up to send
self._block_data = []
self._block_len = 0
self._block_start = None
# Time of last request
self._last_time = time.time()
# We keep a buffer of the two most recently inserted lines.
# Only the older one actually gets processed; the newer one
# is used to "look-ahead" to the next timestamp if we need
# to internally split an insertion into two requests.
self._line_old = None
self._line_new = None
def insert_iter(self, iter):
"""Insert all lines of ASCII formatted data from the given
iterable. Lines must be terminated with '\\n'."""
for line in iter:
self.insert_line(line)
def insert_line(self, line, allow_intermediate = True):
"""Insert a single line of ASCII formatted data. Line
must be terminated with '\\n'."""
if line and (len(line) < 1 or line[-1] != '\n'):
raise ValueError("lines must end in with a newline character")
# Store this new line, but process the previous (old) one.
# This lets us "look ahead" to the next line.
self._line_old = self._line_new
self._line_new = line
if self._line_old is None:
return
# If starting a new block, pull out the timestamp if needed.
if self._block_start is None:
if self._interval_start is not None:
# User provided a start timestamp. Use it once, then
# clear it for the next block.
self._block_start = self._interval_start
self._interval_start = None
else:
# Extract timestamp from the first row
self._block_start = extract_timestamp(self._line_old)
# Save the line
self._block_data.append(self._line_old)
self._block_len += len(self._line_old)
if allow_intermediate:
# Send an intermediate block to the server if needed.
elapsed = time.time() - self._last_time
if (self._block_len > self._max_data) or (elapsed > self._max_time):
self._send_block_intermediate()
def update_start(self, start):
"""Update the start time for the next contiguous interval.
Call this before starting to insert data for a new interval,
for example, after .finalize()"""
self._interval_start = start
def update_end(self, end):
"""Update the end time for the current contiguous interval.
Call this before .finalize()"""
self._interval_end = end
def finalize(self):
"""Stop filling the current contiguous interval.
All outstanding data will be sent, and the interval end
time of the interval will be taken from the 'end' argument
used when initializing this class, or the most recent
value passed to update_end(), or the last timestamp plus
a small epsilon value if no other endpoint was provided.
If more data is inserted after a finalize(), it will become
part of a new interval and there may be a gap left in-between."""
# Special marker tells insert_line that this is the end
self.insert_line(None, allow_intermediate = False)
if self._block_len > 0:
# We have data pending, so send the final block
self._send_block_final()
elif None not in (self._interval_start, self._interval_end):
# We have no data, but enough information to create an
# empty interval.
self._block_start = self._interval_start
self._interval_start = None
self._send_block_final()
else:
# No data, and no timestamps to use to create an empty
# interval.
pass
# Make sure both timestamps are emptied for future intervals.
self._interval_start = None
self._interval_end = None
def _send_block_intermediate(self):
"""Send data, when we still have more data to send.
Use the timestamp from the next line, so that the blocks
are contiguous."""
block_end = extract_timestamp(self._line_new)
if self._interval_end is not None and block_end > self._interval_end:
# Something's fishy -- the timestamp we found is after
# the user's specified end. Limit it here, and the
# server will return an error.
block_end = self._interval_end
self._send_block(block_end)
def _send_block_final(self):
"""Send data, when this is the last block for the interval.
There is no next line, so figure out the actual interval end
using interval_end or end_epsilon."""
if self._interval_end is not None:
# Use the user's specified end timestamp
block_end = self._interval_end
# Clear it in case we send more intervals in the future.
self._interval_end = None
else:
# Add an epsilon to the last timestamp we saw
block_end = extract_timestamp(self._line_old) + self._end_epsilon
self._send_block(block_end)
def _send_block(self, block_end):
"""Send current block to the server"""
self.last_response = self._client.stream_insert_block(
self._path, "".join(self._block_data),
self._block_start, block_end)
# Clear out the block
self._block_data = []
self._block_len = 0
self._block_start = None
# Note when we sent it
self._last_time = time.time()

View File

@@ -6,8 +6,7 @@ from nilmdb.client.errors import ClientError, ServerError, Error
import simplejson as json
import urlparse
import pycurl
import cStringIO
import requests
class HTTPClient(object):
"""Class to manage and perform HTTP requests from the client"""
@@ -19,28 +18,19 @@ class HTTPClient(object):
if '://' not in reparsed:
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.SSL_VERIFYHOST, 2)
self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
self.curl.setopt(pycurl.MAXREDIRS, 5)
self._setup_url()
def _setup_url(self, url = "", params = ""):
url = urlparse.urljoin(self.baseurl, url)
if params:
url = urlparse.urljoin(
url, "?" + nilmdb.utils.urllib.urlencode(params))
self.curl.setopt(pycurl.URL, url)
self.url = url
# Build Requests session object, enable SSL verification
self.session = requests.Session()
self.session.verify = True
def _check_error(self, body = None):
code = self.curl.getinfo(pycurl.RESPONSE_CODE)
if code == 200:
return
# Saved response, so that tests can verify a few things.
self._last_response = {}
def _handle_error(self, url, code, body):
# Default variables for exception. We use the entire body as
# the default message, in case we can't extract it from a JSON
# response.
args = { "url" : self.url,
args = { "url" : url,
"status" : str(code),
"message" : body,
"traceback" : None }
@@ -64,133 +54,67 @@ class HTTPClient(object):
else:
raise Error(**args)
def _req_generator(self, url, params):
"""
Like self._req(), but runs the perform in a separate thread.
It returns a generator that spits out arbitrary-sized chunks
of the resulting data, instead of using the WRITEFUNCTION
callback.
"""
self._setup_url(url, params)
self._status = None
error_body = ""
self._headers = ""
def header_callback(data):
if self._status is None:
self._status = int(data.split(" ")[1])
self._headers += data
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
def func(callback):
self.curl.setopt(pycurl.WRITEFUNCTION, callback)
self.curl.perform()
try:
with nilmdb.utils.Iteratorizer(func, curl_hack = True) as it:
for i in it:
if self._status == 200:
# If we had a 200 response, yield the data to caller.
yield i
else:
# Otherwise, collect it into an error string.
error_body += i
except pycurl.error as e:
raise ServerError(status = "502 Error",
url = self.url,
message = e[1])
# Raise an exception if there was an error
self._check_error(error_body)
def _req(self, url, params):
"""
GET or POST that returns raw data. Returns the body
data as a string, or raises an error if it contained an error.
"""
self._setup_url(url, params)
body = cStringIO.StringIO()
self.curl.setopt(pycurl.WRITEFUNCTION, body.write)
self._headers = ""
def header_callback(data):
self._headers += data
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
try:
self.curl.perform()
except pycurl.error as e:
raise ServerError(status = "502 Error",
url = self.url,
message = e[1])
body_str = body.getvalue()
# Raise an exception if there was an error
self._check_error(body_str)
return body_str
def close(self):
self.curl.close()
self.session.close()
def _iterate_lines(self, it):
def _do_req(self, method, url, query_data, body_data, stream):
url = urlparse.urljoin(self.baseurl, url)
try:
response = self.session.request(method, url,
params = query_data,
data = body_data)
except requests.RequestException as e:
raise ServerError(status = "502 Error", url = url,
message = str(e.message))
if response.status_code != 200:
self._handle_error(url, response.status_code, response.content)
self._last_response = response
if response.headers["content-type"] in ("application/json",
"application/x-json-stream"):
return (response, True)
else:
return (response, False)
# Normal versions that return data directly
def _req(self, method, url, query = None, body = None):
"""
Given an iterator that returns arbitrarily-sized chunks
of data, return '\n'-delimited lines of text
Make a request and return the body data as a string or parsed
JSON object, or raise an error if it contained an error.
"""
partial = ""
for chunk in it:
partial += chunk
lines = partial.split("\n")
for line in lines[0:-1]:
yield line
partial = lines[-1]
if partial != "":
yield partial
(response, isjson) = self._do_req(method, url, query, body, False)
if isjson:
return json.loads(response.content)
return response.content
# Non-generator versions
def _doreq(self, url, params, retjson):
def get(self, url, params = None):
"""Simple GET (parameters in URL)"""
return self._req("GET", url, params, None)
def post(self, url, params = None):
"""Simple POST (parameters in body)"""
return self._req("POST", url, None, params)
def put(self, url, data, params = None):
"""Simple PUT (parameters in URL, data in body)"""
return self._req("PUT", url, params, data)
# Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None):
"""
Perform a request, and return the body.
url: URL to request (relative to baseurl)
params: dictionary of query parameters
retjson: expect JSON and return python objects instead of string
Make a request and return a generator that gives back strings
or JSON decoded lines of the body data, or raise an error if
it contained an eror.
"""
out = self._req(url, params)
if retjson:
return json.loads(out)
return out
def get(self, url, params = None, retjson = True):
"""Simple GET"""
self.curl.setopt(pycurl.UPLOAD, 0)
return self._doreq(url, params, retjson)
def put(self, url, postdata, params = None, retjson = True):
"""Simple PUT"""
self.curl.setopt(pycurl.UPLOAD, 1)
self._setup_url(url, params)
data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.READFUNCTION, data.read)
return self._doreq(url, params, retjson)
# Generator versions
def _doreq_gen(self, url, params, retjson):
"""
Perform a request, and return lines of the body in a generator.
url: URL to request (relative to baseurl)
params: dictionary of query parameters
retjson: expect JSON and yield python objects instead of strings
"""
for line in self._iterate_lines(self._req_generator(url, params)):
if retjson:
(response, isjson) = self._do_req(method, url, query, body, True)
for line in response.iter_lines():
if isjson:
yield json.loads(line)
else:
yield line
def get_gen(self, url, params = None, retjson = True):
"""Simple GET, returning a generator"""
self.curl.setopt(pycurl.UPLOAD, 0)
return self._doreq_gen(url, params, retjson)
def get_gen(self, url, params = None):
"""Simple GET (parameters in URL) returning a generator"""
return self._req_gen("GET", url, params)
def put_gen(self, url, postdata, params = None, retjson = True):
"""Simple PUT, returning a generator"""
self.curl.setopt(pycurl.UPLOAD, 1)
self._setup_url(url, params)
data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.READFUNCTION, data.read)
return self._doreq_gen(url, params, retjson)
# Not much use for a POST or PUT generator, since they don't
# return much data.

View File

@@ -3,9 +3,9 @@
import nilmdb
from nilmdb.utils.printf import *
from nilmdb.utils import datetime_tz
import nilmdb.utils.time
import sys
import re
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -33,63 +33,11 @@ class Cmdline(object):
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return self.parse_time(toparse).totimestamp()
return nilmdb.utils.time.parse_time(toparse).totimestamp()
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def parse_time(self, toparse):
"""
Parse a free-form time string and return a datetime_tz object.
If the string doesn't contain a timestamp, the current local
timezone is assumed (e.g. from the TZ env var).
"""
# If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators.
if toparse != "now" and len(re.findall(r"\d", toparse)) < 4:
raise ValueError("not enough digits for a timestamp")
# Try to just parse the time as given
try:
return datetime_tz.datetime_tz.smartparse(toparse)
except ValueError:
pass
# Try to extract a substring in a condensed format that we expect
# to see in a filename or header comment
res = re.search(r"(^|[^\d])(" # non-numeric or SOL
r"(199\d|2\d\d\d)" # year
r"[-/]?" # separator
r"(0[1-9]|1[012])" # month
r"[-/]?" # separator
r"([012]\d|3[01])" # day
r"[-T ]?" # separator
r"([01]\d|2[0-3])" # hour
r"[:]?" # separator
r"([0-5]\d)" # minute
r"[:]?" # separator
r"([0-5]\d)?" # second
r"([-+]\d\d\d\d)?" # timezone
r")", toparse)
if res is not None:
try:
return datetime_tz.datetime_tz.smartparse(res.group(2))
except ValueError:
pass
# Could also try to successively parse substrings, but let's
# just give up for now.
raise ValueError("unable to parse timestamp")
def time_string(self, timestamp):
"""
Convert a Unix timestamp to a string for printing, using the
local timezone for display (e.g. from the TZ env var).
"""
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp)
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
def parser_setup(self):
self.parser = JimArgumentParser(add_help = False,
formatter_class = def_form)

View File

@@ -45,7 +45,7 @@ def cmd_extract(self):
if self.args.timestamp_raw:
time_string = repr
else:
time_string = self.time_string
time_string = nilmdb.utils.time.format_time
if self.args.annotate:
printf("# path: %s\n", self.args.path)

View File

@@ -1,5 +1,6 @@
import nilmdb
from nilmdb.utils.printf import *
from nilmdb.utils import human_size
from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -17,5 +18,7 @@ def cmd_info(self):
printf("Client version: %s\n", nilmdb.__version__)
printf("Server version: %s\n", self.client.version())
printf("Server URL: %s\n", self.client.geturl())
printf("Server database path: %s\n", self.client.dbpath())
printf("Server database size: %s\n", self.client.dbsize())
dbinfo = self.client.dbinfo()
printf("Server database path: %s\n", dbinfo["path"])
printf("Server database size: %s\n", human_size(dbinfo["size"]))
printf("Server database free space: %s\n", human_size(dbinfo["free"]))

View File

@@ -2,6 +2,7 @@ from nilmdb.utils.printf import *
import nilmdb
import nilmdb.client
import nilmdb.utils.timestamper as timestamper
import nilmdb.utils.time
import sys
@@ -73,7 +74,7 @@ def cmd_insert(self):
start = self.args.start
else:
try:
start = self.parse_time(filename)
start = nilmdb.utils.time.parse_time(filename)
except ValueError:
self.die("error extracting time from filename '%s'",
filename)

View File

@@ -1,4 +1,5 @@
from nilmdb.utils.printf import *
import nilmdb.utils.time
import fnmatch
import argparse
@@ -47,8 +48,8 @@ def cmd_list_verify(self):
self.args.path = self.args.path_positional
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
if self.args.start >= self.args.end:
self.parser.error("start must precede end")
def cmd_list(self):
"""List available streams"""
@@ -57,7 +58,7 @@ def cmd_list(self):
if self.args.timestamp_raw:
time_string = repr
else:
time_string = self.time_string
time_string = nilmdb.utils.time.format_time
for (path, layout) in streams:
if not (fnmatch.fnmatch(path, self.args.path) and

View File

@@ -8,8 +8,7 @@ def setup(self, sub):
Remove all data from a specified time range within a
stream.
""")
cmd.set_defaults(verify = cmd_remove_verify,
handler = cmd_remove)
cmd.set_defaults(handler = cmd_remove)
group = cmd.add_argument_group("Data selection")
group.add_argument("path",
@@ -25,11 +24,6 @@ def setup(self, sub):
group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed")
def cmd_remove_verify(self):
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
def cmd_remove(self):
try:
count = self.client.stream_remove(self.args.path,

View File

@@ -25,6 +25,9 @@ def main():
default = os.path.join(os.getcwd(), "db"))
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
'commits for sqlite transactions',
action = 'store_true', default = False)
group = parser.add_argument_group("Debug options")
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
@@ -33,8 +36,10 @@ def main():
args = parser.parse_args()
# Create database object
db = nilmdb.server.NilmDB(args.database)
# Create database object. Needs to be serialized before passing
# to the Server.
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database,
sync = not args.nosync)
# Configure the server
if args.quiet:

View File

@@ -5,15 +5,15 @@ from __future__ import absolute_import
# Try to set up pyximport to automatically rebuild Cython modules. If
# this doesn't work, it's OK, as long as the modules were built externally.
# (e.g. python setup.py build_ext --inplace)
try:
try: # pragma: no cover
import Cython
import distutils.version
if (distutils.version.LooseVersion(Cython.__version__) <
distutils.version.LooseVersion("0.16")): # pragma: no cover
distutils.version.LooseVersion("0.17")): # pragma: no cover
raise ImportError("Cython version too old")
import pyximport
pyximport.install(inplace = True, build_in_temp = False)
except ImportError: # pragma: no cover
except (ImportError, TypeError): # pragma: no cover
pass
import nilmdb.server.layout

View File

@@ -13,12 +13,22 @@ import struct
import mmap
import re
# If we have the faulthandler module, use it. All of the mmap stuff
# might trigger a SIGSEGV or SIGBUS if we're not careful, and
# faulthandler will give a traceback in that case. (the Python
# interpreter will still die either way).
try: # pragma: no cover
import faulthandler
faulthandler.enable()
except: # pragma: no cover
pass
# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
fd_cache_size = 16
@nilmdb.utils.must_close(wrap_verify = True)
@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):
def __init__(self, basepath, **kwargs):
self.basepath = basepath
@@ -161,7 +171,65 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements)
return Table(ospath)
@nilmdb.utils.must_close(wrap_verify = True)
@nilmdb.utils.must_close(wrap_verify = False)
class File(object):
"""Object representing a single file on disk. Data can be appended,
or the self.mmap handle can be used for random reads."""
def __init__(self, root, subdir, filename):
# Create path if it doesn't exist
try:
os.mkdir(os.path.join(root, subdir))
except OSError:
pass
# Open/create file
self._f = open(os.path.join(root, subdir, filename), "a+b", 0)
# Seek to end, and get size
self._f.seek(0, 2)
self.size = self._f.tell()
# Open mmap object
self.mmap = None
self._mmap_reopen()
def _mmap_reopen(self):
if self.size == 0:
# Don't mmap if the file is empty; it would fail
pass
elif self.mmap is None:
# Not opened yet, so open it
self.mmap = mmap.mmap(self._f.fileno(), 0)
else:
# Already opened, so just resize it
self.mmap.resize(self.size)
def close(self):
if self.mmap is not None:
self.mmap.close()
self._f.close()
def append(self, data): # pragma: no cover (below version used instead)
# Write data, flush it, and resize our mmap accordingly
self._f.write(data)
self._f.flush()
self.size += len(data)
self._mmap_reopen()
def append_pack_iter(self, count, packer, dataiter):
# An optimized verison of append, to avoid flushing the file
# and resizing the mmap after each data point.
try:
for i in xrange(count):
row = dataiter.next()
self._f.write(packer(*row))
finally:
self._f.flush()
self.size = self._f.tell()
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = False)
class Table(object):
"""Tools to help access a single table (data at a specific OS path)."""
# See design.md for design details
@@ -211,7 +279,7 @@ class Table(object):
self.nrows = self._get_nrows()
def close(self):
self.mmap_open.cache_remove_all()
self.file_open.cache_remove_all()
# Internal helpers
def _get_nrows(self):
@@ -275,37 +343,11 @@ class Table(object):
# Cache open files
@nilmdb.utils.lru_cache(size = fd_cache_size,
keys = slice(0, 3), # exclude newsize
onremove = lambda x: x.close())
def mmap_open(self, subdir, filename, newsize = None):
onremove = lambda f: f.close())
def file_open(self, subdir, filename):
"""Open and map a given 'subdir/filename' (relative to self.root).
Will be automatically closed when evicted from the cache.
If 'newsize' is provided, the file is truncated to the given
size before the mapping is returned. (Note that the LRU cache
on this function means the truncate will only happen if the
object isn't already cached; mmap.resize should be used too.)"""
try:
os.mkdir(os.path.join(self.root, subdir))
except OSError:
pass
f = open(os.path.join(self.root, subdir, filename), "a+", 0)
if newsize is not None:
# mmap can't map a zero-length file, so this allows the
# caller to set the filesize between file creation and
# mmap.
f.truncate(newsize)
mm = mmap.mmap(f.fileno(), 0)
return mm
def mmap_open_resize(self, subdir, filename, newsize):
"""Open and map a given 'subdir/filename' (relative to self.root).
The file is resized to the given size."""
# Pass new size to mmap_open
mm = self.mmap_open(subdir, filename, newsize)
# In case we got a cached copy, need to call mm.resize too.
mm.resize(newsize)
return mm
Will be automatically closed when evicted from the cache."""
return File(self.root, subdir, filename)
def append(self, data):
"""Append the data and flush it to disk.
@@ -317,14 +359,11 @@ class Table(object):
(subdir, fname, offset, count) = self._offset_from_row(self.nrows)
if count > remaining:
count = remaining
newsize = offset + count * self.packer.size
mm = self.mmap_open_resize(subdir, fname, newsize)
mm.seek(offset)
f = self.file_open(subdir, fname)
# Write the data
for i in xrange(count):
row = dataiter.next()
mm.write(self.packer.pack(*row))
f.append_pack_iter(count, self.packer.pack, dataiter)
remaining -= count
self.nrows += count
@@ -351,7 +390,7 @@ class Table(object):
(subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining:
count = remaining
mm = self.mmap_open(subdir, filename)
mm = self.file_open(subdir, filename).mmap
for i in xrange(count):
ret.append(list(self.packer.unpack_from(mm, offset)))
offset += self.packer.size
@@ -363,7 +402,7 @@ class Table(object):
if key < 0 or key >= self.nrows:
raise IndexError("Index out of range")
(subdir, filename, offset, count) = self._offset_from_row(key)
mm = self.mmap_open(subdir, filename)
mm = self.file_open(subdir, filename).mmap
# unpack_from ignores the mmap object's current seek position
return list(self.packer.unpack_from(mm, offset))
@@ -410,8 +449,8 @@ class Table(object):
# are generally easier if we don't have to special-case that.
if (len(merged) == 1 and
merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
# Close potentially open file in mmap_open LRU cache
self.mmap_open.cache_remove(self, subdir, filename)
# Close potentially open file in file_open LRU cache
self.file_open.cache_remove(self, subdir, filename)
# Delete files
os.remove(datafile)

View File

@@ -36,7 +36,7 @@ cdef class Interval:
"""
'start' and 'end' are arbitrary floats that represent time
"""
if start > end:
if start >= end:
# Explicitly disallow zero-width intervals (since they're half-open)
raise IntervalError("start %s must precede end %s" % (start, end))
self.start = float(start)

View File

@@ -4,7 +4,6 @@ import time
import sys
import inspect
import cStringIO
import numpy as np
cdef enum:
max_value_count = 64
@@ -42,10 +41,12 @@ class Layout:
if datatype == 'uint16':
self.parse = self.parse_uint16
self.format = self.format_uint16
self.format_str = "%.6f" + " %d" * self.count
self.format = self.format_generic
elif datatype == 'float32' or datatype == 'float64':
self.parse = self.parse_float64
self.format = self.format_float64
self.format_str = "%.6f" + " %f" * self.count
self.format = self.format_generic
else:
raise KeyError("invalid type")
@@ -57,15 +58,15 @@ class Layout:
cdef double ts
# Return doubles even in float32 case, since they're going into
# a Python array which would upconvert to double anyway.
result = []
result = [0] * (self.count + 1)
cdef char *end
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result.append(ts)
result[0] = ts
for n in range(self.count):
text = end
result.append(libc.stdlib.strtod(text, &end))
result[n+1] = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("wrong number of values")
n = 0
@@ -79,18 +80,18 @@ class Layout:
cdef int n
cdef double ts
cdef int v
result = []
cdef char *end
result = [0] * (self.count + 1)
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result.append(ts)
result[0] = ts
for n in range(self.count):
text = end
v = libc.stdlib.strtol(text, &end, 10)
if v < 0 or v > 65535:
raise ValueError("value out of range")
result.append(v)
result[n+1] = v
if end == text:
raise ValueError("wrong number of values")
n = 0
@@ -101,25 +102,12 @@ class Layout:
return (ts, result)
# Formatters
def format_float64(self, d):
def format_generic(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
s = "%.6f" % d[0]
for i in range(n):
s += " %f" % d[i+1]
return s + "\n"
def format_uint16(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
s = "%.6f" % d[0]
for i in range(n):
s += " %d" % d[i+1]
return s + "\n"
return (self.format_str % tuple(d)) + "\n"
# Get a layout by name
def get_named(typestring):
@@ -154,7 +142,7 @@ class Parser(object):
layout, into an internal data structure suitable for a
pytables 'table.append(parser.data)'.
"""
cdef double last_ts = 0, ts
cdef double last_ts = -1e12, ts
cdef int n = 0, i
cdef char *line

View File

@@ -97,12 +97,7 @@ class NilmDB(object):
# SQLite database too
sqlfilename = os.path.join(self.basepath, "data.sql")
# We use check_same_thread = False, assuming that the rest
# of the code (e.g. Server) will be smart and not access this
# database from multiple threads simultaneously. Otherwise
# false positives will occur when the database is only opened
# in one thread, and only accessed in another.
self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
self._sql_schema_update()
# See big comment at top about the performance implications of this
@@ -142,6 +137,15 @@ class NilmDB(object):
with self.con:
cur.execute("PRAGMA user_version = {v:d}".format(v=version))
def _check_user_times(self, start, end):
if start is None:
start = -1e12
if end is None:
end = 1e12
if start >= end:
raise NilmDBError("start must precede end")
return (start, end)
@nilmdb.utils.lru_cache(size = 16)
def _get_intervals(self, stream_id):
"""
@@ -303,7 +307,8 @@ class NilmDB(object):
"""
stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12)
(start, end) = self._check_user_times(start, end)
requested = Interval(start, end)
result = []
for n, i in enumerate(intervals.intersection(requested)):
if n >= self.max_results:
@@ -396,7 +401,7 @@ class NilmDB(object):
path: Path at which to add the data
start: Starting timestamp
end: Ending timestamp
data: Rows of data, to be passed to PyTable's table.append
data: Rows of data, to be passed to bulkdata table.append
method. E.g. nilmdb.layout.Parser.data
"""
# First check for basic overlap using timestamp info given.
@@ -417,7 +422,7 @@ class NilmDB(object):
self._add_interval(stream_id, interval, row_start, row_end)
# And that's all
return "ok"
return
def _find_start(self, table, dbinterval):
"""
@@ -475,7 +480,8 @@ class NilmDB(object):
stream_id = self._stream_id(path)
table = self.data.getnode(path)
intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12)
(start, end) = self._check_user_times(start, end)
requested = Interval(start, end)
result = []
matched = 0
remaining = self.max_results
@@ -521,12 +527,10 @@ class NilmDB(object):
stream_id = self._stream_id(path)
table = self.data.getnode(path)
intervals = self._get_intervals(stream_id)
to_remove = Interval(start or 0, end or 1e12)
(start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end)
removed = 0
if start == end:
return 0
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True))

View File

@@ -13,12 +13,7 @@ import os
import simplejson as json
import decorator
import traceback
try:
cherrypy.tools.json_out
except: # pragma: no cover
sys.stderr.write("Cherrypy 3.2+ required\n")
sys.exit(1)
import psutil
class NilmApp(object):
def __init__(self, db):
@@ -76,6 +71,17 @@ def exception_to_httperror(*expected):
# care of that.
return decorator.decorator(wrapper)
# Custom Cherrypy tools
def allow_methods(methods):
method = cherrypy.request.method.upper()
if method not in methods:
if method in cherrypy.request.methods_with_bodies:
cherrypy.request.body.read()
allowed = ', '.join(methods)
cherrypy.response.headers['Allow'] = allowed
raise cherrypy.HTTPError(405, method + " not allowed; use " + allowed)
cherrypy.tools.allow_methods = cherrypy.Tool('before_handler', allow_methods)
# CherryPy apps
class Root(NilmApp):
"""Root application for NILM database"""
@@ -99,17 +105,16 @@ class Root(NilmApp):
def version(self):
return nilmdb.__version__
# /dbpath
# /dbinfo
@cherrypy.expose
@cherrypy.tools.json_out()
def dbpath(self):
return self.db.get_basepath()
# /dbsize
@cherrypy.expose
@cherrypy.tools.json_out()
def dbsize(self):
return nilmdb.utils.du(self.db.get_basepath())
def dbinfo(self):
"""Return a dictionary with the database path,
size of the database in bytes, and free disk space in bytes"""
path = self.db.get_basepath()
return { "path": path,
"size": nilmdb.utils.du(path),
"free": psutil.disk_usage(path).free }
class Stream(NilmApp):
"""Stream-specific operations"""
@@ -129,6 +134,7 @@ class Stream(NilmApp):
@cherrypy.expose
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, ValueError)
@cherrypy.tools.allow_methods(methods = ["POST"])
def create(self, path, layout):
"""Create a new stream in the database. Provide path
and one of the nilmdb.layout.layouts keys.
@@ -139,6 +145,7 @@ class Stream(NilmApp):
@cherrypy.expose
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.allow_methods(methods = ["POST"])
def destroy(self, path):
"""Delete a stream and its associated data."""
return self.db.stream_destroy(path)
@@ -171,29 +178,29 @@ class Stream(NilmApp):
@cherrypy.expose
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError)
@cherrypy.tools.allow_methods(methods = ["POST"])
def set_metadata(self, path, data):
"""Set metadata for the named stream, replacing any
existing metadata. Data should be a json-encoded
dictionary"""
data_dict = json.loads(data)
self.db.stream_set_metadata(path, data_dict)
return "ok"
# /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError, LookupError, TypeError)
@cherrypy.tools.allow_methods(methods = ["POST"])
def update_metadata(self, path, data):
"""Update metadata for the named stream. Data
should be a json-encoded dictionary"""
data_dict = json.loads(data)
self.db.stream_update_metadata(path, data_dict)
return "ok"
# /stream/insert?path=/newton/prep
@cherrypy.expose
@cherrypy.tools.json_out()
#@cherrypy.tools.disable_prb()
@cherrypy.tools.allow_methods(methods = ["PUT"])
def insert(self, path, start, end):
"""
Insert new data into the database. Provide textual data
@@ -201,12 +208,9 @@ class Stream(NilmApp):
"""
# Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections.
# However, CherryPy 3.2.2 has a bug where this fails for GET
# requests, so catch that. (issue #1134)
try:
body = cherrypy.request.body.read()
except TypeError:
raise cherrypy.HTTPError("400 Bad Request", "No request body")
# Note that CherryPy 3.2.2 has a bug where this fails for GET
# requests, if we ever want to handle those (issue #1134)
body = cherrypy.request.body.read()
# Check path and get layout
streams = self.db.stream_list(path = path)
@@ -223,19 +227,17 @@ class Stream(NilmApp):
"error parsing input data: " +
e.message)
if (not parser.min_timestamp or not parser.max_timestamp or
not len(parser.data)):
raise cherrypy.HTTPError("400 Bad Request",
"no data provided")
# Check limits
start = float(start)
end = float(end)
if parser.min_timestamp < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
if parser.min_timestamp is not None and parser.min_timestamp < start:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.min_timestamp) +
" < start time " + repr(start))
if parser.max_timestamp >= end:
if parser.max_timestamp is not None and parser.max_timestamp >= end:
raise cherrypy.HTTPError("400 Bad Request", "Data timestamp " +
repr(parser.max_timestamp) +
" >= end time " + repr(end))
@@ -247,13 +249,14 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("400 Bad Request", e.message)
# Done
return "ok"
return
# /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
@cherrypy.tools.allow_methods(methods = ["POST"])
def remove(self, path, start = None, end = None):
"""
Remove data from the backend database. Removes all data in
@@ -265,26 +268,25 @@ class Stream(NilmApp):
if end is not None:
end = float(end)
if start is not None and end is not None:
if end < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
"start must precede end")
return self.db.stream_remove(path, start, end)
# /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@chunked_response
@response_type("text/plain")
@response_type("application/x-json-stream")
def intervals(self, path, start = None, end = None):
"""
Get intervals from backend database. Streams the resulting
intervals as JSON strings separated by newlines. This may
intervals as JSON strings separated by CR LF pairs. This may
make multiple requests to the nilmdb backend to avoid causing
it to block for too long.
Note that the response type is set to 'text/plain' even
though we're sending back JSON; this is because we're not
really returning a single JSON object.
Note that the response type is the non-standard
'application/x-json-stream' for lack of a better option.
"""
if start is not None:
start = float(start)
@@ -292,9 +294,9 @@ class Stream(NilmApp):
end = float(end)
if start is not None and end is not None:
if end < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
"start must precede end")
streams = self.db.stream_list(path = path)
if len(streams) != 1:
@@ -304,8 +306,8 @@ class Stream(NilmApp):
def content(start, end):
# Note: disable chunked responses to see tracebacks from here.
while True:
(intervals, restart) = self.db.stream_intervals(path, start, end)
response = ''.join([ json.dumps(i) + "\n" for i in intervals ])
(ints, restart) = self.db.stream_intervals(path, start, end)
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
yield response
if restart == 0:
break
@@ -332,9 +334,9 @@ class Stream(NilmApp):
# Check parameters
if start is not None and end is not None:
if end < start:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
"start must precede end")
# Check path and get layout
streams = self.db.stream_list(path = path)
@@ -385,17 +387,20 @@ class Server(object):
# Save server version, just for verification during tests
self.version = nilmdb.__version__
# Need to wrap DB object in a serializer because we'll call
# into it from separate threads.
self.embedded = embedded
self.db = nilmdb.utils.Serializer(db)
self.db = db
if not getattr(db, "_thread_safe", None):
raise KeyError("Database object " + str(db) + " doesn't claim "
"to be thread safe. You should pass "
"nilmdb.utils.serializer_proxy(NilmDB)(args) "
"rather than NilmDB(args).")
# Build up global server configuration
cherrypy.config.update({
'server.socket_host': host,
'server.socket_port': port,
'engine.autoreload_on': False,
'server.max_request_body_size': 4*1024*1024,
'server.max_request_body_size': 8*1024*1024,
})
if self.embedded:
cherrypy.config.update({ 'environment': 'embedded' })
@@ -412,6 +417,11 @@ class Server(object):
app_config.update({ 'response.headers.Access-Control-Allow-Origin':
'*' })
# Only allow GET and HEAD by default. Individual handlers
# can override.
app_config.update({ 'tools.allow_methods.on': True,
'tools.allow_methods.methods': ['GET', 'HEAD'] })
# Send tracebacks in error responses. They're hidden by the
# error_page function for client errors (code 400-499).
app_config.update({ 'request.show_tracebacks' : True })

View File

@@ -2,10 +2,9 @@
from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import Serializer
from nilmdb.utils.serializer import serializer_proxy
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du
from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close
from nilmdb.utils.urllib import urlencode
from nilmdb.utils import misc
from nilmdb.utils import atomic
import nilmdb.utils.threadsafety

View File

@@ -1,7 +1,7 @@
import os
from math import log
def sizeof_fmt(num):
def human_size(num):
"""Human friendly file size"""
unit_list = zip(['bytes', 'kiB', 'MiB', 'GiB', 'TiB'], [0, 0, 1, 2, 2])
if num > 1:
@@ -15,15 +15,11 @@ def sizeof_fmt(num):
if num == 1: # pragma: no cover
return '1 byte'
def du_bytes(path):
def du(path):
"""Like du -sb, returns total size of path in bytes."""
size = os.path.getsize(path)
if os.path.isdir(path):
for thisfile in os.listdir(path):
filepath = os.path.join(path, thisfile)
size += du_bytes(filepath)
size += du(filepath)
return size
def du(path):
"""Like du -sh, returns total size of path as a human-readable string."""
return sizeof_fmt(du_bytes(path))

View File

@@ -16,6 +16,7 @@ class IteratorizerThread(threading.Thread):
callback (provided by this class) as an argument
"""
threading.Thread.__init__(self)
self.name = "Iteratorizer-" + function.__name__ + "-" + self.name
self.function = function
self.queue = queue
self.die = False

View File

@@ -1,8 +0,0 @@
import itertools
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), ..., (sn,None)"
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip_longest(a, b)

View File

@@ -12,15 +12,12 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
already been called."""
def class_decorator(cls):
# Helper to replace a class method with a wrapper function,
# while maintaining argument specs etc.
def wrap_class_method(wrapper_func):
method = wrapper_func.__name__
if method in cls.__dict__:
orig = getattr(cls, method).im_func
else:
orig = lambda self: None
setattr(cls, method, decorator.decorator(wrapper_func, orig))
def wrap_class_method(wrapper):
try:
orig = getattr(cls, wrapper.__name__).im_func
except:
orig = lambda x: None
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))
@wrap_class_method
def __init__(orig, self, *args, **kwargs):
@@ -38,7 +35,8 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
@wrap_class_method
def close(orig, self, *args, **kwargs):
del self._must_close
if "_must_close" in self.__dict__:
del self._must_close
return orig(self, *args, **kwargs)
# Optionally wrap all other functions

View File

@@ -1,6 +1,10 @@
import Queue
import threading
import sys
import decorator
import inspect
import types
import functools
# This file provides a class that will wrap an object and serialize
# all calls to its methods. All calls to that object will be queued
@@ -12,8 +16,9 @@ import sys
class SerializerThread(threading.Thread):
"""Thread that retrieves call information from the queue, makes the
call, and returns the results."""
def __init__(self, call_queue):
def __init__(self, classname, call_queue):
threading.Thread.__init__(self)
self.name = "Serializer-" + classname + "-" + self.name
self.call_queue = call_queue
def run(self):
@@ -22,51 +27,83 @@ class SerializerThread(threading.Thread):
# Terminate if result_queue is None
if result_queue is None:
return
exception = None
result = None
try:
result = func(*args, **kwargs) # wrapped
except:
result_queue.put((sys.exc_info(), None))
exception = sys.exc_info()
# Ensure we delete these before returning a result, so
# we don't unncessarily hold onto a reference while
# we're waiting for the next call.
del func, args, kwargs
result_queue.put((exception, result))
del exception, result
def serializer_proxy(obj_or_type):
"""Wrap the given object or type in a SerializerObjectProxy.
Returns a SerializerObjectProxy object that proxies all method
calls to the object, as well as attribute retrievals.
The proxied requests, including instantiation, are performed in a
single thread and serialized between caller threads.
"""
class SerializerCallProxy(object):
def __init__(self, call_queue, func, objectproxy):
self.call_queue = call_queue
self.func = func
# Need to hold a reference to object proxy so it doesn't
# go away (and kill the thread) until after get called.
self.objectproxy = objectproxy
def __call__(self, *args, **kwargs):
result_queue = Queue.Queue()
self.call_queue.put((result_queue, self.func, args, kwargs))
( exc_info, result ) = result_queue.get()
if exc_info is None:
return result
else:
result_queue.put((None, result))
raise exc_info[0], exc_info[1], exc_info[2]
class WrapCall(object):
"""Wrap a callable using the given queues"""
class SerializerObjectProxy(object):
def __init__(self, obj_or_type, *args, **kwargs):
self.__object = obj_or_type
try:
if type(obj_or_type) in (types.TypeType, types.ClassType):
classname = obj_or_type.__name__
else:
classname = obj_or_type.__class__.__name__
except AttributeError: # pragma: no cover
classname = "???"
self.__call_queue = Queue.Queue()
self.__thread = SerializerThread(classname, self.__call_queue)
self.__thread.daemon = True
self.__thread.start()
self._thread_safe = True
def __init__(self, call_queue, result_queue, func):
self.call_queue = call_queue
self.result_queue = result_queue
self.func = func
def __getattr__(self, key):
if key.startswith("_SerializerObjectProxy__"): # pragma: no cover
raise AttributeError
attr = getattr(self.__object, key)
if not callable(attr):
getter = SerializerCallProxy(self.__call_queue, getattr, self)
return getter(self.__object, key)
r = SerializerCallProxy(self.__call_queue, attr, self)
return r
def __call__(self, *args, **kwargs):
self.call_queue.put((self.result_queue, self.func, args, kwargs))
( exc_info, result ) = self.result_queue.get()
if exc_info is None:
return result
else:
raise exc_info[0], exc_info[1], exc_info[2]
def __call__(self, *args, **kwargs):
"""Call this to instantiate the type, if a type was passed
to serializer_proxy. Otherwise, pass the call through."""
ret = SerializerCallProxy(self.__call_queue,
self.__object, self)(*args, **kwargs)
if type(self.__object) in (types.TypeType, types.ClassType):
# Instantiation
self.__object = ret
return self
return ret
class WrapObject(object):
"""Wrap all calls to methods in a target object with WrapCall"""
def __del__(self):
self.__call_queue.put((None, None, None, None))
self.__thread.join()
def __init__(self, target):
self.__wrap_target = target
self.__wrap_call_queue = Queue.Queue()
self.__wrap_serializer = SerializerThread(self.__wrap_call_queue)
self.__wrap_serializer.daemon = True
self.__wrap_serializer.start()
def __getattr__(self, key):
"""Wrap methods of self.__wrap_target in a WrapCall instance"""
func = getattr(self.__wrap_target, key)
if not callable(func):
raise TypeError("Can't serialize attribute %r (type: %s)"
% (key, type(func)))
result_queue = Queue.Queue()
return WrapCall(self.__wrap_call_queue, result_queue, func)
def __del__(self):
self.__wrap_call_queue.put((None, None, None, None))
self.__wrap_serializer.join()
# Just an alias
Serializer = WrapObject
return SerializerObjectProxy(obj_or_type)

View File

@@ -0,0 +1,109 @@
from nilmdb.utils.printf import *
import threading
import warnings
import types
def verify_proxy(obj_or_type, exception = False, check_thread = True,
check_concurrent = True):
"""Wrap the given object or type in a VerifyObjectProxy.
Returns a VerifyObjectProxy that proxies all method calls to the
given object, as well as attribute retrievals.
When calling methods, the following checks are performed. If
exception is True, an exception is raised. Otherwise, a warning
is printed.
check_thread = True # Warn/fail if two different threads call methods.
check_concurrent = True # Warn/fail if two functions are concurrently
# run through this proxy
"""
class Namespace(object):
pass
class VerifyCallProxy(object):
def __init__(self, func, parent_namespace):
self.func = func
self.parent_namespace = parent_namespace
def __call__(self, *args, **kwargs):
p = self.parent_namespace
this = threading.current_thread()
try:
callee = self.func.__name__
except AttributeError:
callee = "???"
if p.thread is None:
p.thread = this
p.thread_callee = callee
if check_thread and p.thread != this:
err = sprintf("unsafe threading: %s called %s.%s,"
" but %s called %s.%s",
p.thread.name, p.classname, p.thread_callee,
this.name, p.classname, callee)
if exception:
raise AssertionError(err)
else: # pragma: no cover
warnings.warn(err)
need_concur_unlock = False
if check_concurrent:
if p.concur_lock.acquire(False) == False:
err = sprintf("unsafe concurrency: %s called %s.%s "
"while %s is still in %s.%s",
this.name, p.classname, callee,
p.concur_tname, p.classname, p.concur_callee)
if exception:
raise AssertionError(err)
else: # pragma: no cover
warnings.warn(err)
else:
p.concur_tname = this.name
p.concur_callee = callee
need_concur_unlock = True
try:
ret = self.func(*args, **kwargs)
finally:
if need_concur_unlock:
p.concur_lock.release()
return ret
class VerifyObjectProxy(object):
def __init__(self, obj_or_type, *args, **kwargs):
p = Namespace()
self.__ns = p
p.thread = None
p.thread_callee = None
p.concur_lock = threading.Lock()
p.concur_tname = None
p.concur_callee = None
self.__obj = obj_or_type
try:
if type(obj_or_type) in (types.TypeType, types.ClassType):
p.classname = self.__obj.__name__
else:
p.classname = self.__obj.__class__.__name__
except AttributeError: # pragma: no cover
p.classname = "???"
def __getattr__(self, key):
if key.startswith("_VerifyObjectProxy__"): # pragma: no cover
raise AttributeError
attr = getattr(self.__obj, key)
if not callable(attr):
return VerifyCallProxy(getattr, self.__ns)(self.__obj, key)
return VerifyCallProxy(attr, self.__ns)
def __call__(self, *args, **kwargs):
"""Call this to instantiate the type, if a type was passed
to verify_proxy. Otherwise, pass the call through."""
ret = VerifyCallProxy(self.__obj, self.__ns)(*args, **kwargs)
if type(self.__obj) in (types.TypeType, types.ClassType):
# Instantiation
self.__obj = ret
return self
return ret
return VerifyObjectProxy(obj_or_type)

54
nilmdb/utils/time.py Normal file
View File

@@ -0,0 +1,54 @@
from nilmdb.utils import datetime_tz
import re
def parse_time(toparse):
"""
Parse a free-form time string and return a datetime_tz object.
If the string doesn't contain a timestamp, the current local
timezone is assumed (e.g. from the TZ env var).
"""
# If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators.
if toparse != "now" and len(re.findall(r"\d", toparse)) < 4:
raise ValueError("not enough digits for a timestamp")
# Try to just parse the time as given
try:
return datetime_tz.datetime_tz.smartparse(toparse)
except ValueError:
pass
# Try to extract a substring in a condensed format that we expect
# to see in a filename or header comment
res = re.search(r"(^|[^\d])(" # non-numeric or SOL
r"(199\d|2\d\d\d)" # year
r"[-/]?" # separator
r"(0[1-9]|1[012])" # month
r"[-/]?" # separator
r"([012]\d|3[01])" # day
r"[-T ]?" # separator
r"([01]\d|2[0-3])" # hour
r"[:]?" # separator
r"([0-5]\d)" # minute
r"[:]?" # separator
r"([0-5]\d)?" # second
r"([-+]\d\d\d\d)?" # timezone
r")", toparse)
if res is not None:
try:
return datetime_tz.datetime_tz.smartparse(res.group(2))
except ValueError:
pass
# Could also try to successively parse substrings, but let's
# just give up for now.
raise ValueError("unable to parse timestamp")
def format_time(timestamp):
"""
Convert a Unix timestamp to a string for printing, using the
local timezone for display (e.g. from the TZ env var).
"""
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp)
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")

View File

@@ -6,6 +6,7 @@
# foo.flush()
from __future__ import print_function
from __future__ import absolute_import
import contextlib
import time

View File

@@ -1,37 +0,0 @@
from __future__ import absolute_import
from urllib import quote_plus, _is_unicode
# urllib.urlencode insists on encoding Unicode as ASCII. This is based
# on that function, except we always encode it as UTF-8 instead.
def urlencode(query):
"""Encode a dictionary into a URL query string.
If any values in the query arg are sequences, each sequence
element is converted to a separate parameter.
"""
query = query.items()
l = []
for k, v in query:
k = quote_plus(str(k))
if isinstance(v, str):
v = quote_plus(v)
l.append(k + '=' + v)
elif _is_unicode(v):
v = quote_plus(v.encode("utf-8","strict"))
l.append(k + '=' + v)
else:
try:
# is this a sufficient test for sequence-ness?
len(v)
except TypeError:
# not a sequence
v = quote_plus(str(v))
l.append(k + '=' + v)
else:
# loop over the sequence
for elt in v:
l.append(k + '=' + quote_plus(str(elt)))
return '&'.join(l)

View File

@@ -20,6 +20,7 @@ cover-erase=1
stop=1
verbosity=2
tests=tests
#tests=tests/test_threadsafety.py
#tests=tests/test_bulkdata.py
#tests=tests/test_mustclose.py
#tests=tests/test_lrucache.py

View File

@@ -114,6 +114,8 @@ setup(name='nilmdb',
'pycurl',
'python-dateutil',
'pytz',
'psutil >= 0.3.0',
'requests >= 1.1.0, < 2.0.0',
],
packages = [ 'nilmdb',
'nilmdb.utils',

View File

@@ -1,4 +1,5 @@
test_printf.py
test_threadsafety.py
test_lrucache.py
test_mustclose.py

View File

@@ -6,6 +6,7 @@ from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError
from nilmdb.utils import datetime_tz
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
@@ -18,10 +19,13 @@ import simplejson as json
import unittest
import warnings
import resource
import time
import re
from testutil.helpers import *
testdb = "tests/client-testdb"
testurl = "http://localhost:12380/"
def setup_module():
global test_server, test_db
@@ -29,7 +33,7 @@ def setup_module():
recursive_unlink(testdb)
# Start web app on a custom port
test_db = nilmdb.NilmDB(testdb, sync = False)
test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(testdb, sync = False)
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
port = 12380, stoppable = False,
fast_shutdown = True,
@@ -44,28 +48,32 @@ def teardown_module():
class TestClient(object):
def test_client_1_basic(self):
def test_client_01_basic(self):
# Test a fake host
client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError):
client.version()
client.close()
# Trigger same error with a PUT request
client = nilmdb.Client(url = "http://localhost:1/")
with assert_raises(nilmdb.client.ServerError):
client.version()
client.close()
# Then a fake URL on a real host
client = nilmdb.Client(url = "http://localhost:12380/fake/")
with assert_raises(nilmdb.client.ClientError):
client.version()
client.close()
# Now a real URL with no http:// prefix
client = nilmdb.Client(url = "localhost:12380")
version = client.version()
client.close()
# Now use the real URL
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
version = client.version()
eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(test_server.version))
@@ -73,10 +81,11 @@ class TestClient(object):
# Bad URLs should give 404, not 500
with assert_raises(ClientError):
client.http.get("/stream/create")
client.close()
def test_client_2_createlist(self):
def test_client_02_createlist(self):
# Basic stream tests, like those in test_nilmdb:test_stream
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
# Database starts empty
eq_(client.stream_list(), [])
@@ -90,6 +99,15 @@ class TestClient(object):
with assert_raises(ClientError):
client.stream_create("/newton/prep", "NoSuchLayout")
# Bad method types
with assert_raises(ClientError):
client.http.put("/stream/list","")
# Try a bunch of times to make sure the request body is getting consumed
for x in range(10):
with assert_raises(ClientError):
client.http.post("/stream/list")
client = nilmdb.Client(url = testurl)
# Create three streams
client.stream_create("/newton/prep", "PrepData")
client.stream_create("/newton/raw", "RawData")
@@ -101,8 +119,10 @@ class TestClient(object):
["/newton/zzz/rawnotch", "RawNotchedData"]
])
# Match just one type or one path
eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(layout="RawData"),
[ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(path="/newton/raw"),
[ ["/newton/raw", "RawData"] ])
# Try messing with resource limits to trigger errors and get
# more coverage. Here, make it so we can only create files 1
@@ -114,9 +134,10 @@ class TestClient(object):
client.stream_create("/newton/hello", "RawData")
resource.setrlimit(resource.RLIMIT_FSIZE, limit)
client.close()
def test_client_3_metadata(self):
client = nilmdb.Client(url = "http://localhost:12380/")
def test_client_03_metadata(self):
client = nilmdb.Client(url = testurl)
# Set / get metadata
eq_(client.stream_get_metadata("/newton/prep"), {})
@@ -131,9 +152,10 @@ class TestClient(object):
client.stream_update_metadata("/newton/raw", meta3)
eq_(client.stream_get_metadata("/newton/prep"), meta1)
eq_(client.stream_get_metadata("/newton/raw"), meta1)
eq_(client.stream_get_metadata("/newton/raw", [ "description" ] ), meta2)
eq_(client.stream_get_metadata("/newton/raw", [ "description",
"v_scale" ] ), meta1)
eq_(client.stream_get_metadata("/newton/raw",
[ "description" ] ), meta2)
eq_(client.stream_get_metadata("/newton/raw",
[ "description", "v_scale" ] ), meta1)
# missing key
eq_(client.stream_get_metadata("/newton/raw", "descr"),
@@ -146,9 +168,14 @@ class TestClient(object):
client.stream_set_metadata("/newton/prep", [1,2,3])
with assert_raises(ClientError):
client.stream_update_metadata("/newton/prep", [1,2,3])
client.close()
def test_client_4_insert(self):
client = nilmdb.Client(url = "http://localhost:12380/")
def test_client_04_insert(self):
client = nilmdb.Client(url = testurl)
# Limit _max_data to 1 MB, since our test file is 1.5 MB
old_max_data = nilmdb.client.client.StreamInserter._max_data
nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
datetime_tz.localtz_set("America/New_York")
@@ -177,12 +204,33 @@ class TestClient(object):
result = client.stream_insert("/newton/prep", data)
eq_(result, None)
# Try forcing a server request with empty data
# It's OK to insert an empty interval
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 1, "end": 2 })
eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Timestamps can be negative too
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": -2, "end": -1 })
eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Intervals that end at zero shouldn't be any different
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": -1, "end": 0 })
eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
client.stream_remove("/newton/prep")
eq_(list(client.stream_intervals("/newton/prep")), [])
# Try forcing a server request with equal start and end
with assert_raises(ClientError) as e:
client.http.put("stream/insert", "", { "path": "/newton/prep",
"start": 0, "end": 0 })
in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception))
in_("start must precede end", str(e.exception))
# Specify start/end (starts too late)
data = timestamper.TimestamperRate(testfile, start, 120)
@@ -201,14 +249,14 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
# Client chunks the input, so the exact timestamp here might change
# if the chunk positions change.
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
str(e.exception))
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
">= end time 1332511201.0", str(e.exception))
is not None)
# Now do the real load
data = timestamper.TimestamperRate(testfile, start, 120)
result = client.stream_insert("/newton/prep", data,
start, start + 119.999777)
eq_(result, "ok")
# Verify the intervals. Should be just one, even if the data
# was inserted in chunks, due to nilmdb interval concatenation.
@@ -222,20 +270,28 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("verlap", str(e.exception))
def test_client_5_extractremove(self):
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = "http://localhost:12380/")
nilmdb.client.client.StreamInserter._max_data = old_max_data
client.close()
for x in client.stream_extract("/newton/prep", 123, 123):
def test_client_05_extractremove(self):
# Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = testurl)
for x in client.stream_extract("/newton/prep", 999123, 999124):
raise AssertionError("shouldn't be any data for this request")
with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)
def test_client_6_generators(self):
# Test count
eq_(client.stream_count("/newton/prep"), 14400)
client.close()
def test_client_06_generators(self):
# A lot of the client functionality is already tested by test_cmdline,
# but this gets a bit more coverage that cmdline misses.
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
# Trigger a client error in generator
start = datetime_tz.datetime_tz.smartparse("20120323T2000")
@@ -246,7 +302,7 @@ class TestClient(object):
start.totimestamp(),
end.totimestamp()).next()
in_("400 Bad Request", str(e.exception))
in_("end before start", str(e.exception))
in_("start must precede end", str(e.exception))
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
@@ -256,24 +312,6 @@ class TestClient(object):
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next()
# Check non-json version of string output
eq_(json.loads(client.http.get("/stream/list",retjson=False)),
client.http.get("/stream/list",retjson=True))
# Check non-json version of generator output
for (a, b) in itertools.izip(
client.http.get_gen("/stream/list",retjson=False),
client.http.get_gen("/stream/list",retjson=True)):
eq_(json.loads(a), b)
# Check PUT with generator out
with assert_raises(ClientError) as e:
client.http.put_gen("stream/insert", "",
{ "path": "/newton/prep",
"start": 0, "end": 0 }).next()
in_("400 Bad Request", str(e.exception))
in_("no data provided", str(e.exception))
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
with assert_raises(ClientError) as e:
@@ -281,48 +319,58 @@ class TestClient(object):
in_("404 Not Found", str(e.exception))
in_("No such stream", str(e.exception))
def test_client_7_headers(self):
client.close()
def test_client_07_headers(self):
# Make sure that /stream/intervals and /stream/extract
# properly return streaming, chunked, text/plain response.
# Pokes around in client.http internals a bit to look at the
# response headers.
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
http = client.http
# Use a warning rather than returning a test failure, so that we can
# still disable chunked responses for debugging.
# Use a warning rather than returning a test failure for the
# transfer-encoding, so that we can still disable chunked
# responses for debugging.
def headers():
h = ""
for (k, v) in http._last_response.headers.items():
h += k + ": " + v + "\n"
return h.lower()
# Intervals
x = http.get("stream/intervals", { "path": "/newton/prep" },
retjson=False)
lines_(x, 1)
if "Transfer-Encoding: chunked" not in http._headers:
x = http.get("stream/intervals", { "path": "/newton/prep" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/intervals")
if "Content-Type: text/plain;charset=utf-8" not in http._headers:
raise AssertionError("/stream/intervals is not text/plain:\n" +
http._headers)
if "content-type: application/x-json-stream" not in headers():
raise AssertionError("/stream/intervals content type "
"is not application/x-json-stream:\n" +
headers())
# Extract
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "123" }, retjson=False)
if "Transfer-Encoding: chunked" not in http._headers:
"end": "124" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "Content-Type: text/plain;charset=utf-8" not in http._headers:
if "content-type: text/plain;charset=utf-8" not in headers():
raise AssertionError("/stream/extract is not text/plain:\n" +
http._headers)
headers())
# Make sure Access-Control-Allow-Origin gets set
if "Access-Control-Allow-Origin: " not in http._headers:
if "access-control-allow-origin: " not in headers():
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
"header in /stream/extract response:\n" +
http._headers)
headers())
def test_client_8_unicode(self):
client.close()
def test_client_08_unicode(self):
# Basic Unicode tests
client = nilmdb.Client(url = "http://localhost:12380/")
client = nilmdb.Client(url = testurl)
# Delete streams that exist
for stream in client.stream_list():
@@ -356,3 +404,209 @@ class TestClient(object):
eq_(client.stream_get_metadata(raw[0]), meta1)
eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
client.close()
def test_client_09_closing(self):
# Make sure we actually close sockets correctly. New
# connections will block for a while if they're not, since the
# server will stop accepting new connections.
for test in [1, 2]:
start = time.time()
for i in range(50):
if time.time() - start > 15:
raise AssertionError("Connections seem to be blocking... "
"probably not closing properly.")
if test == 1:
# explicit close
client = nilmdb.Client(url = testurl)
with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)
client.close() # remove this to see the failure
elif test == 2:
# use the context manager
with nilmdb.Client(url = testurl) as c:
with assert_raises(ClientError) as e:
c.stream_remove("/newton/prep", 123, 120)
def test_client_10_context(self):
# Test using the client's stream insertion context manager to
# insert data.
client = nilmdb.Client(testurl)
client.stream_create("/context/test", "uint16_1")
with client.stream_insert_context("/context/test") as ctx:
# override _max_data to trigger frequent server updates
ctx._max_data = 15
with assert_raises(ValueError):
ctx.insert_line("100 1")
ctx.insert_line("100 1\n")
ctx.insert_iter([ "101 1\n",
"102 1\n",
"103 1\n" ])
ctx.insert_line("104 1\n")
ctx.insert_line("105 1\n")
ctx.finalize()
ctx.insert_line("106 1\n")
ctx.update_end(106.5)
ctx.finalize()
ctx.update_start(106.8)
ctx.insert_line("107 1\n")
ctx.insert_line("108 1\n")
ctx.insert_line("109 1\n")
ctx.insert_line("110 1\n")
ctx.insert_line("111 1\n")
ctx.update_end(113)
ctx.insert_line("112 1\n")
ctx.update_end(114)
ctx.insert_line("113 1\n")
ctx.update_end(115)
ctx.insert_line("114 1\n")
ctx.finalize()
with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 100, 200) as ctx:
ctx.insert_line("115 1\n")
with assert_raises(ClientError):
with client.stream_insert_context("/context/test", 200, 300) as ctx:
ctx.insert_line("115 1\n")
with client.stream_insert_context("/context/test", 200, 300) as ctx:
# make sure our override wasn't permanent
ne_(ctx._max_data, 15)
ctx.insert_line("225 1\n")
ctx.finalize()
eq_(list(client.stream_intervals("/context/test")),
[ [ 100, 105.000001 ],
[ 106, 106.5 ],
[ 106.8, 115 ],
[ 200, 300 ] ])
client.stream_destroy("/context/test")
client.close()
def test_client_11_emptyintervals(self):
# Empty intervals are ok! If recording detection events
# by inserting rows into the database, we want to be able to
# have an interval where no events occurred. Test them here.
client = nilmdb.Client(testurl)
client.stream_create("/empty/test", "uint16_1")
def info():
result = []
for interval in list(client.stream_intervals("/empty/test")):
result.append((client.stream_count("/empty/test", *interval),
interval))
return result
eq_(info(), [])
# Insert a region with just a few points
with client.stream_insert_context("/empty/test") as ctx:
ctx.update_start(100)
ctx.insert_line("140 1\n")
ctx.insert_line("150 1\n")
ctx.insert_line("160 1\n")
ctx.update_end(200)
ctx.finalize()
eq_(info(), [(3, [100, 200])])
# Delete chunk, which will leave one data point and two intervals
client.stream_remove("/empty/test", 145, 175)
eq_(info(), [(1, [100, 145]),
(0, [175, 200])])
# Try also creating a completely empty interval from scratch,
# in a few different ways.
client.stream_insert_block("/empty/test", "", 300, 350)
client.stream_insert("/empty/test", [], 400, 450)
with client.stream_insert_context("/empty/test", 500, 550):
pass
# If enough timestamps aren't provided, empty streams won't be created.
client.stream_insert("/empty/test", [])
with client.stream_insert_context("/empty/test"):
pass
client.stream_insert("/empty/test", [], start = 600)
with client.stream_insert_context("/empty/test", start = 700):
pass
client.stream_insert("/empty/test", [], end = 850)
with client.stream_insert_context("/empty/test", end = 950):
pass
# Try various things that might cause problems
with client.stream_insert_context("/empty/test", 1000, 1050):
ctx.finalize() # inserts [1000, 1050]
ctx.finalize() # nothing
ctx.finalize() # nothing
ctx.insert_line("1100 1\n")
ctx.finalize() # inserts [1100, 1100.000001]
ctx.update_start(1199)
ctx.insert_line("1200 1\n")
ctx.update_end(1250)
ctx.finalize() # inserts [1199, 1250]
ctx.update_start(1299)
ctx.finalize() # nothing
ctx.update_end(1350)
ctx.finalize() # nothing
ctx.update_start(1400)
ctx.update_end(1450)
ctx.finalize()
# implicit last finalize inserts [1400, 1450]
# Check everything
eq_(info(), [(1, [100, 145]),
(0, [175, 200]),
(0, [300, 350]),
(0, [400, 450]),
(0, [500, 550]),
(0, [1000, 1050]),
(1, [1100, 1100.000001]),
(1, [1199, 1250]),
(0, [1400, 1450]),
])
# Clean up
client.stream_destroy("/empty/test")
client.close()
def test_client_12_persistent(self):
# Check that connections are persistent when they should be.
# This is pretty hard to test; we have to poke deep into
# the Requests library.
with nilmdb.Client(url = testurl) as c:
def connections():
try:
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',12380)]
return (pool.num_connections, pool.num_requests)
except:
raise SkipTest("can't get connection info")
# First request makes a connection
c.stream_create("/persist/test", "uint16_1")
eq_(connections(), (1, 1))
# Non-generator
c.stream_list("/persist/test")
eq_(connections(), (1, 2))
c.stream_list("/persist/test")
eq_(connections(), (1, 3))
# Generators
for x in c.stream_intervals("/persist/test"):
pass
eq_(connections(), (1, 4))
for x in c.stream_intervals("/persist/test"):
pass
eq_(connections(), (1, 5))
# Clean up
c.stream_destroy("/persist/test")
eq_(connections(), (1, 6))

View File

@@ -27,9 +27,10 @@ testdb = "tests/cmdline-testdb"
def server_start(max_results = None, bulkdata_args = {}):
global test_server, test_db
# Start web app on a custom port
test_db = nilmdb.NilmDB(testdb, sync = False,
max_results = max_results,
bulkdata_args = bulkdata_args)
test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(
testdb, sync = False,
max_results = max_results,
bulkdata_args = bulkdata_args)
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
port = 12380, stoppable = False,
fast_shutdown = True,
@@ -162,16 +163,16 @@ class TestCmdline(object):
# try some URL constructions
self.fail("--url http://nosuchurl/ info")
self.contain("Couldn't resolve host 'nosuchurl'")
self.contain("error connecting to server")
self.fail("--url nosuchurl info")
self.contain("Couldn't resolve host 'nosuchurl'")
self.contain("error connecting to server")
self.fail("-u nosuchurl/foo info")
self.contain("Couldn't resolve host 'nosuchurl'")
self.contain("error connecting to server")
self.fail("-u localhost:0 info")
self.contain("couldn't connect to host")
self.fail("-u localhost:1 info")
self.contain("error connecting to server")
self.ok("-u localhost:12380 info")
self.ok("info")
@@ -191,14 +192,32 @@ class TestCmdline(object):
self.fail("extract --start 2000-01-01 --start 2001-01-02")
self.contain("duplicated argument")
def test_02_info(self):
def test_02_parsetime(self):
os.environ['TZ'] = "America/New_York"
test = datetime_tz.datetime_tz.now()
parse_time = nilmdb.utils.time.parse_time
eq_(parse_time(str(test)), test)
test = datetime_tz.datetime_tz.smartparse("20120405 1400-0400")
eq_(parse_time("hi there 20120405 1400-0400 testing! 123"), test)
eq_(parse_time("20120405 1800 UTC"), test)
eq_(parse_time("20120405 1400-0400 UTC"), test)
for badtime in [ "20120405 1400-9999", "hello", "-", "", "4:00" ]:
with assert_raises(ValueError):
x = parse_time(badtime)
x = parse_time("now")
eq_(parse_time("snapshot-20120405-140000.raw.gz"), test)
eq_(parse_time("prep-20120405T1400"), test)
def test_03_info(self):
self.ok("info")
self.contain("Server URL: http://localhost:12380/")
self.contain("Client version: " + nilmdb.__version__)
self.contain("Server version: " + test_server.version)
self.contain("Server database path")
self.contain("Server database size")
self.contain("Server database free space")
def test_03_createlist(self):
def test_04_createlist(self):
# Basic stream tests, like those in test_client.
# No streams
@@ -272,9 +291,9 @@ class TestCmdline(object):
# reversed range
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
self.contain("start must precede end")
def test_04_metadata(self):
def test_05_metadata(self):
# Set / get metadata
self.fail("metadata")
self.fail("metadata --get")
@@ -331,22 +350,6 @@ class TestCmdline(object):
self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath")
def test_05_parsetime(self):
os.environ['TZ'] = "America/New_York"
cmd = nilmdb.cmdline.Cmdline(None)
test = datetime_tz.datetime_tz.now()
eq_(cmd.parse_time(str(test)), test)
test = datetime_tz.datetime_tz.smartparse("20120405 1400-0400")
eq_(cmd.parse_time("hi there 20120405 1400-0400 testing! 123"), test)
eq_(cmd.parse_time("20120405 1800 UTC"), test)
eq_(cmd.parse_time("20120405 1400-0400 UTC"), test)
for badtime in [ "20120405 1400-9999", "hello", "-", "", "4:00" ]:
with assert_raises(ValueError):
x = cmd.parse_time(badtime)
x = cmd.parse_time("now")
eq_(cmd.parse_time("snapshot-20120405-140000.raw.gz"), test)
eq_(cmd.parse_time("prep-20120405T1400"), test)
def test_06_insert(self):
self.ok("insert --help")
@@ -442,7 +445,7 @@ class TestCmdline(object):
self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.50'")
+ " --end='23 Mar 2012 10:05:15.51'")
lines_(self.captured, 2)
self.contain("10:05:15.500")
@@ -471,29 +474,29 @@ class TestCmdline(object):
# empty ranges return error 2
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'",
"--start '23 Mar 2012 20:00:30' " +
"--end '23 Mar 2012 20:00:31'",
exitcode = 2, require_error = False)
self.contain("no data")
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'",
"--start '23 Mar 2012 20:00:30.000001' " +
"--end '23 Mar 2012 20:00:30.000002'",
exitcode = 2, require_error = False)
self.contain("no data")
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'",
"--end '23 Mar 2022 10:00:31'",
exitcode = 2, require_error = False)
self.contain("no data")
# but are ok if we're just counting results
self.ok("extract --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
"--start '23 Mar 2012 20:00:30' " +
"--end '23 Mar 2012 20:00:31'")
self.match("0\n")
self.ok("extract -c /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
"--start '23 Mar 2012 20:00:30.000001' " +
"--end '23 Mar 2012 20:00:30.000002'")
self.match("0\n")
# Check various dumps against stored copies of how they should appear
@@ -540,31 +543,31 @@ class TestCmdline(object):
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("No stream at path")
# empty or backward ranges return errors
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
self.contain("start must precede end")
# empty ranges return success, backwards ranges return error
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.match("")
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
self.match("")
self.ok("remove /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'")
self.match("")
self.fail("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.contain("start must precede end")
self.fail("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
self.contain("start must precede end")
self.fail("remove /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'")
self.contain("start must precede end")
# Verbose
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("0\n")
self.ok("remove --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("0\n")
# Make sure we have the data we expect
@@ -765,7 +768,7 @@ class TestCmdline(object):
"tests/data/prep-20120323T1000")
# Should take up about 2.8 MB here (including directory entries)
du_before = nilmdb.utils.diskusage.du_bytes(testdb)
du_before = nilmdb.utils.diskusage.du(testdb)
# Make sure we have the data we expect
self.ok("list --detail")
@@ -815,7 +818,7 @@ class TestCmdline(object):
# We have 1/8 of the data that we had before, so the file size
# should have dropped below 1/4 of what it used to be
du_after = nilmdb.utils.diskusage.du_bytes(testdb)
du_after = nilmdb.utils.diskusage.du(testdb)
lt_(du_after, (du_before / 4))
# Remove anything that came from the 10:02 data file

View File

@@ -55,7 +55,7 @@ class TestInterval:
for x in [ "03/24/2012", "03/25/2012", "03/26/2012" ] ]
# basic construction
i = Interval(d1, d1)
i = Interval(d1, d2)
i = Interval(d1, d3)
eq_(i.start, d1)
eq_(i.end, d3)
@@ -77,8 +77,8 @@ class TestInterval:
assert(Interval(d1, d3) > Interval(d1, d2))
assert(Interval(d1, d2) < Interval(d2, d3))
assert(Interval(d1, d3) < Interval(d2, d3))
assert(Interval(d2, d2) > Interval(d1, d3))
assert(Interval(d3, d3) == Interval(d3, d3))
assert(Interval(d2, d2+0.01) > Interval(d1, d3))
assert(Interval(d3, d3+0.01) == Interval(d3, d3+0.01))
#with assert_raises(TypeError): # was AttributeError, that's wrong
# x = (i == 123)
@@ -293,7 +293,7 @@ class TestIntervalDB:
# actual start, end can be a subset
a = DBInterval(150, 200, 100, 200, 10000, 20000)
b = DBInterval(100, 150, 100, 200, 10000, 20000)
c = DBInterval(150, 150, 100, 200, 10000, 20000)
c = DBInterval(150, 160, 100, 200, 10000, 20000)
# Make a set of DBIntervals
iseta = IntervalSet([a, b])

View File

@@ -246,7 +246,7 @@ class TestLayoutSpeed:
parser = Parser(layout)
formatter = Formatter(layout)
parser.parse(data)
data = formatter.format(parser.data)
formatter.format(parser.data)
elapsed = time.time() - start
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
layout,
@@ -264,3 +264,8 @@ class TestLayoutSpeed:
return [ sprintf("%d", random.randint(0,65535))
for x in range(10) ]
do_speedtest("uint16_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_speedtest("uint16_6", datagen)

View File

@@ -34,6 +34,10 @@ class Bar:
def __del__(self):
fprintf(err, "Deleting\n")
@classmethod
def baz(self):
fprintf(err, "Baz\n")
def close(self):
fprintf(err, "Closing\n")

View File

@@ -16,6 +16,8 @@ import Queue
import cStringIO
import time
from nilmdb.utils import serializer_proxy
testdb = "tests/testdb"
#@atexit.register
@@ -93,16 +95,28 @@ class Test00Nilmdb(object): # named 00 so it runs first
eq_(db.stream_get_metadata("/newton/prep"), meta1)
eq_(db.stream_get_metadata("/newton/raw"), meta1)
# fill in some test coverage for start >= end
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 0, 0)
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 1, 0)
db.stream_remove("/newton/prep", 0, 1)
db.close()
class TestBlockingServer(object):
def setUp(self):
self.db = nilmdb.NilmDB(testdb, sync=False)
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
def tearDown(self):
self.db.close()
def test_blocking_server(self):
# Server should fail if the database doesn't have a "_thread_safe"
# property.
with assert_raises(KeyError):
nilmdb.Server(object())
# Start web app on a custom port
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
port = 12380, stoppable = True)
@@ -133,7 +147,7 @@ class TestServer(object):
def setUp(self):
# Start web app on a custom port
self.db = nilmdb.NilmDB(testdb, sync=False)
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
port = 12380, stoppable = False)
self.server.start(blocking = False)
@@ -194,12 +208,3 @@ class TestServer(object):
data = getjson("/stream/get_metadata?path=/newton/prep"
"&key=foo")
eq_(data, {'foo': None})
def test_insert(self):
# GET instead of POST (no body)
# (actual POST test is done by client code)
with assert_raises(HTTPError) as e:
getjson("/stream/insert?path=/newton/prep&start=0&end=0")
eq_(e.exception.code, 400)

View File

@@ -9,16 +9,28 @@ import time
from testutil.helpers import *
#raise nose.exc.SkipTest("Skip these")
class Foo(object):
val = 0
def __init__(self, asdf = "asdf"):
self.init_thread = threading.current_thread().name
@classmethod
def foo(self):
pass
def fail(self):
raise Exception("you asked me to do this")
def test(self, debug = False):
self.tester(debug)
def t(self):
pass
def tester(self, debug = False):
# purposely not thread-safe
self.test_thread = threading.current_thread().name
oldval = self.val
newval = oldval + 1
time.sleep(0.05)
@@ -46,27 +58,29 @@ class Base(object):
t.join()
self.verify_result()
def verify_result(self):
eq_(self.foo.val, 20)
eq_(self.foo.init_thread, self.foo.test_thread)
class TestUnserialized(Base):
def setUp(self):
self.foo = Foo()
def verify_result(self):
# This should have failed to increment properly
assert(self.foo.val != 20)
ne_(self.foo.val, 20)
# Init and tests ran in different threads
ne_(self.foo.init_thread, self.foo.test_thread)
class TestSerialized(Base):
class TestSerializer(Base):
def setUp(self):
self.realfoo = Foo()
self.foo = nilmdb.utils.Serializer(self.realfoo)
self.foo = nilmdb.utils.serializer_proxy(Foo)("qwer")
def tearDown(self):
del self.foo
def verify_result(self):
# This should have worked
eq_(self.realfoo.val, 20)
def test_attribute(self):
# Can't wrap attributes yet
with assert_raises(TypeError):
self.foo.val
def test_multi(self):
sp = nilmdb.utils.serializer_proxy
sp(Foo("x")).t()
sp(sp(Foo)("x")).t()
sp(sp(Foo))("x").t()
sp(sp(Foo("x"))).t()
sp(sp(Foo)("x")).t()
sp(sp(Foo))("x").t()

View File

@@ -0,0 +1,96 @@
import nilmdb
from nilmdb.utils.printf import *
import nose
from nose.tools import *
from nose.tools import assert_raises
from testutil.helpers import *
import threading
class Thread(threading.Thread):
def __init__(self, target):
self.target = target
threading.Thread.__init__(self)
def run(self):
try:
self.target()
except AssertionError as e:
self.error = e
else:
self.error = None
class Test():
def __init__(self):
self.test = 1234
@classmethod
def asdf(cls):
pass
def foo(self, exception = False, reenter = False):
if exception:
raise Exception()
self.bar(reenter)
def bar(self, reenter):
if reenter:
self.foo()
return 123
def baz_threaded(self, target):
t = Thread(target)
t.start()
t.join()
return t
def baz(self, target):
target()
class TestThreadSafety(object):
def tryit(self, c, threading_ok, concurrent_ok):
eq_(c.test, 1234)
c.foo()
t = Thread(c.foo)
t.start()
t.join()
if threading_ok and t.error:
raise Exception("got unexpected error: " + str(t.error))
if not threading_ok and not t.error:
raise Exception("failed to get expected error")
try:
c.baz(c.foo)
except AssertionError as e:
if concurrent_ok:
raise Exception("got unexpected error: " + str(e))
else:
if not concurrent_ok:
raise Exception("failed to get expected error")
t = c.baz_threaded(c.foo)
if (concurrent_ok and threading_ok) and t.error:
raise Exception("got unexpected error: " + str(t.error))
if not (concurrent_ok and threading_ok) and not t.error:
raise Exception("failed to get expected error")
def test(self):
proxy = nilmdb.utils.threadsafety.verify_proxy
self.tryit(Test(), True, True)
self.tryit(proxy(Test(), True, True, True), False, False)
self.tryit(proxy(Test(), True, True, False), False, True)
self.tryit(proxy(Test(), True, False, True), True, False)
self.tryit(proxy(Test(), True, False, False), True, True)
self.tryit(proxy(Test, True, True, True)(), False, False)
self.tryit(proxy(Test, True, True, False)(), False, True)
self.tryit(proxy(Test, True, False, True)(), True, False)
self.tryit(proxy(Test, True, False, False)(), True, True)
proxy(proxy(proxy(Test))()).foo()
c = proxy(Test())
c.foo()
try:
c.foo(exception = True)
except Exception:
pass
c.foo()

View File

@@ -83,7 +83,7 @@ To use it:
import os, sys, re
from distutils.core import Command
from distutils.command.sdist import sdist as _sdist
from distutils.command.build import build as _build
from distutils.command.build_py import build_py as _build_py
versionfile_source = None
versionfile_build = None
@@ -578,11 +578,10 @@ class cmd_version(Command):
ver = get_version(verbose=True)
print("Version is currently: %s" % ver)
class cmd_build(_build):
class cmd_build_py(_build_py):
def run(self):
versions = get_versions(verbose=True)
_build.run(self)
_build_py.run(self)
# now locate _version.py in the new build/ directory and replace it
# with an updated value
target_versionfile = os.path.join(self.build_lib, versionfile_build)
@@ -651,6 +650,6 @@ class cmd_update_files(Command):
def get_cmdclass():
return {'version': cmd_version,
'update_files': cmd_update_files,
'build': cmd_build,
'build_py': cmd_build_py,
'sdist': cmd_sdist,
}