Compare commits: unicode...nilmdb-0.1
39 Commits
SHA1
85be497edb
bd1b7107af
b8275f108d
2820ff9758
a015de893d
b7f746e66d
40cf4941f0
8a418ceb3e
0312b6eb07
077f197d24
62354b4dce
5970cd85cf
4f6a742e6c
87b43e5d04
f0c2a64ae3
e5d3deb6fe
d321058b48
cea83140c0
7807d6caf0
3d0fad3c2a
fe3b087435
bcefe52298
f88c148ccc
4a47b1d04a
80da937cb7
c81972e66e
b09362fde1
b7688844fa
3d212e7592
7aedfdf9c3
ebd4f74959
ebe2fbab92
4831a0cae1
07192c6ffb
09d325e8ab
11b0293d5f
493bbed82c
3bc25daaab
40a3bc4bc3
.gitignore (vendored): 3 changes

@@ -2,3 +2,6 @@ db/
tests/*testdb/
.coverage
*.pyc
design.html
timeit*out
Makefile: 7 changes

@@ -8,11 +8,14 @@ tool:
lint:
	pylint -f parseable nilmdb

%.html: %.md
	pandoc -s $< > $@

test:
	nosetests
	python runtests.py

profile:
	nosetests --with-profile
	python runtests.py --with-profile

clean::
	find . -name '*pyc' | xargs rm -f
@@ -1,4 +1,3 @@
sudo apt-get install python-nose python-coverage
sudo apt-get install python-tables python-cherrypy3
sudo apt-get install python2.7 python-cherrypy3 python-decorator python-nose python-coverage
sudo apt-get install cython   # 0.17.1-1 or newer
TODO: 6 changes

@@ -1 +1,5 @@
- Merge adjacent intervals on insert (maybe with client help?)
- Clean up error responses.  Specifically I'd like to be able to add
  json-formatted data to OverflowError and DB parsing errors.  It
  seems like subclassing cherrypy.HTTPError and overriding
  set_response is the best thing to do -- it would let me get rid
  of the _be_ie_unfriendly and other hacks in the server.
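A rough sketch of the idea in that last TODO item (untested; it assumes CherryPy's `HTTPError.set_response()` hook behaves as documented, and the JSON body shape here is made up for illustration):

    import simplejson as json
    import cherrypy

    class JsonError(cherrypy.HTTPError):
        """HTTPError subclass that attaches a JSON-formatted body (sketch)."""
        def __init__(self, status, message, **extra):
            self.json_body = dict(status = status, message = message, **extra)
            cherrypy.HTTPError.__init__(self, status, message)

        def set_response(self):
            # Let HTTPError fill in the status line and default headers,
            # then replace the body with JSON.
            cherrypy.HTTPError.set_response(self)
            response = cherrypy.serving.response
            response.headers['Content-Type'] = 'application/json'
            # Content-Length was computed for the old body; drop it so
            # CherryPy recomputes it for the JSON body.
            response.headers.pop('Content-Length', None)
            response.body = json.dumps(self.json_body)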
design.md: 224 changes

@@ -1,11 +1,12 @@
Structure
---------
nilmdb.nilmdb is the NILM database interface.  It tracks a PyTables
database that holds actual rows of data, and a SQL database tracks metadata
and ranges.
nilmdb.nilmdb is the NILM database interface.  A nilmdb.BulkData
interface stores data in flat files, and a SQL database tracks
metadata and ranges.

Access to the nilmdb must be single-threaded.  This is handled with
the nilmdb.serializer class.
the nilmdb.serializer class.  In the future this could probably
be turned into a per-path serialization.

nilmdb.server is a HTTP server that provides an interface to talk,
through the serialization layer, to the nilmdb object.
@@ -18,13 +19,13 @@ Sqlite performance
Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
takes about 125msec.  sqlite3 will commit transactions at 3 times:

1: explicit con.commit()
1. explicit con.commit()

2: between a series of DML commands and non-DML commands, e.g.
2. between a series of DML commands and non-DML commands, e.g.
   after a series of INSERT, SELECT, but before a CREATE TABLE or
   PRAGMA.

3: at the end of an explicit transaction, e.g. "with self.con as con:"
3. at the end of an explicit transaction, e.g. "with self.con as con:"

To speed up testing, or if this transaction speed becomes an issue,
the sync=False option to NilmDB will set PRAGMA synchronous=OFF.
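As a concrete illustration, a minimal sketch of that pragma toggle (standard `sqlite3` module; the helper itself is hypothetical, but the `sync` flag mirrors the NilmDB option described above):

    import sqlite3

    def open_db(path, sync = True):
        # Hypothetical helper; NilmDB's real constructor takes a similar
        # sync=False option to trade durability for commit speed.
        con = sqlite3.connect(path)
        if not sync:
            # Skip the ~125 ms fsync on every commit; data may be lost on a
            # crash, which is acceptable for test runs.
            con.execute("PRAGMA synchronous=OFF")
        return con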
@@ -47,101 +48,105 @@ transfer?
  everything still gets buffered.  Just a tradeoff of buffer size.

Before timestamps are added:

- Raw data is about 440 kB/s (9 channels)
- Prep data is about 12.5 kB/s (1 phase)
- How do we know how much data to send?

  - Remember that we can only do maybe 8-50 transactions per second on
    the sqlite database.  So if one block of inserted data is one
    transaction, we'd need the raw case to be around 64kB per request,
    ideally more.
  - Maybe use a range, based on how long it's taking to read the data
    (see the sketch after this list):
    - If no more data, send it
    - If data > 1 MB, send it
    - If more than 10 seconds have elapsed, send it
  - Should those numbers come from the server?
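A sketch of that block-sending decision (the thresholds are the ones listed above; the function and its arguments are made up for illustration):

    import time

    MAX_BLOCK_BYTES = 1024 * 1024   # 1 MB
    MAX_BLOCK_SECONDS = 10

    def should_send(buffered_bytes, block_started, more_data_available):
        """Decide whether the client should flush its buffered rows to the
        server as one insert request (and thus one sqlite transaction)."""
        if not more_data_available:
            return True
        if buffered_bytes > MAX_BLOCK_BYTES:
            return True
        if time.time() - block_started > MAX_BLOCK_SECONDS:
            return True
        return False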
Converting from ASCII to PyTables:

- For each row getting added, we need to set attributes on a PyTables
  Row object and call table.append().  This means that there isn't a
  particularly efficient way of converting from ascii.
- Could create a function like nilmdb.layout.Layout("foo").fillRow(asciiline)
  - But this means we're doing parsing on the serialized side
  - Let's keep parsing on the threaded server side so we can detect
    errors better, and not block the serialized nilmdb for a slow
    parsing process.
- Client sends ASCII data
- Server converts this ASCII data to a list of values
- Maybe:

        # threaded side creates this object
        parser = nilmdb.layout.Parser("layout_name")
        # threaded side parses and fills it with data
        parser.parse(textdata)
        # serialized side pulls out rows
        for n in xrange(parser.nrows):
            parser.fill_row(rowinstance, n)
            table.append()

Inserting streams, inside nilmdb
--------------------------------

- First check that the new stream doesn't overlap (see the sketch
  after this list).
  - Get minimum timestamp, maximum timestamp from data parser.
    - (extend parser to verify monotonicity and track extents)
  - Get all intervals for this stream in the database
  - See if new interval overlaps any existing ones
    - If so, bail
  - Question: should we cache intervals inside NilmDB?
    - Assume database is fast for now, and always rebuild from DB.
    - Can add a caching layer later if we need to.
    - `stream_get_ranges(path)` -> return IntervalSet?
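A rough sketch of that overlap check (the helper and its inputs are hypothetical; the real interval handling lives in the Interval/IntervalSet classes):

    def check_no_overlap(existing_ranges, new_start, new_end):
        """Raise if [new_start, new_end) overlaps any existing interval.

        'existing_ranges' is an iterable of (start, end) half-open pairs,
        e.g. the result of a hypothetical stream_get_ranges(path)."""
        for (start, end) in existing_ranges:
            # Half-open intervals [start, end) overlap when each one
            # starts before the other one ends.
            if new_start < end and start < new_end:
                raise ValueError("new data overlaps existing interval "
                                 "[%r, %r)" % (start, end))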
Speed
-----

- First approach was quadratic.  Adding four hours of data:

        $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-110000 /bpnilm/1/raw
        real    24m31.093s
        $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-120001 /bpnilm/1/raw
        real    43m44.528s
        $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-130002 /bpnilm/1/raw
        real    93m29.713s
        $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-140003 /bpnilm/1/raw
        real    166m53.007s

- Disabling pytables indexing didn't help:

        real    31m21.492s
        real    52m51.963s
        real    102m8.151s
        real    176m12.469s

- Server RAM usage is constant.

- Speed problems were due to IntervalSet speed, of parsing intervals
  from the database and adding the new one each time.

- First optimization is to cache result of `nilmdb:_get_intervals`,
  which gives the best speedup (see the sketch after this list).

- Also switched to internally using bxInterval from bx-python package.
  Speed of `tests/test_interval:TestIntervalSpeed` is pretty decent
  and seems to be growing logarithmically now.  About 85μs per insertion
  for inserting 131k entries.

- Storing the interval data in SQL might be better, with a scheme like:
  http://www.logarithmic.net/pfh/blog/01235197474

- Next slowdown target is nilmdb.layout.Parser.parse().
  - Rewrote parsers using cython and sscanf
  - Stats (rev 10831), with _add_interval disabled

        layout.pyx.Parser.parse:128         6303 sec, 262k calls
        layout.pyx.parse:63                13913 sec, 5.1g calls
        numpy:records.py.fromrecords:569    7410 sec, 262k calls

  - Probably OK for now.

- After all updates, now takes about 8.5 minutes to insert an hour of
  data, constant after adding 171 hours (4.9 billion data points)
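The `_get_intervals` caching mentioned above is just memoization keyed on the stream id; a minimal sketch of the idea (the real code uses the `nilmdb.utils.lru_cache` decorator, whose exact signature is not shown here):

    def memoize_by_stream_id(func):
        """Cache func(self, stream_id) results per stream_id (sketch)."""
        cache = {}
        def wrapper(self, stream_id):
            if stream_id not in cache:
                cache[stream_id] = func(self, stream_id)
            return cache[stream_id]
        # Allow invalidation when a stream is destroyed or its data removed
        wrapper.cache_remove = lambda self, stream_id: cache.pop(stream_id, None)
        return wrapper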
@@ -156,12 +161,12 @@ IntervalSet speed
  sorted list

- Replaced with bxInterval; now takes about log n time for an insertion
  - TestIntervalSpeed with range(17,18) and profiling
    - 85 μs each
    - 131072 calls to `__iadd__`
    - 131072 to bx.insert_interval
    - 131072 to bx.insert:395
    - 2355835 to bx.insert:106 (18x as many?)

- Tried blist too, worse than bxinterval.
@@ -172,14 +177,14 @@ IntervalSet speed
  insert for 2**17 insertions, followed by total wall time and RAM
  usage for running "make test" with `test_rbtree` and `test_interval`
  with range(5,20):

  - old values with bxinterval:
    20.2 μs, total 20 s, 177 MB RAM
  - rbtree, plain python:
    97 μs, total 105 s, 846 MB RAM
  - rbtree converted to cython:
    26 μs, total 29 s, 320 MB RAM
  - rbtree and interval converted to cython:
    8.4 μs, total 12 s, 134 MB RAM

Layouts
-------
@@ -198,3 +203,66 @@ handlers. For compatibility:

    "RawData" == "uint16_6"
    "RawNotchedData" == "uint16_9"
    "PrepData" == "float32_8"

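That compatibility mapping could be expressed as a simple lookup (a sketch; the actual layout handling lives in nilmdb.layout and may structure this differently):

    # Old layout names mapped to the generic "<type>_<column count>" names
    compat_layouts = {
        "RawData":        "uint16_6",
        "RawNotchedData": "uint16_9",
        "PrepData":       "float32_8",
    }

    def canonical_layout(name):
        return compat_layouts.get(name, name)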
BulkData design
---------------

BulkData is a custom bulk data storage system that was written to
replace PyTables.  The general structure is a `data` subdirectory in
the main NilmDB directory.  Within `data`, paths are created for each
created stream.  These locations are called tables.  For example,
tables might be located at

    nilmdb/data/newton/raw/
    nilmdb/data/newton/prep/
    nilmdb/data/cottage/raw/

Each table contains:

- An unchanging `_format` file (Python pickle format) that describes
  parameters of how the data is broken up, like files per directory,
  rows per file, and the binary data format

- Hex named subdirectories `("%04x", although more than 65536 can exist)`

- Hex named files within those subdirectories, like:

        /nilmdb/data/newton/raw/000b/010a

  The data format of these files is raw binary, interpreted by the
  Python `struct` module according to the format string in the
  `_format` file.  (See the sketch after this list.)

- Same name as above but with a `.removed` suffix: an optional file
  (Python pickle format) containing a list of row numbers that have
  been logically removed from the file.  If this range covers the
  entire file, the entire file will be removed.

- Note that the `bulkdata.nrows` variable is calculated once in
  `BulkData.__init__()`, and only ever incremented during use.  Thus,
  even if all data is removed, `nrows` can remain high.  However, if
  the server is restarted, the newly calculated `nrows` may be lower
  than in a previous run due to deleted data.  To be specific, this
  sequence of events:

  - insert data
  - remove all data
  - insert data

  will result in having different row numbers in the database, and
  differently numbered files on the filesystem, than the sequence:

  - insert data
  - remove all data
  - restart server
  - insert data

  This is okay!  Everything should remain consistent both in the
  `BulkData` and `NilmDB`.  Not attempting to readjust `nrows` during
  deletion makes the code quite a bit simpler.

- Similarly, data files are never truncated shorter.  Removing data
  from the end of the file will not shorten it; it will only be
  deleted when it has been fully filled and all of the data has been
  subsequently removed.
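A sketch of how a row number maps onto this layout and how a stored row could be read back.  This is illustrative only: the helper name `read_row` is made up, the `.removed` file is assumed to hold (start, stop) row ranges as written by `_remove_rows()` further below, and the real implementation is the `Table` class shown in the diff that follows.

    import os
    import struct
    import cPickle as pickle

    def read_row(table_root, row):
        # Load the table's format parameters from the _format pickle
        with open(os.path.join(table_root, "_format"), "rb") as f:
            fmt = pickle.load(f)
        packer = struct.Struct(fmt["struct_fmt"])

        # Row number -> hex subdirectory, hex filename, byte offset
        filenum = row // fmt["rows_per_file"]
        subdir = "%04x" % (filenum // fmt["files_per_dir"])
        filename = "%04x" % (filenum % fmt["files_per_dir"])
        row_in_file = row % fmt["rows_per_file"]
        offset = row_in_file * packer.size

        # Skip rows that were logically removed (assumed range format)
        try:
            removed_path = os.path.join(table_root, subdir, filename + ".removed")
            with open(removed_path, "rb") as f:
                removed = pickle.load(f)
        except IOError:
            removed = []
        if any(start <= row_in_file < stop for (start, stop) in removed):
            return None

        with open(os.path.join(table_root, subdir, filename), "rb") as f:
            f.seek(offset)
            return packer.unpack(f.read(packer.size))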
@@ -11,17 +11,32 @@ import cPickle as pickle
import struct
import fnmatch
import mmap
import re

# Up to 256 open file descriptors at any given time
# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
table_cache_size = 16
fd_cache_size = 16

@nilmdb.utils.must_close()
@nilmdb.utils.must_close(wrap_verify = True)
class BulkData(object):
    def __init__(self, basepath):
    def __init__(self, basepath, **kwargs):
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")

        # Tuneables
        if "file_size" in kwargs:
            self.file_size = kwargs["file_size"]
        else:
            # Default to approximately 128 MiB per file
            self.file_size = 128 * 1024 * 1024

        if "files_per_dir" in kwargs:
            self.files_per_dir = kwargs["files_per_dir"]
        else:
            # 32768 files per dir should work even on FAT32
            self.files_per_dir = 32768

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

@@ -55,7 +70,8 @@ class BulkData(object):
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path")
            raise ValueError("invalid path; path must contain at least one "
                             "folder")

        # Get layout, and build format string for struct module
        try:
@@ -81,26 +97,24 @@ class BulkData(object):
        # Create the table.  Note that we make a distinction here
        # between NilmDB paths (always Unix style, split apart
        # manually) and OS paths (built up with os.path.join)
        try:
            # Make directories leading up to this one
            elements = path.lstrip('/').split('/')
            for i in range(len(elements)):
                ospath = os.path.join(self.root, *elements[0:i])
                if Table.exists(ospath):
                    raise ValueError("path is subdir of existing node")
                if not os.path.isdir(ospath):
                    os.mkdir(ospath)

            # Make the final dir
            ospath = os.path.join(self.root, *elements)
            if os.path.isdir(ospath):
                raise ValueError("subdirs of this path already exist")
            os.mkdir(ospath)
        # Make directories leading up to this one
        elements = path.lstrip('/').split('/')
        for i in range(len(elements)):
            ospath = os.path.join(self.root, *elements[0:i])
            if Table.exists(ospath):
                raise ValueError("path is subdir of existing node")
            if not os.path.isdir(ospath):
                os.mkdir(ospath)

            # Write format string to file
            Table.create(ospath, struct_fmt)
        except OSError as e:
            raise ValueError("error creating table at that path: " + e.strerror)
        # Make the final dir
        ospath = os.path.join(self.root, *elements)
        if os.path.isdir(ospath):
            raise ValueError("subdirs of this path already exist")
        os.mkdir(ospath)

        # Write format string to file
        Table.create(ospath, struct_fmt, self.file_size, self.files_per_dir)

        # Open and cache it
        self.getnode(unicodepath)
@@ -118,13 +132,16 @@ class BulkData(object):
        ospath = os.path.join(self.root, *elements)

        # Remove Table object from cache
        self.getnode.cache_remove(self, ospath)
        self.getnode.cache_remove(self, unicodepath)

        # Remove the contents of the target directory
        if not os.path.isfile(os.path.join(ospath, "format")):
        if not Table.exists(ospath):
            raise ValueError("nothing at that path")
        for file in os.listdir(ospath):
            os.remove(os.path.join(ospath, file))
        for (root, dirs, files) in os.walk(ospath, topdown = False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))

        # Remove empty parent directories
        for i in reversed(range(len(elements))):
@@ -145,23 +162,32 @@ class BulkData(object):
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)

@nilmdb.utils.must_close()
@nilmdb.utils.must_close(wrap_verify = True)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)"""
    """Tools to help access a single table (data at a specific OS path)."""
    # See design.md for design details

    # Class methods, to help keep format details in this class.
    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "format"))
        return os.path.isfile(os.path.join(root, "_format"))

    @classmethod
    def create(cls, root, struct_fmt):
    def create(cls, root, struct_fmt, file_size, files_per_dir):
        """Initialize a table at the given OS path.
        'struct_fmt' is a Struct module format description"""
        format = { "rows_per_file": 4 * 1024 * 1024,
                   "struct_fmt": struct_fmt }
        with open(os.path.join(root, "format"), "wb") as f:

        # Calculate rows per file so that each file is approximately
        # file_size bytes.
        packer = struct.Struct(struct_fmt)
        rows_per_file = max(file_size // packer.size, 1)

        format = { "rows_per_file": rows_per_file,
                   "files_per_dir": files_per_dir,
                   "struct_fmt": struct_fmt,
                   "version": 1 }
        with open(os.path.join(root, "_format"), "wb") as f:
            pickle.dump(format, f, 2)

    # Normal methods
@@ -170,64 +196,101 @@ class Table(object):
        self.root = root

        # Load the format and build packer
        with open(self._fullpath("format"), "rb") as f:
        with open(os.path.join(self.root, "_format"), "rb") as f:
            format = pickle.load(f)

        if format["version"] != 1: # pragma: no cover (just future proofing)
            raise NotImplementedError("version " + format["version"] +
                                      " bulk data store not supported")

        self.rows_per_file = format["rows_per_file"]
        self.files_per_dir = format["files_per_dir"]
        self.packer = struct.Struct(format["struct_fmt"])
        self.file_size = self.packer.size * self.rows_per_file

        # Find nrows by locating the lexicographically last filename
        # and using its size.
        pattern = '[0-9a-f]' * 8
        allfiles = fnmatch.filter(os.listdir(self.root), pattern)
        if allfiles:
            filename = max(allfiles)
            offset = os.path.getsize(self._fullpath(filename))
            self.nrows = self._row_from_fnoffset(filename, offset)
        else:
            self.nrows = 0
        # Find nrows
        self.nrows = self._get_nrows()

    def close(self):
        self.mmap_open.cache_remove_all()

    # Internal helpers
    def _fullpath(self, filename):
        return os.path.join(self.root, filename)
    def _get_nrows(self):
        """Find nrows by locating the lexicographically last filename
        and using its size"""
        # Note that this just finds a 'nrows' that is guaranteed to be
        # greater than the row number of any piece of data that
        # currently exists, not necessarily all data that _ever_
        # existed.
        regex = re.compile("^[0-9a-f]{4,}$")
    def _fnoffset_from_row(self, row):
        """Return a (filename, offset, count) tuple:
        # Find the last directory.  We sort and loop through all of them,
        # starting with the numerically greatest, because the dirs could be
        # empty if something was deleted.
        subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                         key = lambda x: int(x, 16), reverse = True)

        for subdir in subdirs:
            # Now find the last file in that dir
            path = os.path.join(self.root, subdir)
            files = filter(regex.search, os.listdir(path))
            if not files: # pragma: no cover (shouldn't occur)
                # Empty dir: try the next one
                continue

            # Find the numerical max
            filename = max(files, key = lambda x: int(x, 16))
            offset = os.path.getsize(os.path.join(self.root, subdir, filename))

            # Convert to row number
            return self._row_from_offset(subdir, filename, offset)

        # No files, so no data
        return 0

    def _offset_from_row(self, row):
        """Return a (subdir, filename, offset, count) tuple:

        subdir: subdirectory for the file
        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offste) that fit in the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        filename = sprintf("%08x", filenum)
        # It's OK if these format specifiers are too short; the filenames
        # will just get longer but will still sort correctly.
        dirname = sprintf("%04x", filenum // self.files_per_dir)
        filename = sprintf("%04x", filenum % self.files_per_dir)
        offset = (row % self.rows_per_file) * self.packer.size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (filename, offset, count)
        return (dirname, filename, offset, count)

    def _row_from_fnoffset(self, filename, offset):
    def _row_from_offset(self, subdir, filename, offset):
        """Return the row number that corresponds to the given
        filename and byte-offset within that file."""
        filenum = int(filename, 16)
        if (offset % self.packer.size) != 0:
        'subdir/filename' and byte-offset within that file."""
        if (offset % self.packer.size) != 0: # pragma: no cover; shouldn't occur
            raise ValueError("file offset is not a multiple of data size")
        filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
        row = (filenum * self.rows_per_file) + (offset // self.packer.size)
        return row

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            keys = slice(0,3), # exclude newsize
                            onremove = lambda x: x.close())
    def mmap_open(self, file, newsize = None):
        """Open and map a given filename (relative to self.root).
    def mmap_open(self, subdir, filename, newsize = None):
        """Open and map a given 'subdir/filename' (relative to self.root).
        Will be automatically closed when evicted from the cache.

        If 'newsize' is provided, the file is truncated to the given
        size before the mapping is returned.  (Note that the LRU cache
        on this function means the truncate will only happen if the
        object isn't already cached; mmap.resize should be used too)"""
        f = open(os.path.join(self.root, file), "a+", 0)
        object isn't already cached; mmap.resize should be used too.)"""
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        f = open(os.path.join(self.root, subdir, filename), "a+", 0)
        if newsize is not None:
            # mmap can't map a zero-length file, so this allows the
            # caller to set the filesize between file creation and
@@ -236,6 +299,15 @@ class Table(object):
            mm = mmap.mmap(f.fileno(), 0)
        return mm

    def mmap_open_resize(self, subdir, filename, newsize):
        """Open and map a given 'subdir/filename' (relative to self.root).
        The file is resized to the given size."""
        # Pass new size to mmap_open
        mm = self.mmap_open(subdir, filename, newsize)
        # In case we got a cached copy, need to call mm.resize too.
        mm.resize(newsize)
        return mm

    def append(self, data):
        """Append the data and flush it to disk.
        data is a nested Python list [[row],[row],[...]]"""
@@ -243,18 +315,13 @@ class Table(object):
        dataiter = iter(data)
        while remaining:
            # See how many rows we can fit into the current file, and open it
            (filename, offset, count) = self._fnoffset_from_row(self.nrows)
            (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
            if count > remaining:
                count = remaining
            newsize = offset + count * self.packer.size
            mm = self.mmap_open(filename, newsize)
            mm = self.mmap_open_resize(subdir, fname, newsize)
            mm.seek(offset)

            # Extend the file to the target length.  We specified
            # newsize when opening, but that may have been ignored if
            # the mmap_open returned a cached object.
            mm.resize(newsize)

            # Write the data
            for i in xrange(count):
                row = dataiter.next()
@@ -282,10 +349,10 @@ class Table(object):
            row = key.start
            remaining = key.stop - key.start
            while remaining:
                (filename, offset, count) = self._fnoffset_from_row(row)
                (subdir, filename, offset, count) = self._offset_from_row(row)
                if count > remaining:
                    count = remaining
                mm = self.mmap_open(filename)
                mm = self.mmap_open(subdir, filename)
                for i in xrange(count):
                    ret.append(list(self.packer.unpack_from(mm, offset)))
                    offset += self.packer.size
@@ -296,10 +363,93 @@ class Table(object):
        # Handle single points
        if key < 0 or key >= self.nrows:
            raise IndexError("Index out of range")
        (filename, offset, count) = self._fnoffset_from_row(key)
        mm = self.mmap_open(filename)
        (subdir, filename, offset, count) = self._offset_from_row(key)
        mm = self.mmap_open(subdir, filename)
        # unpack_from ignores the mmap object's current seek position
        return self.packer.unpack_from(mm, offset)
        return list(self.packer.unpack_from(mm, offset))
    def _remove_rows(self, subdir, filename, start, stop):
        """Helper to mark specific rows as being removed from a
        file, and potentially removing or truncating the file itself."""
        # Import an existing list of deleted rows for this file
        datafile = os.path.join(self.root, subdir, filename)
        cachefile = datafile + ".removed"
        try:
            with open(cachefile, "rb") as f:
                ranges = pickle.load(f)
            cachefile_present = True
        except:
            ranges = []
            cachefile_present = False

        # Append our new range and sort
        ranges.append((start, stop))
        ranges.sort()

        # Merge adjacent ranges into "out"
        merged = []
        prev = None
        for new in ranges:
            if prev is None:
                # No previous range, so remember this one
                prev = new
            elif prev[1] == new[0]:
                # Previous range connected to this new one; extend prev
                prev = (prev[0], new[1])
            else:
                # Not connected; append previous and start again
                merged.append(prev)
                prev = new
        if prev is not None:
            merged.append(prev)

        # If the range covered the whole file, we can delete it now.
        # Note that the last file in a table may be only partially
        # full (smaller than self.rows_per_file).  We purposely leave
        # those files around rather than deleting them, because the
        # remainder will be filled on a subsequent append(), and things
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Close potentially open file in mmap_open LRU cache
            self.mmap_open.cache_remove(self, subdir, filename)

            # Delete files
            os.remove(datafile)
            if cachefile_present:
                os.remove(cachefile)

            # Try deleting subdir, too
            try:
                os.rmdir(os.path.join(self.root, subdir))
            except:
                pass
        else:
            # Update cache.  Try to do it atomically.
            nilmdb.utils.atomic.replace_file(cachefile,
                                             pickle.dumps(merged, 2))

    def remove(self, start, stop):
        """Remove specified rows [start, stop) from this table.

        If a file is left empty, it is fully removed.  Otherwise, a
        parallel data file is used to remember which rows have been
        removed, and the file is otherwise untouched."""
        if start < 0 or start > stop or stop > self.nrows:
            raise IndexError("Index out of range")

        row = start
        remaining = stop - start
        while remaining:
            # Loop through each file that we need to touch
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            row_offset = offset // self.packer.size
            # Mark the rows as being removed
            self._remove_rows(subdir, filename, row_offset, row_offset + count)
            remaining -= count
            row += count

class TimestampOnlyTable(object):
    """Helper that lets us pass a Tables object into bisect, by
@@ -12,6 +12,7 @@ import os
import simplejson as json
import itertools

import nilmdb.utils
import nilmdb.httpclient

# Other functions expect to see these in the nilmdb.client namespace
@@ -96,6 +97,17 @@ class Client(object):
        params = { "path": path }
        return self.http.get("stream/destroy", params)

    def stream_remove(self, path, start = None, end = None):
        """Remove data from the specified time range"""
        params = {
            "path": path
        }
        if start is not None:
            params["start"] = float_to_string(start)
        if end is not None:
            params["end"] = float_to_string(end)
        return self.http.get("stream/remove", params)

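For example, removing one hour of data through the client might look like this (a sketch only; the URL, path, and timestamps are placeholders, and timestamps here are plain Unix seconds):

    import nilmdb

    client = nilmdb.Client("http://localhost:12380/")
    start = 1305298800           # e.g. 2011-05-13 11:00:00 UTC
    end = start + 3600
    count = client.stream_remove("/bpnilm/1/raw", start, end)
    print(count)                 # number of data points removed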
    def stream_insert(self, path, data, start = None, end = None):
        """Insert data into a stream.  data should be a file-like object
        that provides ASCII data that matches the database layout for path.
@@ -114,11 +126,6 @@ class Client(object):
        max_time = 30
        end_epsilon = 1e-6

        def pairwise(iterable):
            "s -> (s0,s1), (s1,s2), ..., (sn,None)"
            a, b = itertools.tee(iterable)
            next(b, None)
            return itertools.izip_longest(a, b)

        def extract_timestamp(line):
            return float(line.split()[0])

@@ -148,7 +155,7 @@ class Client(object):
        block_data = ""
        block_start = start
        result = None
        for (line, nextline) in pairwise(data):
        for (line, nextline) in nilmdb.utils.misc.pairwise(data):
            # If we don't have a starting time, extract it from the first line
            if block_start is None:
                block_start = extract_timestamp(line)
@@ -11,12 +11,12 @@ import re
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form

version = "0.1"
version = "1.0"

# Valid subcommands.  Defined in separate files just to break
# things up -- they're still called with Cmdline as self.
subcommands = [ "info", "create", "list", "metadata", "insert", "extract",
                "destroy" ]
                "remove", "destroy" ]

# Import the subcommand modules.  Equivalent way of doing this would be
#   from . import info as cmd_info
@@ -24,10 +24,16 @@ subcmd_mods = {}
for cmd in subcommands:
    subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ])
class JimArgumentParser(argparse.ArgumentParser):
    def error(self, message):
        self.print_usage(sys.stderr)
        self.exit(2, sprintf("error: %s\n", message))

class Cmdline(object):

    def __init__(self, argv):
        self.argv = argv
        self.client = None

    def arg_time(self, toparse):
        """Parse a time string argument"""
@@ -93,8 +99,8 @@ class Cmdline(object):
        version_string = sprintf("nilmtool %s, client library %s",
                                 version, nilmdb.Client.client_version)

        self.parser = argparse.ArgumentParser(add_help = False,
                                              formatter_class = def_form)
        self.parser = JimArgumentParser(add_help = False,
                                        formatter_class = def_form)

        group = self.parser.add_argument_group("General options")
        group.add_argument("-h", "--help", action='help',
@@ -119,7 +125,8 @@ class Cmdline(object):

    def die(self, formatstr, *args):
        fprintf(sys.stderr, formatstr + "\n", *args)
        self.client.close()
        if self.client:
            self.client.close()
        sys.exit(-1)

    def run(self):
@@ -131,13 +138,17 @@ class Cmdline(object):
        self.parser_setup()
        self.args = self.parser.parse_args(self.argv)

        # Run arg verify handler if there is one
        if "verify" in self.args:
            self.args.verify(self)

        self.client = nilmdb.Client(self.args.url)

        # Make a test connection to make sure things work
        try:
            server_version = self.client.version()
        except nilmdb.client.Error as e:
            self.die("Error connecting to server: %s", str(e))
            self.die("error connecting to server: %s", str(e))

        # Now dispatch client request to appropriate function.  Parser
        # should have ensured that we don't have any unknown commands
@@ -24,4 +24,4 @@ def cmd_create(self):
    try:
        self.client.stream_create(self.args.path, self.args.layout)
    except nilmdb.client.ClientError as e:
        self.die("Error creating stream: %s", str(e))
        self.die("error creating stream: %s", str(e))
@@ -22,4 +22,4 @@ def cmd_destroy(self):
    try:
        self.client.stream_destroy(self.args.path)
    except nilmdb.client.ClientError as e:
        self.die("Error deleting stream: %s", str(e))
        self.die("error destroying stream: %s", str(e))
@@ -9,17 +9,18 @@ def setup(self, sub):
                         description="""
                         Extract data from a stream.
                         """)
    cmd.set_defaults(handler = cmd_extract)
    cmd.set_defaults(verify = cmd_extract_verify,
                     handler = cmd_extract)

    group = cmd.add_argument_group("Data selection")
    group.add_argument("path",
                       help="Path of stream, e.g. /foo/bar")
    group.add_argument("-s", "--start", required=True,
                       metavar="TIME", type=self.arg_time,
                       help="Starting timestamp (free-form)")
                       help="Starting timestamp (free-form, inclusive)")
    group.add_argument("-e", "--end", required=True,
                       metavar="TIME", type=self.arg_time,
                       help="Ending timestamp (free-form)")
                       help="Ending timestamp (free-form, noninclusive)")

    group = cmd.add_argument_group("Output format")
    group.add_argument("-b", "--bare", action="store_true",
@@ -30,10 +31,15 @@ def setup(self, sub):
    group.add_argument("-c", "--count", action="store_true",
                       help="Just output a count of matched data points")

def cmd_extract_verify(self):
    if self.args.start is not None and self.args.end is not None:
        if self.args.start > self.args.end:
            self.parser.error("start is after end")

def cmd_extract(self):
    streams = self.client.stream_list(self.args.path)
    if len(streams) != 1:
        self.die("Error getting stream info for path %s", self.args.path)
        self.die("error getting stream info for path %s", self.args.path)
    layout = streams[0][1]

    if self.args.annotate:
@@ -51,12 +51,12 @@ def cmd_insert(self):
    # Find requested stream
    streams = self.client.stream_list(self.args.path)
    if len(streams) != 1:
        self.die("Error getting stream info for path %s", self.args.path)
        self.die("error getting stream info for path %s", self.args.path)

    layout = streams[0][1]

    if self.args.start and len(self.args.file) != 1:
        self.die("--start can only be used with one input file, for now")
        self.die("error: --start can only be used with one input file")

    for filename in self.args.file:
        if filename == '-':
@@ -65,7 +65,7 @@ def cmd_insert(self):
        try:
            infile = open(filename, "r")
        except IOError:
            self.die("Error opening input file %s", filename)
            self.die("error opening input file %s", filename)

        # Build a timestamper for this file
        if self.args.none:
@@ -77,11 +77,11 @@ def cmd_insert(self):
            try:
                start = self.parse_time(filename)
            except ValueError:
                self.die("Error extracting time from filename '%s'",
                self.die("error extracting time from filename '%s'",
                         filename)

            if not self.args.rate:
                self.die("Need to specify --rate")
                self.die("error: --rate is needed, but was not specified")
            rate = self.args.rate

            ts = nilmdb.timestamper.TimestamperRate(infile, start, rate)
@@ -100,6 +100,6 @@ def cmd_insert(self):
            # ugly bracketed ranges of 16-digit numbers and a mangled URL.
            # Need to consider adding something like e.prettyprint()
            # that is smarter about the contents of the error.
            self.die("Error inserting data: %s", str(e))
            self.die("error inserting data: %s", str(e))

        return
@@ -3,6 +3,7 @@ from nilmdb.utils.printf import *
import nilmdb.client

import fnmatch
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form

def setup(self, sub):
@@ -13,23 +14,41 @@ def setup(self, sub):
                         optionally filtering by layout or path.  Wildcards
                         are accepted.
                         """)
    cmd.set_defaults(handler = cmd_list)
    cmd.set_defaults(verify = cmd_list_verify,
                     handler = cmd_list)

    group = cmd.add_argument_group("Stream filtering")
    group.add_argument("-p", "--path", metavar="PATH", default="*",
                       help="Match only this path (-p can be omitted)")
    group.add_argument("path_positional", default="*",
                       nargs="?", help=argparse.SUPPRESS)
    group.add_argument("-l", "--layout", default="*",
                       help="Match only this stream layout")
    group.add_argument("-p", "--path", default="*",
                       help="Match only this path")

    group = cmd.add_argument_group("Interval details")
    group.add_argument("-d", "--detail", action="store_true",
                       help="Show available data time intervals")
    group.add_argument("-s", "--start",
                       metavar="TIME", type=self.arg_time,
                       help="Starting timestamp (free-form)")
                       help="Starting timestamp (free-form, inclusive)")
    group.add_argument("-e", "--end",
                       metavar="TIME", type=self.arg_time,
                       help="Ending timestamp (free-form)")
                       help="Ending timestamp (free-form, noninclusive)")

def cmd_list_verify(self):
    # A hidden "path_positional" argument lets the user leave off the
    # "-p" when specifying the path.  Handle it here.
    got_opt = self.args.path != "*"
    got_pos = self.args.path_positional != "*"
    if got_pos:
        if got_opt:
            self.parser.error("too many paths specified")
        else:
            self.args.path = self.args.path_positional

    if self.args.start is not None and self.args.end is not None:
        if self.args.start > self.args.end:
            self.parser.error("start is after end")

def cmd_list(self):
    """List available streams"""
@@ -43,21 +43,21 @@ def cmd_metadata(self):
        for keyval in keyvals:
            kv = keyval.split('=')
            if len(kv) != 2 or kv[0] == "":
                self.die("Error parsing key=value argument '%s'", keyval)
                self.die("error parsing key=value argument '%s'", keyval)
            data[kv[0]] = kv[1]

        # Make the call
        try:
            handler(self.args.path, data)
        except nilmdb.client.ClientError as e:
            self.die("Error setting/updating metadata: %s", str(e))
            self.die("error setting/updating metadata: %s", str(e))
    else:
        # Get (or unspecified)
        keys = self.args.get or None
        try:
            data = self.client.stream_get_metadata(self.args.path, keys)
        except nilmdb.client.ClientError as e:
            self.die("Error getting metadata: %s", str(e))
            self.die("error getting metadata: %s", str(e))
        for key, value in sorted(data.items()):
            # Omit nonexistent keys
            if value is None:
nilmdb/cmdline/remove.py (new file): 45 changes

@@ -0,0 +1,45 @@
from __future__ import absolute_import
from __future__ import print_function
from nilmdb.utils.printf import *
import nilmdb.client
import sys

def setup(self, sub):
    cmd = sub.add_parser("remove", help="Remove data",
                         description="""
                         Remove all data from a specified time range within a
                         stream.
                         """)
    cmd.set_defaults(verify = cmd_remove_verify,
                     handler = cmd_remove)

    group = cmd.add_argument_group("Data selection")
    group.add_argument("path",
                       help="Path of stream, e.g. /foo/bar")
    group.add_argument("-s", "--start", required=True,
                       metavar="TIME", type=self.arg_time,
                       help="Starting timestamp (free-form, inclusive)")
    group.add_argument("-e", "--end", required=True,
                       metavar="TIME", type=self.arg_time,
                       help="Ending timestamp (free-form, noninclusive)")

    group = cmd.add_argument_group("Output format")
    group.add_argument("-c", "--count", action="store_true",
                       help="Output number of data points removed")

def cmd_remove_verify(self):
    if self.args.start is not None and self.args.end is not None:
        if self.args.start > self.args.end:
            self.parser.error("start is after end")

def cmd_remove(self):
    try:
        count = self.client.stream_remove(self.args.path,
                                          self.args.start, self.args.end)
    except nilmdb.client.ClientError as e:
        self.die("error removing data: %s", str(e))

    if self.args.count:
        printf("%d\n", count)

    return 0
@@ -26,12 +26,19 @@ class Error(Exception):
        self.url = url              # URL we were requesting
        self.traceback = traceback  # server traceback, if available
    def __str__(self):
        s = sprintf("[%s]", self.status)
        if self.message:
            s += sprintf(" %s", self.message)
        if self.traceback: # pragma: no cover
            s += sprintf("\nServer traceback:\n%s", self.traceback)
        return s
    def __repr__(self): # pragma: no cover
        s = sprintf("[%s]", self.status)
        if self.message:
            s += sprintf(" %s", self.message)
        if self.url:
            s += sprintf(" (%s)", self.url)
        if self.traceback: # pragma: no cover
        if self.traceback:
            s += sprintf("\nServer traceback:\n%s", self.traceback)
        return s
class ClientError(Error):
@@ -59,7 +66,7 @@ class HTTPClient(object):
        url = urlparse.urljoin(self.baseurl, url)
        if params:
            url = urlparse.urljoin(
                url, "?" + nilmdb.utils.urllib.urlencode(params, True))
                url, "?" + nilmdb.utils.urllib.urlencode(params))
        self.curl.setopt(pycurl.URL, url)
        self.url = url
@@ -37,6 +37,7 @@ cdef class Interval:
        'start' and 'end' are arbitrary floats that represent time
        """
        if start > end:
            # Explicitly disallow zero-width intervals (since they're half-open)
            raise IntervalError("start %s must precede end %s" % (start, end))
        self.start = float(start)
        self.end = float(end)
@@ -278,7 +279,7 @@ cdef class IntervalSet:

        return out

    def intersection(self, Interval interval not None):
    def intersection(self, Interval interval not None, orig = False):
        """
        Compute a sequence of intervals that correspond to the
        intersection between `self` and the provided interval.
@@ -287,6 +288,10 @@ cdef class IntervalSet:

        Output intervals are built as subsets of the intervals in the
        first argument (self).

        If orig = True, also return the original interval that was
        (potentially) subsetted to make the one that is being
        returned.
        """
        if not isinstance(interval, Interval):
            raise TypeError("bad type")
@@ -294,11 +299,17 @@ cdef class IntervalSet:
            i = n.obj
            if i:
                if i.start >= interval.start and i.end <= interval.end:
                    yield i
                    if orig:
                        yield (i, i)
                    else:
                        yield i
                else:
                    subset = i.subset(max(i.start, interval.start),
                                      min(i.end, interval.end))
                    yield subset
                    if orig:
                        yield (subset, i)
                    else:
                        yield subset

    cpdef intersects(self, Interval other):
        """Return True if this IntervalSet intersects another interval"""
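A small illustration of how that orig flag gets used (hypothetical values, and it assumes an empty IntervalSet() can be built this way; compare stream_remove() in nilmdb.py below, which consumes these (subset, original) pairs):

    iset = IntervalSet()
    iset += Interval(0, 100)
    iset += Interval(200, 300)

    # Plain intersection yields (possibly truncated) subsets:
    #   Interval(50, 100), Interval(200, 250)
    list(iset.intersection(Interval(50, 250)))

    # With orig = True, each subset is paired with the interval it came from:
    #   (Interval(50, 100), Interval(0, 100)),
    #   (Interval(200, 250), Interval(200, 300))
    list(iset.intersection(Interval(50, 250), orig = True))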
nilmdb/nilmdb.py: 154 changes

@@ -80,7 +80,7 @@ _sql_schema_updates = {
class NilmDBError(Exception):
    """Base exception for NilmDB errors"""
    def __init__(self, message = "Unspecified error"):
        Exception.__init__(self, self.__class__.__name__ + ": " + message)
        Exception.__init__(self, message)

class StreamError(NilmDBError):
    pass
@@ -92,7 +92,8 @@ class OverlapError(NilmDBError):
class NilmDB(object):
    verbose = 0

    def __init__(self, basepath, sync=True, max_results=None):
    def __init__(self, basepath, sync=True, max_results=None,
                 bulkdata_args={}):
        # set up path
        self.basepath = os.path.abspath(basepath)

@@ -104,7 +105,7 @@ class NilmDB(object):
            raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath)
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
@@ -173,6 +174,20 @@ class NilmDB(object):

        return iset

    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))
    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
@@ -191,7 +206,7 @@ class NilmDB(object):
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
        max_merged_rows = 30000000 # a bit more than 1 hour at 8 KHz
        max_merged_rows = 8000 * 60 * 60 * 1.05 # 1.05 hours at 8 KHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
            start_pos == adjacent.db_endpos and
@@ -199,14 +214,9 @@ class NilmDB(object):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self.con.execute("DELETE FROM ranges WHERE "
                             "stream_id=? AND start_time=? AND "
                             "end_time=? AND start_pos=? AND "
                             "end_pos=?", (stream_id,
                                           adjacent.db_start,
                                           adjacent.db_end,
                                           adjacent.db_startpos,
                                           adjacent.db_endpos))
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos, adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
@@ -219,14 +229,54 @@ class NilmDB(object):
                                 start_pos, end_pos))

        # Insert into the database
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (stream_id, interval.start, interval.end,
                          int(start_pos), int(end_pos)))
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()

    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream
        original: original DBInterval; must be already present in DB
        to_remove: DBInterval to remove; must be subset of 'original'
        """
        # Just return if we have nothing to remove
        if remove.start == remove.end: # pragma: no cover
            return

        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end, start_pos, end_pos)

        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)

        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()

        return

    def stream_list(self, path = None, layout = None):
        """Return list of [path, layout] lists of all streams
        in the database.
@@ -341,7 +391,7 @@ class NilmDB(object):
        No way to undo it!  Metadata is removed."""
        stream_id = self._stream_id(path)

        # Delete the cached interval data
        # Delete the cached interval data (if it was cached)
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the data
@@ -381,7 +431,7 @@ class NilmDB(object):
        # And that's all
        return "ok"

    def _find_start(self, table, interval):
    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Return the first database
|
||||
equal to 'start'.
|
||||
"""
|
||||
# Optimization for the common case where an interval wasn't truncated
|
||||
if interval.start == interval.db_start:
|
||||
return interval.db_startpos
|
||||
if dbinterval.start == dbinterval.db_start:
|
||||
return dbinterval.db_startpos
|
||||
return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
|
||||
interval.start,
|
||||
interval.db_startpos,
|
||||
interval.db_endpos)
|
||||
dbinterval.start,
|
||||
dbinterval.db_startpos,
|
||||
dbinterval.db_endpos)
|
||||
|
||||
def _find_end(self, table, interval):
|
||||
def _find_end(self, table, dbinterval):
|
||||
"""
|
||||
Given a DBInterval, find the row in the database that follows
|
||||
the end time. Return the first database position after the
|
||||
@@ -404,16 +454,16 @@ class NilmDB(object):
        to 'end'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if interval.end == interval.db_end:
            return interval.db_endpos
        if dbinterval.end == dbinterval.db_end:
            return dbinterval.db_endpos
        # Note that we still use bisect_left here, because we don't
        # want to include the given timestamp in the results.  This is
        # so queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
        # non-overlapping data.
        return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
                                  interval.end,
                                  interval.db_startpos,
                                  interval.db_endpos)
                                  dbinterval.end,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def stream_extract(self, path, start = None, end = None, count = False):
        """
@@ -434,8 +484,8 @@ class NilmDB(object):
        than actually fetching the data.  It is not limited by
        max_results.
        """
        table = self.data.getnode(path)
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        requested = Interval(start or 0, end or 1e12)
        result = []
@@ -472,3 +522,45 @@ class NilmDB(object):
        if count:
            return matched
        return (result, restart)

    def stream_remove(self, path, start = None, end = None):
        """
        Remove data from the specified time interval within a stream.
        Removes all data in the interval [start, end), and intervals
        are truncated or split appropriately.  Returns the number of
        data points removed.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        to_remove = Interval(start or 0, end or 1e12)
        removed = 0

        if start == end:
            return 0

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection now.
        all_candidates = list(intervals.intersection(to_remove, orig = True))

        for (dbint, orig) in all_candidates:
            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage
            table.remove(row_start, row_end)

            # Count how many were removed
            removed += row_end - row_start

        return removed
106
nilmdb/server.py
@@ -11,7 +11,10 @@ import sys
|
||||
import time
|
||||
import os
|
||||
import simplejson as json
|
||||
import functools
|
||||
import decorator
|
||||
import traceback
|
||||
|
||||
from nilmdb.nilmdb import NilmDBError
|
||||
|
||||
try:
|
||||
import cherrypy
|
||||
@@ -24,46 +27,51 @@ class NilmApp(object):
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
|
||||
version = "1.1"
|
||||
version = "1.2"
|
||||
|
||||
# Decorators
|
||||
def chunked_response(func):
|
||||
"""Decorator to enable chunked responses"""
|
||||
"""Decorator to enable chunked responses."""
|
||||
# Set this to False to get better tracebacks from some requests
|
||||
# (/stream/extract, /stream/intervals).
|
||||
func._cp_config = { 'response.stream': True }
|
||||
return func
|
||||
|
||||
def workaround_cp_bug_1200(func): # pragma: no cover (just a workaround)
|
||||
@decorator.decorator
|
||||
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
|
||||
"""Decorator to work around CherryPy bug #1200 in a response
|
||||
generator"""
|
||||
# Even if chunked responses are disabled, you may still miss miss
|
||||
# LookupError, or UnicodeError exceptions due to CherryPy bug
|
||||
# #1200. This throws them as generic Exceptions insteads.
|
||||
import traceback
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
for val in func(*args, **kwargs):
|
||||
yield val
|
||||
except (LookupError, UnicodeError) as e:
|
||||
raise Exception("bug workaround; real exception is:\n" +
|
||||
traceback.format_exc())
|
||||
return wrapper
|
||||
generator.
|
||||
|
||||
def exception_to_httperror(response = "400 Bad Request"):
|
||||
"""Return a decorator that catches Exception and throws
|
||||
a HTTPError describing it instead"""
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
message = sprintf("%s: %s", type(e).__name__, str(e))
|
||||
raise cherrypy.HTTPError(response, message)
|
||||
return wrapper
|
||||
return decorator
|
||||
Even if chunked responses are disabled, LookupError or
|
||||
UnicodeError exceptions may still be swallowed by CherryPy due to
|
||||
bug #1200. This throws them as generic Exceptions instead so that
|
||||
they make it through.
|
||||
"""
|
||||
try:
|
||||
for val in func(*args, **kwargs):
|
||||
yield val
|
||||
except (LookupError, UnicodeError) as e:
|
||||
raise Exception("bug workaround; real exception is:\n" +
|
||||
traceback.format_exc())
|
||||
|
||||
def exception_to_httperror(*expected):
|
||||
"""Return a decorator-generating function that catches expected
|
||||
errors and throws a HTTPError describing it instead.
|
||||
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
def foo():
|
||||
pass
|
||||
"""
|
||||
def wrapper(func, *args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except expected as e:
|
||||
message = sprintf("%s", str(e))
|
||||
raise cherrypy.HTTPError("400 Bad Request", message)
|
||||
# We need to preserve the function's argspecs for CherryPy to
|
||||
# handle argument errors correctly. Decorator.decorator takes
|
||||
# care of that.
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
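The comment above is the reason for switching from functools.wraps to the decorator module: CherryPy maps query parameters onto handler arguments by inspecting the argspec, and a plain nested wrapper hides it.  A small sketch of the difference (the create function and helper names here are illustrative only, not part of the diff):

    import inspect
    import functools
    import decorator

    def plain_wrap(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper

    def _call(func, *args, **kwargs):
        return func(*args, **kwargs)

    def spec_preserving_wrap(func):
        return decorator.decorator(_call, func)

    def create(path, layout):
        pass

    print(inspect.getargspec(plain_wrap(create)))
    # ArgSpec(args=[], varargs='args', keywords='kwargs', defaults=None)
    print(inspect.getargspec(spec_preserving_wrap(create)))
    # ArgSpec(args=['path', 'layout'], varargs=None, keywords=None, defaults=None)
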
# CherryPy apps
|
||||
class Root(NilmApp):
|
||||
@@ -118,7 +126,7 @@ class Stream(NilmApp):
|
||||
# /stream/create?path=/newton/prep&layout=PrepData
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror()
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
def create(self, path, layout):
|
||||
"""Create a new stream in the database. Provide path
|
||||
and one of the nilmdb.layout.layouts keys.
|
||||
@@ -128,7 +136,7 @@ class Stream(NilmApp):
|
||||
# /stream/destroy?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
def destroy(self, path):
|
||||
"""Delete a stream and its associated data."""
|
||||
return self.db.stream_destroy(path)
|
||||
@@ -160,7 +168,7 @@ class Stream(NilmApp):
|
||||
# /stream/set_metadata?path=/newton/prep&data=<json>
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
def set_metadata(self, path, data):
|
||||
"""Set metadata for the named stream, replacing any
|
||||
existing metadata. Data should be a json-encoded
|
||||
@@ -172,7 +180,7 @@ class Stream(NilmApp):
|
||||
# /stream/update_metadata?path=/newton/prep&data=<json>
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
def update_metadata(self, path, data):
|
||||
"""Update metadata for the named stream. Data
|
||||
should be a json-encoded dictionary"""
|
||||
@@ -210,7 +218,7 @@ class Stream(NilmApp):
|
||||
parser.parse(body)
|
||||
except nilmdb.layout.ParserError as e:
|
||||
raise cherrypy.HTTPError("400 Bad Request",
|
||||
"Error parsing input data: " +
|
||||
"error parsing input data: " +
|
||||
e.message)
|
||||
|
||||
if (not parser.min_timestamp or not parser.max_timestamp or
|
||||
@@ -239,6 +247,27 @@ class Stream(NilmApp):
|
||||
# Done
|
||||
return "ok"
|
||||
|
||||
# /stream/remove?path=/newton/prep
|
||||
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
def remove(self, path, start = None, end = None):
|
||||
"""
|
||||
Remove data from the backend database. Removes all data in
|
||||
the interval [start, end). Returns the number of data points
|
||||
removed.
|
||||
"""
|
||||
if start is not None:
|
||||
start = float(start)
|
||||
if end is not None:
|
||||
end = float(end)
|
||||
if start is not None and end is not None:
|
||||
if end < start:
|
||||
raise cherrypy.HTTPError("400 Bad Request",
|
||||
"end before start")
|
||||
return self.db.stream_remove(path, start, end)
|
||||
|
||||
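From the client side, the new endpoint is reached through Client.stream_remove, which mirrors the [start, end) semantics of /stream/remove and returns the number of removed points.  A brief sketch (the URL and timestamps are example values taken from the test setup):

    import nilmdb

    client = nilmdb.Client(url = "http://localhost:12380/")
    # Remove everything in [start, end); returns the number of points removed.
    npoints = client.stream_remove("/newton/prep", 1332497430.0, 1332497440.0)
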
# /stream/intervals?path=/newton/prep
|
||||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@@ -366,6 +395,11 @@ class Server(object):
|
||||
cherrypy.config.update({ 'request.show_tracebacks' : True })
|
||||
self.force_traceback = force_traceback
|
||||
|
||||
# Patch CherryPy error handler to never pad out error messages.
|
||||
# This isn't necessary, but then again, neither is padding the
|
||||
# error messages.
|
||||
cherrypy._cperror._ie_friendly_error_sizes = {}
|
||||
|
||||
cherrypy.tree.apps = {}
|
||||
cherrypy.tree.mount(Root(self.db, self.version), "/")
|
||||
cherrypy.tree.mount(Stream(self.db), "/stream")
|
||||
|
@@ -7,3 +7,5 @@ from .lrucache import lru_cache
|
||||
from .diskusage import du
|
||||
from .mustclose import must_close
|
||||
from .urllib import urlencode
|
||||
from . import misc
|
||||
from . import atomic
|
||||
|
26
nilmdb/utils/atomic.py
Normal file
@@ -0,0 +1,26 @@
# Atomic file writing helper.

import os

def replace_file(filename, content):
    """Attempt to atomically and durably replace the filename with the
    given contents.  This is intended to be 'pretty good on most
    OSes', but not necessarily bulletproof."""

    newfilename = filename + ".new"

    # Write to new file, flush it
    with open(newfilename, "wb") as f:
        f.write(content)
        f.flush()
        os.fsync(f.fileno())

    # Move new file over old one
    try:
        os.rename(newfilename, filename)
    except OSError: # pragma: no cover
        # Some OSes might not support renaming over an existing file.
        # This is definitely NOT atomic!
        os.remove(filename)
        os.rename(newfilename, filename)
|
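A quick usage sketch of the helper above (the file name and contents are illustrative):

    from nilmdb.utils import atomic

    # Either the old contents or the new contents end up on disk;
    # never a partially written file.
    atomic.replace_file("some-state-file", "nrows=12345\n")
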
@@ -4,17 +4,19 @@
|
||||
# with added 'destructor' functionality.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import decorator
|
||||
import warnings
|
||||
|
||||
def lru_cache(size = 10, onremove = None):
|
||||
def lru_cache(size = 10, onremove = None, keys = slice(None)):
|
||||
"""Least-recently-used cache decorator.
|
||||
|
||||
@lru_cache(size = 10, onevict = None)
|
||||
def f(...):
|
||||
pass
|
||||
|
||||
Given a function and arguments, memoize its return value.
|
||||
Up to 'size' elements are cached.
|
||||
Given a function and arguments, memoize its return value. Up to
|
||||
'size' elements are cached. 'keys' is a slice object that
|
||||
represents which arguments are used as the cache key.
|
||||
|
||||
When evicting a value from the cache, call the function
|
||||
'onremove' with the value that's being evicted.
|
||||
@@ -24,43 +26,52 @@ def lru_cache(size = 10, onremove = None):
|
||||
f.cache_hits and f.cache_misses give statistics.
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
def decorate(func):
|
||||
cache = collections.OrderedDict() # order: least- to most-recent
|
||||
|
||||
def evict(value):
|
||||
if onremove:
|
||||
onremove(value)
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
key = args + tuple(sorted(kwargs.items()))
|
||||
def wrapper(orig, *args, **kwargs):
|
||||
if kwargs:
|
||||
raise NotImplementedError("kwargs not supported")
|
||||
key = args[keys]
|
||||
try:
|
||||
value = cache.pop(key)
|
||||
wrapper.cache_hits += 1
|
||||
orig.cache_hits += 1
|
||||
except KeyError:
|
||||
value = func(*args, **kwargs)
|
||||
wrapper.cache_misses += 1
|
||||
value = orig(*args)
|
||||
orig.cache_misses += 1
|
||||
if len(cache) >= size:
|
||||
evict(cache.popitem(0)[1]) # evict LRU cache entry
|
||||
cache[key] = value # (re-)insert this key at end
|
||||
return value
|
||||
|
||||
def cache_remove(*args, **kwargs):
|
||||
"""Remove the described key from this cache, if present.
|
||||
Note that if the original wrapped function was implicitly
|
||||
passed 'self', you need to pass it as an argument here too."""
|
||||
key = args + tuple(sorted(kwargs.items()))
|
||||
def cache_remove(*args):
|
||||
"""Remove the described key from this cache, if present."""
|
||||
key = args
|
||||
if key in cache:
|
||||
evict(cache.pop(key))
|
||||
else:
|
||||
if len(cache) > 0 and len(args) != len(cache.iterkeys().next()):
|
||||
raise KeyError("trying to remove from LRU cache, but "
|
||||
"number of arguments doesn't match the "
|
||||
"cache key length")
|
||||
|
||||
def cache_remove_all():
|
||||
for key in cache:
|
||||
evict(cache.pop(key))
|
||||
|
||||
wrapper.cache_hits = 0
|
||||
wrapper.cache_misses = 0
|
||||
wrapper.cache_remove = cache_remove
|
||||
wrapper.cache_remove_all = cache_remove_all
|
||||
def cache_info():
|
||||
return (func.cache_hits, func.cache_misses)
|
||||
|
||||
return wrapper
|
||||
return decorator
|
||||
new = decorator.decorator(wrapper, func)
|
||||
func.cache_hits = 0
|
||||
func.cache_misses = 0
|
||||
new.cache_info = cache_info
|
||||
new.cache_remove = cache_remove
|
||||
new.cache_remove_all = cache_remove_all
|
||||
return new
|
||||
|
||||
return decorate
|
||||
|
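A short usage sketch of the new 'keys' argument (the class below is illustrative, not part of the diff).  keys = slice(1, 2) means only the second positional argument forms the cache key, so 'self' is excluded and cache_remove can be called with just the path:

    import nilmdb.utils

    class StreamCache(object):
        def __init__(self):
            self.loads = 0

        @nilmdb.utils.lru_cache(size = 3, keys = slice(1, 2))
        def lookup(self, path):
            # Only 'path' is part of the cache key.
            self.loads += 1
            return path.upper()

    c = StreamCache()
    c.lookup("/newton/prep")
    c.lookup("/newton/prep")               # served from the cache
    c.lookup("/newton/raw")
    print(c.loads)                         # 2
    c.lookup.cache_remove("/newton/prep")  # evict just that entry
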
8
nilmdb/utils/misc.py
Normal file
@@ -0,0 +1,8 @@
import itertools

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), ..., (sn,None)"
    a, b = itertools.tee(iterable)
    next(b, None)
    return itertools.izip_longest(a, b)
|
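A brief example of pairwise (the values are arbitrary):

    from nilmdb.utils.misc import pairwise

    # Iterate over adjacent pairs; the final element is paired with None.
    for cur, nxt in pairwise([ 10, 20, 30 ]):
        print(cur, nxt)    # (10, 20), then (20, 30), then (30, None)
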
@@ -1,42 +1,63 @@
|
||||
# Class decorator that warns on stderr at deletion time if the class's
|
||||
# close() member wasn't called.
|
||||
|
||||
from nilmdb.utils.printf import *
|
||||
import sys
|
||||
import inspect
|
||||
import decorator
|
||||
|
||||
def must_close(errorfile = sys.stderr):
|
||||
def decorator(cls):
|
||||
def dummy(*args, **kwargs):
|
||||
pass
|
||||
if "__init__" not in cls.__dict__:
|
||||
cls.__init__ = dummy
|
||||
if "__del__" not in cls.__dict__:
|
||||
cls.__del__ = dummy
|
||||
if "close" not in cls.__dict__:
|
||||
cls.close = dummy
|
||||
def must_close(errorfile = sys.stderr, wrap_verify = False):
|
||||
"""Class decorator that warns on 'errorfile' at deletion time if
|
||||
the class's close() member wasn't called.
|
||||
|
||||
orig_init = cls.__init__
|
||||
orig_del = cls.__del__
|
||||
orig_close = cls.close
|
||||
If 'wrap_verify' is True, every class method is wrapped with a
|
||||
verifier that will raise AssertionError if the .close() method has
|
||||
already been called."""
|
||||
def class_decorator(cls):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
ret = orig_init(self, *args, **kwargs)
|
||||
# Helper to replace a class method with a wrapper function,
|
||||
# while maintaining argument specs etc.
|
||||
def wrap_class_method(wrapper_func):
|
||||
method = wrapper_func.__name__
|
||||
if method in cls.__dict__:
|
||||
orig = getattr(cls, method).im_func
|
||||
else:
|
||||
orig = lambda self: None
|
||||
setattr(cls, method, decorator.decorator(wrapper_func, orig))
|
||||
|
||||
@wrap_class_method
|
||||
def __init__(orig, self, *args, **kwargs):
|
||||
ret = orig(self, *args, **kwargs)
|
||||
self.__dict__["_must_close"] = True
|
||||
self.__dict__["_must_close_initialized"] = True
|
||||
return ret
|
||||
|
||||
def __del__(self):
|
||||
@wrap_class_method
|
||||
def __del__(orig, self, *args, **kwargs):
|
||||
if "_must_close" in self.__dict__:
|
||||
fprintf(errorfile, "error: %s.close() wasn't called!\n",
|
||||
self.__class__.__name__)
|
||||
return orig_del(self)
|
||||
return orig(self, *args, **kwargs)
|
||||
|
||||
def close(self, *args, **kwargs):
|
||||
@wrap_class_method
|
||||
def close(orig, self, *args, **kwargs):
|
||||
del self._must_close
|
||||
return orig_close(self)
|
||||
return orig(self, *args, **kwargs)
|
||||
|
||||
cls.__init__ = __init__
|
||||
cls.__del__ = __del__
|
||||
cls.close = close
|
||||
# Optionally wrap all other functions
|
||||
def verifier(orig, self, *args, **kwargs):
|
||||
if ("_must_close" not in self.__dict__ and
|
||||
"_must_close_initialized" in self.__dict__):
|
||||
raise AssertionError("called " + str(orig) + " after close")
|
||||
return orig(self, *args, **kwargs)
|
||||
if wrap_verify:
|
||||
for (name, method) in inspect.getmembers(cls, inspect.ismethod):
|
||||
# Skip class methods
|
||||
if method.__self__ is not None:
|
||||
continue
|
||||
# Skip some methods
|
||||
if name in [ "__del__", "__init__" ]:
|
||||
continue
|
||||
# Set up wrapper
|
||||
setattr(cls, name, decorator.decorator(verifier,
|
||||
method.im_func))
|
||||
|
||||
return cls
|
||||
return decorator
|
||||
return class_decorator
|
||||
|
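A minimal sketch of the wrap_verify behavior added above (the Table class is illustrative): once close() has been called, any further method call on the instance raises AssertionError.

    import sys
    import nilmdb.utils

    @nilmdb.utils.must_close(errorfile = sys.stderr, wrap_verify = True)
    class Table(object):
        def __init__(self):
            self.rows = []
        def append(self, row):
            self.rows.append(row)
        def close(self):
            self.rows = None

    t = Table()
    t.append(42)
    t.close()
    t.append(43)    # raises AssertionError: called <function append ...> after close
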
@@ -1,68 +1,40 @@
|
||||
from __future__ import absolute_import
|
||||
from urllib import quote_plus, _is_unicode
|
||||
|
||||
# urllib.urlencode insists on encoding Unicode as ASCII. This is an
|
||||
# exact copy of that function, except we encode it as UTF-8 instead.
|
||||
# urllib.urlencode insists on encoding Unicode as ASCII. This is based
|
||||
# on that function, except we always encode it as UTF-8 instead.
|
||||
|
||||
def urlencode(query, doseq=0):
|
||||
"""Encode a sequence of two-element tuples or dictionary into a URL query string.
|
||||
def urlencode(query):
|
||||
"""Encode a dictionary into a URL query string.
|
||||
|
||||
If any values in the query arg are sequences and doseq is true, each
|
||||
sequence element is converted to a separate parameter.
|
||||
|
||||
If the query arg is a sequence of two-element tuples, the order of the
|
||||
parameters in the output will match the order of parameters in the
|
||||
input.
|
||||
If any values in the query arg are sequences, each sequence
|
||||
element is converted to a separate parameter.
|
||||
"""
|
||||
|
||||
if hasattr(query,"items"):
|
||||
# mapping objects
|
||||
query = query.items()
|
||||
else:
|
||||
# it's a bother at times that strings and string-like objects are
|
||||
# sequences...
|
||||
try:
|
||||
# non-sequence items should not work with len()
|
||||
# non-empty strings will fail this
|
||||
if len(query) and not isinstance(query[0], tuple):
|
||||
raise TypeError
|
||||
# zero-length sequences of all types will get here and succeed,
|
||||
# but that's a minor nit - since the original implementation
|
||||
# allowed empty dicts that type of behavior probably should be
|
||||
# preserved for consistency
|
||||
except TypeError:
|
||||
ty,va,tb = sys.exc_info()
|
||||
raise TypeError, "not a valid non-string sequence or mapping object", tb
|
||||
query = query.items()
|
||||
|
||||
l = []
|
||||
if not doseq:
|
||||
# preserve old behavior
|
||||
for k, v in query:
|
||||
k = quote_plus(str(k))
|
||||
v = quote_plus(str(v))
|
||||
for k, v in query:
|
||||
k = quote_plus(str(k))
|
||||
if isinstance(v, str):
|
||||
v = quote_plus(v)
|
||||
l.append(k + '=' + v)
|
||||
else:
|
||||
for k, v in query:
|
||||
k = quote_plus(str(k))
|
||||
if isinstance(v, str):
|
||||
v = quote_plus(v)
|
||||
l.append(k + '=' + v)
|
||||
elif _is_unicode(v):
|
||||
# is there a reasonable way to convert to ASCII?
|
||||
# encode generates a string, but "replace" or "ignore"
|
||||
# lose information and "strict" can raise UnicodeError
|
||||
v = quote_plus(v.encode("utf-8","strict"))
|
||||
elif _is_unicode(v):
|
||||
# is there a reasonable way to convert to ASCII?
|
||||
# encode generates a string, but "replace" or "ignore"
|
||||
# lose information and "strict" can raise UnicodeError
|
||||
v = quote_plus(v.encode("utf-8","strict"))
|
||||
l.append(k + '=' + v)
|
||||
else:
|
||||
try:
|
||||
# is this a sufficient test for sequence-ness?
|
||||
len(v)
|
||||
except TypeError:
|
||||
# not a sequence
|
||||
v = quote_plus(str(v))
|
||||
l.append(k + '=' + v)
|
||||
else:
|
||||
try:
|
||||
# is this a sufficient test for sequence-ness?
|
||||
len(v)
|
||||
except TypeError:
|
||||
# not a sequence
|
||||
v = quote_plus(str(v))
|
||||
l.append(k + '=' + v)
|
||||
else:
|
||||
# loop over the sequence
|
||||
for elt in v:
|
||||
l.append(k + '=' + quote_plus(str(elt)))
|
||||
# loop over the sequence
|
||||
for elt in v:
|
||||
l.append(k + '=' + quote_plus(str(elt)))
|
||||
return '&'.join(l)
|
||||
|
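A short example of the simplified urlencode (the path value is illustrative):

    from nilmdb.utils import urlencode

    # Unicode values are encoded as UTF-8 instead of raising UnicodeError.
    print(urlencode({ "path": u"/d\xfcsseldorf/raw" }))
    # path=%2Fd%C3%BCsseldorf%2Fraw
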
46
runtests.py
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import nose
|
||||
import os
|
||||
import sys
|
||||
import glob
|
||||
from collections import OrderedDict
|
||||
|
||||
class JimOrderPlugin(nose.plugins.Plugin):
|
||||
"""When searching for tests and encountering a directory that
|
||||
contains a 'test.order' file, run tests listed in that file, in the
|
||||
order that they're listed. Globs are OK in that file and duplicates
|
||||
are removed."""
|
||||
name = 'jimorder'
|
||||
score = 10000
|
||||
|
||||
def prepareTestLoader(self, loader):
|
||||
def wrap(func):
|
||||
def wrapper(name, *args, **kwargs):
|
||||
addr = nose.selector.TestAddress(
|
||||
name, workingDir=loader.workingDir)
|
||||
try:
|
||||
order = os.path.join(addr.filename, "test.order")
|
||||
except:
|
||||
order = None
|
||||
if order and os.path.exists(order):
|
||||
files = []
|
||||
for line in open(order):
|
||||
line = line.split('#')[0].strip()
|
||||
if not line:
|
||||
continue
|
||||
fn = os.path.join(addr.filename, line.strip())
|
||||
files.extend(sorted(glob.glob(fn)) or [fn])
|
||||
files = list(OrderedDict.fromkeys(files))
|
||||
tests = [ wrapper(fn, *args, **kwargs) for fn in files ]
|
||||
return loader.suiteClass(tests)
|
||||
return func(name, *args, **kwargs)
|
||||
return wrapper
|
||||
loader.loadTestsFromName = wrap(loader.loadTestsFromName)
|
||||
return loader
|
||||
|
||||
# Use setup.cfg for most of the test configuration. Adding
|
||||
# --with-jimorder here means that a normal "nosetests" run will
|
||||
# still work, it just won't support test.order.
|
||||
nose.main(addplugins = [ JimOrderPlugin() ],
|
||||
argv = sys.argv + ["--with-jimorder"])
|
@@ -8,8 +8,12 @@ cover-package=nilmdb
|
||||
cover-erase=
|
||||
##cover-html= # this works, puts html output in cover/ dir
|
||||
##cover-branches= # need nose 1.1.3 for this
|
||||
#debug=nose
|
||||
#debug-log=nose.log
|
||||
stop=
|
||||
verbosity=2
|
||||
tests=tests
|
||||
#tests=tests/test_bulkdata.py
|
||||
#tests=tests/test_mustclose.py
|
||||
#tests=tests/test_lrucache.py
|
||||
#tests=tests/test_cmdline.py
|
||||
@@ -23,6 +27,7 @@ verbosity=2
|
||||
#tests=tests/test_serializer.py
|
||||
#tests=tests/test_iteratorizer.py
|
||||
#tests=tests/test_client.py:TestClient.test_client_nilmdb
|
||||
#tests=tests/test_nilmdb.py
|
||||
#with-profile=
|
||||
#profile-sort=time
|
||||
##profile-restrict=10 # doesn't work right, treated as string or something
|
||||
|
19
tests/data/prep-20120323T1002-first19lines
Normal file
@@ -0,0 +1,19 @@
|
||||
2.56437e+05 2.24430e+05 4.01161e+03 3.47534e+03 7.49589e+03 3.38894e+03 2.61397e+02 3.73126e+03
|
||||
2.53963e+05 2.24167e+05 5.62107e+03 1.54801e+03 9.16517e+03 3.52293e+03 1.05893e+03 2.99696e+03
|
||||
2.58508e+05 2.24930e+05 6.01140e+03 8.18866e+02 9.03995e+03 4.48244e+03 2.49039e+03 2.67934e+03
|
||||
2.59627e+05 2.26022e+05 4.47450e+03 2.42302e+03 7.41419e+03 5.07197e+03 2.43938e+03 2.96296e+03
|
||||
2.55187e+05 2.24632e+05 4.73857e+03 3.39804e+03 7.39512e+03 4.72645e+03 1.83903e+03 3.39353e+03
|
||||
2.57102e+05 2.21623e+05 6.14413e+03 1.44109e+03 8.75648e+03 3.49532e+03 1.86994e+03 3.75253e+03
|
||||
2.63653e+05 2.21770e+05 6.22177e+03 7.38962e+02 9.54760e+03 2.66682e+03 1.46266e+03 3.33257e+03
|
||||
2.63613e+05 2.25256e+05 4.47712e+03 2.43745e+03 8.51021e+03 3.85563e+03 9.59442e+02 2.38718e+03
|
||||
2.55350e+05 2.26264e+05 4.28372e+03 3.92394e+03 7.91247e+03 5.46652e+03 1.28499e+03 2.09372e+03
|
||||
2.52727e+05 2.24609e+05 5.85193e+03 2.49198e+03 8.54063e+03 5.62305e+03 2.33978e+03 3.00714e+03
|
||||
2.58475e+05 2.23578e+05 5.92487e+03 1.39448e+03 8.77962e+03 4.54418e+03 2.13203e+03 3.84976e+03
|
||||
2.61563e+05 2.24609e+05 4.33614e+03 2.45575e+03 8.05538e+03 3.46911e+03 6.27873e+02 3.66420e+03
|
||||
2.56401e+05 2.24441e+05 4.18715e+03 3.45717e+03 7.90669e+03 3.53355e+03 -5.84482e+00 2.96687e+03
|
||||
2.54745e+05 2.22644e+05 6.02005e+03 1.94721e+03 9.28939e+03 3.80020e+03 1.34820e+03 2.37785e+03
|
||||
2.60723e+05 2.22660e+05 6.69719e+03 1.03048e+03 9.26124e+03 4.34917e+03 2.84530e+03 2.73619e+03
|
||||
2.63089e+05 2.25711e+05 4.77887e+03 2.60417e+03 7.39660e+03 4.59811e+03 2.17472e+03 3.40729e+03
|
||||
2.55843e+05 2.27128e+05 4.02413e+03 4.39323e+03 6.79336e+03 4.62535e+03 7.52009e+02 3.44647e+03
|
||||
2.51904e+05 2.24868e+05 5.82289e+03 3.02127e+03 8.46160e+03 3.80298e+03 8.07212e+02 3.53468e+03
|
||||
2.57670e+05 2.22974e+05 6.73436e+03 1.60956e+03 9.92960e+03 2.98028e+03 1.44168e+03 3.05351e+03
|
18
tests/test.order
Normal file
@@ -0,0 +1,18 @@
|
||||
test_printf.py
|
||||
test_lrucache.py
|
||||
test_mustclose.py
|
||||
|
||||
test_serializer.py
|
||||
test_iteratorizer.py
|
||||
|
||||
test_timestamper.py
|
||||
test_layout.py
|
||||
test_rbtree.py
|
||||
test_interval.py
|
||||
|
||||
test_bulkdata.py
|
||||
test_nilmdb.py
|
||||
test_client.py
|
||||
test_cmdline.py
|
||||
|
||||
test_*.py
|
103
tests/test_bulkdata.py
Normal file
@@ -0,0 +1,103 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import nilmdb
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmdb.bulkdata
|
||||
|
||||
from nose.tools import *
|
||||
from nose.tools import assert_raises
|
||||
import itertools
|
||||
|
||||
from testutil.helpers import *
|
||||
|
||||
testdb = "tests/bulkdata-testdb"
|
||||
|
||||
from nilmdb.bulkdata import BulkData
|
||||
|
||||
class TestBulkData(object):
|
||||
|
||||
def test_bulkdata(self):
|
||||
for (size, files, db) in [ ( 0, 0, testdb ),
|
||||
( 25, 1000, testdb ),
|
||||
( 1000, 3, testdb.decode("utf-8") ) ]:
|
||||
recursive_unlink(db)
|
||||
os.mkdir(db)
|
||||
self.do_basic(db, size, files)
|
||||
|
||||
def do_basic(self, db, size, files):
|
||||
"""Do the basic test with variable file_size and files_per_dir"""
|
||||
if not size or not files:
|
||||
data = BulkData(db)
|
||||
else:
|
||||
data = BulkData(db, file_size = size, files_per_dir = files)
|
||||
|
||||
# create empty
|
||||
with assert_raises(ValueError):
|
||||
data.create("/foo", "uint16_8")
|
||||
with assert_raises(ValueError):
|
||||
data.create("foo/bar", "uint16_8")
|
||||
with assert_raises(ValueError):
|
||||
data.create("/foo/bar", "uint8_8")
|
||||
data.create("/foo/bar", "uint16_8")
|
||||
data.create(u"/foo/baz/quux", "float64_16")
|
||||
with assert_raises(ValueError):
|
||||
data.create("/foo/bar/baz", "uint16_8")
|
||||
with assert_raises(ValueError):
|
||||
data.create("/foo/baz", "float64_16")
|
||||
|
||||
# get node -- see if caching works
|
||||
nodes = []
|
||||
for i in range(5000):
|
||||
nodes.append(data.getnode("/foo/bar"))
|
||||
nodes.append(data.getnode("/foo/baz/quux"))
|
||||
del nodes
|
||||
|
||||
# Test node
|
||||
node = data.getnode("/foo/bar")
|
||||
with assert_raises(IndexError):
|
||||
x = node[0]
|
||||
raw = []
|
||||
for i in range(1000):
|
||||
raw.append([10000+i, 1, 2, 3, 4, 5, 6, 7, 8 ])
|
||||
node.append(raw[0:1])
|
||||
node.append(raw[1:100])
|
||||
node.append(raw[100:])
|
||||
|
||||
misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
|
||||
slice(5,10), slice(3,None), slice(3,-3),
|
||||
slice(20,10), slice(200,100,-1), slice(None,0,-1),
|
||||
slice(100,500,5) ]
|
||||
# Extract slices
|
||||
for s in misc_slices:
|
||||
eq_(node[s], raw[s])
|
||||
|
||||
# Get some coverage of remove; remove is more fully tested
|
||||
# in cmdline
|
||||
with assert_raises(IndexError):
|
||||
node.remove(9999,9998)
|
||||
|
||||
# close, reopen
|
||||
# reopen
|
||||
data.close()
|
||||
if not size or not files:
|
||||
data = BulkData(db)
|
||||
else:
|
||||
data = BulkData(db, file_size = size, files_per_dir = files)
|
||||
node = data.getnode("/foo/bar")
|
||||
|
||||
# Extract slices
|
||||
for s in misc_slices:
|
||||
eq_(node[s], raw[s])
|
||||
|
||||
# destroy
|
||||
with assert_raises(ValueError):
|
||||
data.destroy("/foo")
|
||||
with assert_raises(ValueError):
|
||||
data.destroy("/foo/baz")
|
||||
with assert_raises(ValueError):
|
||||
data.destroy("/foo/qwerty")
|
||||
data.destroy("/foo/baz/quux")
|
||||
data.destroy("/foo/bar")
|
||||
|
||||
# close
|
||||
data.close()
|
@@ -17,8 +17,9 @@ import cStringIO
|
||||
import simplejson as json
|
||||
import unittest
|
||||
import warnings
|
||||
import resource
|
||||
|
||||
from test_helpers import *
|
||||
from testutil.helpers import *
|
||||
|
||||
testdb = "tests/client-testdb"
|
||||
|
||||
@@ -69,7 +70,11 @@ class TestClient(object):
|
||||
eq_(distutils.version.StrictVersion(version),
|
||||
distutils.version.StrictVersion(test_server.version))
|
||||
|
||||
def test_client_2_nilmdb(self):
|
||||
# Bad URLs should give 404, not 500
|
||||
with assert_raises(ClientError):
|
||||
client.http.get("/stream/create")
|
||||
|
||||
def test_client_2_createlist(self):
|
||||
# Basic stream tests, like those in test_nilmdb:test_stream
|
||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
||||
|
||||
@@ -99,6 +104,20 @@ class TestClient(object):
|
||||
eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
|
||||
eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
|
||||
|
||||
# Try messing with resource limits to trigger errors and get
|
||||
# more coverage. Here, make it so we can only create files 1
|
||||
# byte in size, which will trigger an IOError in the server when
|
||||
# we create a table.
|
||||
limit = resource.getrlimit(resource.RLIMIT_FSIZE)
|
||||
resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
|
||||
with assert_raises(ServerError) as e:
|
||||
client.stream_create("/newton/hello", "RawData")
|
||||
resource.setrlimit(resource.RLIMIT_FSIZE, limit)
|
||||
|
||||
|
||||
def test_client_3_metadata(self):
|
||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
||||
|
||||
# Set / get metadata
|
||||
eq_(client.stream_get_metadata("/newton/prep"), {})
|
||||
eq_(client.stream_get_metadata("/newton/raw"), {})
|
||||
@@ -128,7 +147,7 @@ class TestClient(object):
|
||||
with assert_raises(ClientError):
|
||||
client.stream_update_metadata("/newton/prep", [1,2,3])
|
||||
|
||||
def test_client_3_insert(self):
|
||||
def test_client_4_insert(self):
|
||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
||||
|
||||
datetime_tz.localtz_set("America/New_York")
|
||||
@@ -201,16 +220,19 @@ class TestClient(object):
|
||||
with assert_raises(ClientError) as e:
|
||||
result = client.stream_insert("/newton/prep", data)
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
in_("OverlapError", str(e.exception))
|
||||
in_("verlap", str(e.exception))
|
||||
|
||||
def test_client_4_extract(self):
|
||||
# Misc tests for extract. Most of them are in test_cmdline.
|
||||
def test_client_5_extractremove(self):
|
||||
# Misc tests for extract and remove. Most of them are in test_cmdline.
|
||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
||||
|
||||
for x in client.stream_extract("/newton/prep", 123, 123):
|
||||
raise Exception("shouldn't be any data for this request")
|
||||
|
||||
def test_client_5_generators(self):
|
||||
with assert_raises(ClientError) as e:
|
||||
client.stream_remove("/newton/prep", 123, 120)
|
||||
|
||||
def test_client_6_generators(self):
|
||||
# A lot of the client functionality is already tested by test_cmdline,
|
||||
# but this gets a bit more coverage that cmdline misses.
|
||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
||||
@@ -259,7 +281,7 @@ class TestClient(object):
|
||||
in_("404 Not Found", str(e.exception))
|
||||
in_("No such stream", str(e.exception))
|
||||
|
||||
def test_client_6_chunked(self):
|
||||
def test_client_7_chunked(self):
|
||||
# Make sure that /stream/intervals and /stream/extract
|
||||
# properly return streaming, chunked response. Pokes around
|
||||
# in client.http internals a bit to look at the response
|
||||
@@ -282,7 +304,7 @@ class TestClient(object):
|
||||
if "transfer-encoding: chunked" not in client.http._headers.lower():
|
||||
warnings.warn("Non-chunked HTTP response for /stream/extract")
|
||||
|
||||
def test_client_7_unicode(self):
|
||||
def test_client_8_unicode(self):
|
||||
# Basic Unicode tests
|
||||
client = nilmdb.Client(url = "http://localhost:12380/")
|
||||
|
||||
|
@@ -4,11 +4,13 @@ import nilmdb
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmdb.cmdline
|
||||
|
||||
import unittest
|
||||
from nose.tools import *
|
||||
from nose.tools import assert_raises
|
||||
import itertools
|
||||
import datetime_tz
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import threading
|
||||
@@ -18,14 +20,16 @@ import Queue
|
||||
import StringIO
|
||||
import shlex
|
||||
|
||||
from test_helpers import *
|
||||
from testutil.helpers import *
|
||||
|
||||
testdb = "tests/cmdline-testdb"
|
||||
|
||||
def server_start(max_results = None):
|
||||
def server_start(max_results = None, bulkdata_args = {}):
|
||||
global test_server, test_db
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.NilmDB(testdb, sync = False, max_results = max_results)
|
||||
test_db = nilmdb.NilmDB(testdb, sync = False,
|
||||
max_results = max_results,
|
||||
bulkdata_args = bulkdata_args)
|
||||
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
@@ -94,14 +98,24 @@ class TestCmdline(object):
|
||||
self.dump()
|
||||
eq_(self.exitcode, 0)
|
||||
|
||||
def fail(self, arg_string, infile = None, exitcode = None):
|
||||
def fail(self, arg_string, infile = None,
|
||||
exitcode = None, require_error = True):
|
||||
self.run(arg_string, infile)
|
||||
if exitcode is not None and self.exitcode != exitcode:
|
||||
# Wrong exit code
|
||||
self.dump()
|
||||
eq_(self.exitcode, exitcode)
|
||||
if self.exitcode == 0:
|
||||
# Success, when we wanted failure
|
||||
self.dump()
|
||||
ne_(self.exitcode, 0)
|
||||
# Make sure the output contains the word "error" at the
|
||||
# beginning of a line, but only if an exitcode wasn't
|
||||
# specified.
|
||||
if require_error and not re.search("^error",
|
||||
self.captured, re.MULTILINE):
|
||||
raise AssertionError("command failed, but output doesn't "
|
||||
"contain the string 'error'")
|
||||
|
||||
def contain(self, checkstring):
|
||||
in_(checkstring, self.captured)
|
||||
@@ -131,7 +145,7 @@ class TestCmdline(object):
|
||||
def dump(self):
|
||||
printf("-----dump start-----\n%s-----dump end-----\n", self.captured)
|
||||
|
||||
def test_cmdline_01_basic(self):
|
||||
def test_01_basic(self):
|
||||
|
||||
# help
|
||||
self.ok("--help")
|
||||
@@ -177,14 +191,14 @@ class TestCmdline(object):
|
||||
self.fail("extract --start 2000-01-01 --start 2001-01-02")
|
||||
self.contain("duplicated argument")
|
||||
|
||||
def test_cmdline_02_info(self):
|
||||
def test_02_info(self):
|
||||
self.ok("info")
|
||||
self.contain("Server URL: http://localhost:12380/")
|
||||
self.contain("Server version: " + test_server.version)
|
||||
self.contain("Server database path")
|
||||
self.contain("Server database size")
|
||||
|
||||
def test_cmdline_03_createlist(self):
|
||||
def test_03_createlist(self):
|
||||
# Basic stream tests, like those in test_client.
|
||||
|
||||
# No streams
|
||||
@@ -201,6 +215,10 @@ class TestCmdline(object):
|
||||
# Bad layout type
|
||||
self.fail("create /newton/prep NoSuchLayout")
|
||||
self.contain("no such layout")
|
||||
self.fail("create /newton/prep float32_0")
|
||||
self.contain("no such layout")
|
||||
self.fail("create /newton/prep float33_1")
|
||||
self.contain("no such layout")
|
||||
|
||||
# Create a few streams
|
||||
self.ok("create /newton/zzz/rawnotch RawNotchedData")
|
||||
@@ -224,10 +242,17 @@ class TestCmdline(object):
|
||||
"/newton/raw RawData\n"
|
||||
"/newton/zzz/rawnotch RawNotchedData\n")
|
||||
|
||||
# Match just one type or one path
|
||||
# Match just one type or one path. Also check
|
||||
# that --path is optional
|
||||
self.ok("list --path /newton/raw")
|
||||
self.match("/newton/raw RawData\n")
|
||||
|
||||
self.ok("list /newton/raw")
|
||||
self.match("/newton/raw RawData\n")
|
||||
|
||||
self.fail("list -p /newton/raw /newton/raw")
|
||||
self.contain("too many paths")
|
||||
|
||||
self.ok("list --layout RawData")
|
||||
self.match("/newton/raw RawData\n")
|
||||
|
||||
@@ -239,10 +264,17 @@ class TestCmdline(object):
|
||||
self.ok("list --path *zzz* --layout Raw*")
|
||||
self.match("/newton/zzz/rawnotch RawNotchedData\n")
|
||||
|
||||
self.ok("list *zzz* --layout Raw*")
|
||||
self.match("/newton/zzz/rawnotch RawNotchedData\n")
|
||||
|
||||
self.ok("list --path *zzz* --layout Prep*")
|
||||
self.match("")
|
||||
|
||||
def test_cmdline_04_metadata(self):
|
||||
# reversed range
|
||||
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
|
||||
self.contain("start is after end")
|
||||
|
||||
def test_04_metadata(self):
|
||||
# Set / get metadata
|
||||
self.fail("metadata")
|
||||
self.fail("metadata --get")
|
||||
@@ -299,7 +331,7 @@ class TestCmdline(object):
|
||||
self.fail("metadata /newton/nosuchpath")
|
||||
self.contain("No stream at path /newton/nosuchpath")
|
||||
|
||||
def test_cmdline_05_parsetime(self):
|
||||
def test_05_parsetime(self):
|
||||
os.environ['TZ'] = "America/New_York"
|
||||
cmd = nilmdb.cmdline.Cmdline(None)
|
||||
test = datetime_tz.datetime_tz.now()
|
||||
@@ -314,17 +346,17 @@ class TestCmdline(object):
|
||||
eq_(cmd.parse_time("snapshot-20120405-140000.raw.gz"), test)
|
||||
eq_(cmd.parse_time("prep-20120405T1400"), test)
|
||||
|
||||
def test_cmdline_06_insert(self):
|
||||
def test_06_insert(self):
|
||||
self.ok("insert --help")
|
||||
|
||||
self.fail("insert /foo/bar baz qwer")
|
||||
self.contain("Error getting stream info")
|
||||
self.contain("error getting stream info")
|
||||
|
||||
self.fail("insert /newton/prep baz qwer")
|
||||
self.match("Error opening input file baz\n")
|
||||
self.match("error opening input file baz\n")
|
||||
|
||||
self.fail("insert /newton/prep")
|
||||
self.contain("Error extracting time")
|
||||
self.contain("error extracting time")
|
||||
|
||||
self.fail("insert --start 19801205 /newton/prep 1 2 3 4")
|
||||
self.contain("--start can only be used with one input file")
|
||||
@@ -365,7 +397,7 @@ class TestCmdline(object):
|
||||
os.environ['TZ'] = "UTC"
|
||||
self.fail("insert --rate 120 /newton/raw "
|
||||
"tests/data/prep-20120323T1004")
|
||||
self.contain("Error parsing input data")
|
||||
self.contain("error parsing input data")
|
||||
|
||||
# empty data does nothing
|
||||
self.ok("insert --rate 120 --start '03/23/2012 06:05:00' /newton/prep "
|
||||
@@ -374,7 +406,7 @@ class TestCmdline(object):
|
||||
# bad start time
|
||||
self.fail("insert --rate 120 --start 'whatever' /newton/prep /dev/null")
|
||||
|
||||
def test_cmdline_07_detail(self):
|
||||
def test_07_detail(self):
|
||||
# Just count the number of lines, it's probably fine
|
||||
self.ok("list --detail")
|
||||
lines_(self.captured, 8)
|
||||
@@ -408,23 +440,30 @@ class TestCmdline(object):
|
||||
self.ok("list --detail")
|
||||
lines_(self.captured, 8)
|
||||
|
||||
def test_cmdline_08_extract(self):
|
||||
def test_08_extract(self):
|
||||
# nonexistent stream
|
||||
self.fail("extract /no/such/foo --start 2000-01-01 --end 2020-01-01")
|
||||
self.contain("Error getting stream info")
|
||||
self.contain("error getting stream info")
|
||||
|
||||
# empty ranges return an error
|
||||
# reversed range
|
||||
self.fail("extract -a /newton/prep --start 2020-01-01 --end 2000-01-01")
|
||||
self.contain("start is after end")
|
||||
|
||||
# empty ranges return error 2
|
||||
self.fail("extract -a /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30' " +
|
||||
"--end '23 Mar 2012 10:00:30'", exitcode = 2)
|
||||
"--end '23 Mar 2012 10:00:30'",
|
||||
exitcode = 2, require_error = False)
|
||||
self.contain("no data")
|
||||
self.fail("extract -a /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30.000001' " +
|
||||
"--end '23 Mar 2012 10:00:30.000001'", exitcode = 2)
|
||||
"--end '23 Mar 2012 10:00:30.000001'",
|
||||
exitcode = 2, require_error = False)
|
||||
self.contain("no data")
|
||||
self.fail("extract -a /newton/prep " +
|
||||
"--start '23 Mar 2022 10:00:30' " +
|
||||
"--end '23 Mar 2022 10:00:30'", exitcode = 2)
|
||||
"--end '23 Mar 2022 10:00:30'",
|
||||
exitcode = 2, require_error = False)
|
||||
self.contain("no data")
|
||||
|
||||
# but are ok if we're just counting results
|
||||
@@ -463,7 +502,7 @@ class TestCmdline(object):
|
||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("43200\n")
|
||||
|
||||
def test_cmdline_09_truncated(self):
|
||||
def test_09_truncated(self):
|
||||
# Test truncated responses by overriding the nilmdb max_results
|
||||
server_stop()
|
||||
server_start(max_results = 2)
|
||||
@@ -472,7 +511,102 @@ class TestCmdline(object):
|
||||
server_stop()
|
||||
server_start()
|
||||
|
||||
def test_cmdline_10_destroy(self):
|
||||
def test_10_remove(self):
|
||||
# Removing data
|
||||
|
||||
# Try nonexistent stream
|
||||
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
|
||||
self.contain("No stream at path")
|
||||
|
||||
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
|
||||
self.contain("start is after end")
|
||||
|
||||
# empty ranges return success, backwards ranges return error
|
||||
self.ok("remove /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30' " +
|
||||
"--end '23 Mar 2012 10:00:30'")
|
||||
self.match("")
|
||||
self.ok("remove /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30.000001' " +
|
||||
"--end '23 Mar 2012 10:00:30.000001'")
|
||||
self.match("")
|
||||
self.ok("remove /newton/prep " +
|
||||
"--start '23 Mar 2022 10:00:30' " +
|
||||
"--end '23 Mar 2022 10:00:30'")
|
||||
self.match("")
|
||||
|
||||
# Verbose
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30' " +
|
||||
"--end '23 Mar 2012 10:00:30'")
|
||||
self.match("0\n")
|
||||
self.ok("remove --count /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30' " +
|
||||
"--end '23 Mar 2012 10:00:30'")
|
||||
self.match("0\n")
|
||||
|
||||
# Make sure we have the data we expect
|
||||
self.ok("list --detail /newton/prep")
|
||||
self.match("/newton/prep PrepData\n" +
|
||||
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:03:59.991668 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:04:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:05:59.991668 +0000 ]\n")
|
||||
|
||||
# Remove various chunks of prep data and make sure
|
||||
# they're gone.
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30' " +
|
||||
"--end '23 Mar 2012 10:00:40'")
|
||||
self.match("1200\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:10' " +
|
||||
"--end '23 Mar 2012 10:00:20'")
|
||||
self.match("1200\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:05' " +
|
||||
"--end '23 Mar 2012 10:00:25'")
|
||||
self.match("1200\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:03:50' " +
|
||||
"--end '23 Mar 2012 10:06:50'")
|
||||
self.match("15600\n")
|
||||
|
||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("24000\n")
|
||||
|
||||
# See the missing chunks in list output
|
||||
self.ok("list --detail /newton/prep")
|
||||
self.match("/newton/prep PrepData\n" +
|
||||
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:00:25.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:00:30.000000 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:00:40.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n")
|
||||
|
||||
# Remove all data, verify it's missing
|
||||
self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("") # no count requested this time
|
||||
self.ok("list --detail /newton/prep")
|
||||
self.match("/newton/prep PrepData\n" +
|
||||
" (no intervals)\n")
|
||||
|
||||
# Reinsert some data, to verify that no overlaps with deleted
|
||||
# data are reported
|
||||
os.environ['TZ'] = "UTC"
|
||||
self.ok("insert --rate 120 /newton/prep "
|
||||
"tests/data/prep-20120323T1000 "
|
||||
"tests/data/prep-20120323T1002")
|
||||
|
||||
def test_11_destroy(self):
|
||||
# Delete records
|
||||
self.ok("destroy --help")
|
||||
|
||||
@@ -493,7 +627,7 @@ class TestCmdline(object):
|
||||
|
||||
# Notice how they're not empty
|
||||
self.ok("list --detail")
|
||||
lines_(self.captured, 8)
|
||||
lines_(self.captured, 7)
|
||||
|
||||
# Delete some
|
||||
self.ok("destroy /newton/prep")
|
||||
@@ -523,7 +657,7 @@ class TestCmdline(object):
|
||||
self.ok("list --detail --path " + path)
|
||||
self.contain("(no intervals)")
|
||||
|
||||
def test_cmdline_11_unicode(self):
|
||||
def test_12_unicode(self):
|
||||
# Unicode paths.
|
||||
self.ok("destroy /newton/asdf/qwer")
|
||||
self.ok("destroy /newton/prep")
|
||||
@@ -540,3 +674,149 @@ class TestCmdline(object):
|
||||
self.ok(u"metadata /düsseldorf/raw --update 'α=β ε τ α'")
|
||||
self.ok(u"metadata /düsseldorf/raw")
|
||||
self.match(u"α=β ε τ α\nγ=δ\n")
|
||||
|
||||
self.ok(u"destroy /düsseldorf/raw")
|
||||
|
||||
def test_13_files(self):
|
||||
# Test BulkData's ability to split into multiple files,
|
||||
# by forcing the file size to be really small.
|
||||
server_stop()
|
||||
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
|
||||
"files_per_dir" : 3 })
|
||||
|
||||
# Fill data
|
||||
self.ok("create /newton/prep float32_8")
|
||||
os.environ['TZ'] = "UTC"
|
||||
with open("tests/data/prep-20120323T1004-timestamped") as input:
|
||||
self.ok("insert --none /newton/prep", input)
|
||||
|
||||
# Extract it
|
||||
self.ok("extract /newton/prep --start '2000-01-01' " +
|
||||
"--end '2012-03-23 10:04:01'")
|
||||
lines_(self.captured, 120)
|
||||
self.ok("extract /newton/prep --start '2000-01-01' " +
|
||||
"--end '2022-03-23 10:04:01'")
|
||||
lines_(self.captured, 14400)
|
||||
|
||||
# Make sure there were lots of files generated in the database
|
||||
# dir
|
||||
nfiles = 0
|
||||
for (dirpath, dirnames, filenames) in os.walk(testdb):
|
||||
nfiles += len(filenames)
|
||||
assert(nfiles > 500)
|
||||
|
||||
# Make sure we can restart the server with a different file
|
||||
# size and have it still work
|
||||
server_stop()
|
||||
server_start()
|
||||
self.ok("extract /newton/prep --start '2000-01-01' " +
|
||||
"--end '2022-03-23 10:04:01'")
|
||||
lines_(self.captured, 14400)
|
||||
|
||||
# Now recreate the data one more time and make sure there are
|
||||
# fewer files.
|
||||
self.ok("destroy /newton/prep")
|
||||
self.fail("destroy /newton/prep") # already destroyed
|
||||
self.ok("create /newton/prep float32_8")
|
||||
os.environ['TZ'] = "UTC"
|
||||
with open("tests/data/prep-20120323T1004-timestamped") as input:
|
||||
self.ok("insert --none /newton/prep", input)
|
||||
nfiles = 0
|
||||
for (dirpath, dirnames, filenames) in os.walk(testdb):
|
||||
nfiles += len(filenames)
|
||||
lt_(nfiles, 50)
|
||||
self.ok("destroy /newton/prep") # destroy again
|
||||
|
||||
def test_14_remove_files(self):
|
||||
# Test BulkData's ability to remove when data is split into
|
||||
# multiple files. Should be a fairly comprehensive test of
|
||||
# remove functionality.
|
||||
server_stop()
|
||||
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
|
||||
"files_per_dir" : 3 })
|
||||
|
||||
# Insert data. Just for fun, insert out of order
|
||||
self.ok("create /newton/prep PrepData")
|
||||
os.environ['TZ'] = "UTC"
|
||||
self.ok("insert --rate 120 /newton/prep "
|
||||
"tests/data/prep-20120323T1002 "
|
||||
"tests/data/prep-20120323T1000")
|
||||
|
||||
# Should take up about 2.8 MB here (including directory entries)
|
||||
du_before = nilmdb.utils.diskusage.du_bytes(testdb)
|
||||
|
||||
# Make sure we have the data we expect
|
||||
self.ok("list --detail")
|
||||
self.match("/newton/prep PrepData\n" +
|
||||
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:03:59.991668 +0000 ]\n")
|
||||
|
||||
# Remove various chunks of prep data and make sure
|
||||
# they're gone.
|
||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("28800\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:30' " +
|
||||
"--end '23 Mar 2012 10:03:30'")
|
||||
self.match("21600\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:10' " +
|
||||
"--end '23 Mar 2012 10:00:20'")
|
||||
self.match("1200\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:00:05' " +
|
||||
"--end '23 Mar 2012 10:00:25'")
|
||||
self.match("1200\n")
|
||||
|
||||
self.ok("remove -c /newton/prep " +
|
||||
"--start '23 Mar 2012 10:03:50' " +
|
||||
"--end '23 Mar 2012 10:06:50'")
|
||||
self.match("1200\n")
|
||||
|
||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("3600\n")
|
||||
|
||||
# See the missing chunks in list output
|
||||
self.ok("list --detail")
|
||||
self.match("/newton/prep PrepData\n" +
|
||||
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:00:25.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:00:30.000000 +0000 ]\n"
|
||||
" [ Fri, 23 Mar 2012 10:03:30.000000 +0000"
|
||||
" -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n")
|
||||
|
||||
# We have 1/8 of the data that we had before, so the file size
|
||||
# should have dropped below 1/4 of what it used to be
|
||||
du_after = nilmdb.utils.diskusage.du_bytes(testdb)
|
||||
lt_(du_after, (du_before / 4))
|
||||
|
||||
# Remove anything that came from the 10:02 data file
|
||||
self.ok("remove /newton/prep " +
|
||||
"--start '23 Mar 2012 10:02:00' --end '2020-01-01'")
|
||||
|
||||
# Re-insert 19 lines from that file, then remove them again.
|
||||
# With the specific file_size above, this will cause the last
|
||||
# file in the bulk data storage to be exactly file_size large,
|
||||
# so removing the data should also remove that last file.
|
||||
self.ok("insert --rate 120 /newton/prep " +
|
||||
"tests/data/prep-20120323T1002-first19lines")
|
||||
self.ok("remove /newton/prep " +
|
||||
"--start '23 Mar 2012 10:02:00' --end '2020-01-01'")
|
||||
|
||||
# Shut down and restart server, to force nrows to get refreshed.
|
||||
server_stop()
|
||||
server_start()
|
||||
|
||||
# Re-add the full 10:02 data file. This tests adding new data once
|
||||
# we removed data near the end.
|
||||
self.ok("insert --rate 120 /newton/prep tests/data/prep-20120323T1002")
|
||||
|
||||
# See if we can extract it all
|
||||
self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
lines_(self.captured, 15600)
|
||||
|
@@ -10,13 +10,13 @@ import itertools
|
||||
|
||||
from nilmdb.interval import Interval, DBInterval, IntervalSet, IntervalError
|
||||
|
||||
from test_helpers import *
|
||||
from testutil.helpers import *
|
||||
import unittest
|
||||
|
||||
# set to False to skip live renders
|
||||
do_live_renders = False
|
||||
def render(iset, description = "", live = True):
|
||||
import renderdot
|
||||
import testutil.renderdot as renderdot
|
||||
r = renderdot.RBTreeRenderer(iset.tree)
|
||||
return r.render(description, live and do_live_renders)
|
||||
|
||||
@@ -345,14 +345,15 @@ class TestIntervalSpeed:
|
||||
def test_interval_speed(self):
|
||||
import yappi
|
||||
import time
|
||||
import aplotter
|
||||
import testutil.aplotter as aplotter
|
||||
import random
|
||||
import math
|
||||
|
||||
print
|
||||
yappi.start()
|
||||
speeds = {}
|
||||
for j in [ 2**x for x in range(5,20) ]:
|
||||
limit = 10 # was 20
|
||||
for j in [ 2**x for x in range(5,limit) ]:
|
||||
start = time.time()
|
||||
iset = IntervalSet()
|
||||
for i in random.sample(xrange(j),j):
|
||||
|
@@ -7,7 +7,7 @@ from nose.tools import assert_raises
|
||||
import threading
|
||||
import time
|
||||
|
||||
from test_helpers import *
|
||||
from testutil.helpers import *
|
||||
|
||||
def func_with_callback(a, b, callback):
|
||||
callback(a)
|
||||
|
@@ -20,7 +20,7 @@ import cStringIO
|
||||
import random
|
||||
import unittest
|
||||
|
||||
from test_helpers import *
|
||||
from testutil.helpers import *
|
||||
|
||||
from nilmdb.layout import *
|
||||
|
||||
|
@@ -6,8 +6,9 @@ from nose.tools import *
|
||||
from nose.tools import assert_raises
|
||||
import threading
|
||||
import time
|
||||
import inspect
|
||||
|
||||
from test_helpers import *
|
||||
from testutil.helpers import *
|
||||
|
||||
@nilmdb.utils.lru_cache(size = 3)
|
||||
def foo1(n):
|
||||
@@ -24,30 +25,59 @@ foo3d.destructed = []
|
||||
def foo3(n):
|
||||
return n
|
||||
|
||||
class Foo:
|
||||
def __init__(self):
|
||||
self.calls = 0
|
||||
@nilmdb.utils.lru_cache(size = 3, keys = slice(1, 2))
|
||||
def foo(self, n, **kwargs):
|
||||
self.calls += 1
|
||||
|
||||
class TestLRUCache(object):
|
||||
def test(self):
|
||||
|
||||
[ foo1(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
|
||||
eq_((foo1.cache_hits, foo1.cache_misses), (6, 3))
|
||||
eq_(foo1.cache_info(), (6, 3))
|
||||
[ foo1(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
|
||||
eq_((foo1.cache_hits, foo1.cache_misses), (15, 3))
|
||||
eq_(foo1.cache_info(), (15, 3))
|
||||
[ foo1(n) for n in [ 4, 2, 1, 1, 4 ] ]
|
||||
eq_((foo1.cache_hits, foo1.cache_misses), (18, 5))
|
||||
eq_(foo1.cache_info(), (18, 5))
|
||||
|
||||
[ foo2(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
|
||||
eq_((foo2.cache_hits, foo2.cache_misses), (6, 3))
|
||||
eq_(foo2.cache_info(), (6, 3))
|
||||
[ foo2(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
|
||||
eq_((foo2.cache_hits, foo2.cache_misses), (15, 3))
|
||||
eq_(foo2.cache_info(), (15, 3))
|
||||
[ foo2(n) for n in [ 4, 2, 1, 1, 4 ] ]
|
||||
eq_((foo2.cache_hits, foo2.cache_misses), (19, 4))
|
||||
eq_(foo2.cache_info(), (19, 4))
|
||||
|
||||
[ foo3(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
|
||||
eq_((foo3.cache_hits, foo3.cache_misses), (6, 3))
|
||||
eq_(foo3.cache_info(), (6, 3))
|
||||
[ foo3(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
|
||||
eq_((foo3.cache_hits, foo3.cache_misses), (15, 3))
|
||||
eq_(foo3.cache_info(), (15, 3))
|
||||
[ foo3(n) for n in [ 4, 2, 1, 1, 4 ] ]
|
||||
eq_((foo3.cache_hits, foo3.cache_misses), (18, 5))
|
||||
eq_(foo3.cache_info(), (18, 5))
|
||||
eq_(foo3d.destructed, [1, 3])
|
||||
with assert_raises(KeyError):
|
||||
foo3.cache_remove(1,2,3)
|
||||
foo3.cache_remove(1)
|
||||
eq_(foo3d.destructed, [1, 3, 1])
|
||||
foo3.cache_remove_all()
|
||||
eq_(foo3d.destructed, [1, 3, 1, 2, 4 ])
|
||||
|
||||
foo = Foo()
|
||||
foo.foo(5)
|
||||
foo.foo(6)
|
||||
foo.foo(7)
|
||||
foo.foo(5)
|
||||
eq_(foo.calls, 3)
|
||||
|
||||
# Can't handle keyword arguments right now
|
||||
with assert_raises(NotImplementedError):
|
||||
foo.foo(3, asdf = 7)
|
||||
|
||||
# Verify that argspecs were maintained
|
||||
eq_(inspect.getargspec(foo1),
|
||||
inspect.ArgSpec(args=['n'],
|
||||
varargs=None, keywords=None, defaults=None))
|
||||
eq_(inspect.getargspec(foo.foo),
|
||||
inspect.ArgSpec(args=['self', 'n'],
|
||||
varargs=None, keywords="kwargs", defaults=None))
|
||||
|
@@ -5,15 +5,29 @@ import nose
from nose.tools import *
from nose.tools import assert_raises

from test_helpers import *
from testutil.helpers import *

import sys
import cStringIO
import gc

import inspect

err = cStringIO.StringIO()

@nilmdb.utils.must_close(errorfile = err)
class Foo:
    def __init__(self, arg):
        fprintf(err, "Init %s\n", arg)

    def __del__(self):
        fprintf(err, "Deleting\n")

    def close(self):
        fprintf(err, "Closing\n")

@nilmdb.utils.must_close(errorfile = err, wrap_verify = True)
class Bar:
    def __init__(self):
        fprintf(err, "Init\n")

@@ -23,8 +37,11 @@ class Foo:
    def close(self):
        fprintf(err, "Closing\n")

    def blah(self, arg):
        fprintf(err, "Blah %s\n", arg)

@nilmdb.utils.must_close(errorfile = err)
class Bar:
class Baz:
    pass

class TestMustClose(object):
@@ -34,26 +51,60 @@ class TestMustClose(object):
        # garbage collect the object (and call its __del__ function)
        # right after a "del x".

        x = Foo()
        # Trigger error
        err.truncate()
        x = Foo("hi")
        # Verify that the arg spec was maintained
        eq_(inspect.getargspec(x.__init__),
            inspect.ArgSpec(args = ['self', 'arg'],
                            varargs = None, keywords = None, defaults = None))
        del x
        gc.collect()
        eq_(err.getvalue(),
            "Init\n"
            "Init hi\n"
            "error: Foo.close() wasn't called!\n"
            "Deleting\n")

        # No error
        err.truncate(0)

        y = Foo()
        y = Foo("bye")
        y.close()
        del y
        gc.collect()
        eq_(err.getvalue(),
            "Init\n"
            "Init bye\n"
            "Closing\n"
            "Deleting\n")

        # Verify function calls when wrap_verify is True
        err.truncate(0)

        z = Bar()
        eq_(inspect.getargspec(z.blah),
            inspect.ArgSpec(args = ['self', 'arg'],
                            varargs = None, keywords = None, defaults = None))
        z.blah("boo")
        z.close()
        with assert_raises(AssertionError) as e:
            z.blah("hello")
        in_("called <function blah at 0x", str(e.exception))
        in_("> after close", str(e.exception))
        # Since the most recent assertion references 'z',
        # we need to raise another assertion here so that
        # 'z' will get properly deleted.
        with assert_raises(AssertionError):
            raise AssertionError()
        del z
        gc.collect()
        eq_(err.getvalue(),
            "Init\n"
            "Blah boo\n"
            "Closing\n"
            "Deleting\n")

        # Class with missing methods
        err.truncate(0)
        w = Baz()
        w.close()
        del w
        eq_(err.getvalue(), "")
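These hunks suggest the must_close() contract: if a decorated object is garbage-collected without close() having been called, a complaint is written to errorfile, and with wrap_verify = True any method call after close() raises AssertionError. A hedged sketch of applying it outside the test suite; the Resource class, its methods, and the path are invented for illustration, only the decorator call mirrors the tests:

    import sys
    import nilmdb.utils

    @nilmdb.utils.must_close(errorfile = sys.stderr, wrap_verify = True)
    class Resource(object):          # hypothetical wrapped resource
        def __init__(self, path):
            self.fh = open(path, "r")
        def close(self):
            self.fh.close()

    r = Resource("/etc/hostname")    # illustrative path
    r.close()                        # skipping this should log to errorfile at GC time
    # With wrap_verify = True, calling any method of r after close()
    # would raise AssertionError, as the "after close" test above checks.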
@@ -22,7 +22,7 @@ testdb = "tests/testdb"
#def cleanup():
#    os.unlink(testdb)

from test_helpers import *
from testutil.helpers import *

class Test00Nilmdb(object):  # named 00 so it runs first
    def test_NilmDB(self):
@@ -6,7 +6,7 @@ from nose.tools import assert_raises
from cStringIO import StringIO
import sys

from test_helpers import *
from testutil.helpers import *

class TestPrintf(object):
    def test_printf(self):
@@ -8,13 +8,13 @@ from nose.tools import assert_raises

from nilmdb.rbtree import RBTree, RBNode

from test_helpers import *
from testutil.helpers import *
import unittest

# set to False to skip live renders
do_live_renders = False
def render(tree, description = "", live = True):
    import renderdot
    import testutil.renderdot as renderdot
    r = renderdot.RBTreeRenderer(tree)
    return r.render(description, live and do_live_renders)
@@ -7,7 +7,7 @@ from nose.tools import assert_raises
import threading
import time

from test_helpers import *
from testutil.helpers import *

#raise nose.exc.SkipTest("Skip these")
@@ -9,7 +9,7 @@ import os
import sys
import cStringIO

from test_helpers import *
from testutil.helpers import *

class TestTimestamper(object):
1
tests/testutil/__init__.py
Normal file
1
tests/testutil/__init__.py
Normal file
@@ -0,0 +1 @@
# empty
@@ -12,6 +12,10 @@ def eq_(a, b):
    if not a == b:
        raise AssertionError("%s != %s" % (myrepr(a), myrepr(b)))

def lt_(a, b):
    if not a < b:
        raise AssertionError("%s is not less than %s" % (myrepr(a), myrepr(b)))

def in_(a, b):
    if a not in b:
        raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b)))
@@ -23,6 +27,8 @@ def ne_(a, b):
def lines_(a, n):
    l = a.count('\n')
    if not l == n:
        if len(a) > 5000:
            a = a[0:5000] + " ... truncated"
        raise AssertionError("wanted %d lines, got %d in output: '%s'"
                             % (n, l, a))
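The new lt_() and in_() helpers, plus the truncation in lines_(), keep failure messages readable. A small usage sketch, assuming the helpers behave exactly as the bodies above read; the sample values are made up:

    from testutil.helpers import eq_, lt_, in_, lines_

    eq_(2 + 2, 4)              # passes; a failure would show both reprs
    lt_(3, 10)                 # passes; 3 < 10
    in_("bar", "foobarbaz")    # passes; substring is present
    lines_("a\nb\nc\n", 3)     # passes; output has exactly three newlines
    # On failure each raises AssertionError; lines_() truncates output
    # beyond 5000 characters before including it in the message.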