Compare commits

...

39 Commits

SHA1 Message Date
85be497edb Fix README 2013-01-21 17:30:01 -05:00
bd1b7107af Update TODO, clean up bulkdata error message 2013-01-21 11:43:28 -05:00
b8275f108d Make error message more helpful 2013-01-18 17:27:57 -05:00
2820ff9758 More fixes to mustclose decorator and argspecs 2013-01-18 17:21:30 -05:00
a015de893d Cleanup 2013-01-18 17:14:26 -05:00
b7f746e66d Fix lrucache decorator argspecs 2013-01-18 17:13:50 -05:00
40cf4941f0 Test that argspecs are maintained in lrucache 2013-01-18 17:01:46 -05:00
8a418ceb3e Fix issue where mustclose decorator doesn't maintain argspec 2013-01-18 16:57:15 -05:00
0312b6eb07 Test for issue where mustclose decorator didn't maintain argspec 2013-01-18 16:55:51 -05:00
077f197d24 Fix server returning 500 for bad HTTP parameters 2013-01-18 16:54:49 -05:00
62354b4dce Add test for bad-parameters-give-500-error 2013-01-17 19:58:48 -05:00
5970cd85cf Disable "ie-friendly" error message padding in CherryPy 2013-01-16 17:57:45 -05:00
4f6a742e6c Fix test failure 2013-01-16 17:31:31 -05:00
87b43e5d04 Command line errors cleaned up and made more consistent 2013-01-16 16:52:43 -05:00
f0c2a64ae3 Update doc formatting, .gitignore 2013-01-09 23:36:23 -05:00
e5d3deb6fe Removal support is complete.
`nrows` may change if you restart the server; documented why this is
the case in the design.md file.  It's not a problem.
2013-01-09 23:26:59 -05:00
d321058b48 Add basic versioning to bulkdata table format file. 2013-01-09 19:26:24 -05:00
cea83140c0 More work towards correctly removing rows. 2013-01-09 19:25:45 -05:00
7807d6caf0 Progress and tests for bulkdata.remove
Passes tests, but doesn't really handle nrows (and removing partially
full files) correctly, when deleting near the end of the data.
2013-01-09 17:39:29 -05:00
3d0fad3c2a Move some helper functions around 2013-01-09 17:39:29 -05:00
fe3b087435 Remove implemented in nilmdb; still needs bulkdata changes. 2013-01-08 21:07:52 -05:00
bcefe52298 nilmdb: Bring out range manipulating SQL so we can reuse it 2013-01-08 18:45:03 -05:00
f88c148ccc Interval removal work in progress. Needs NilmDB and BulkData work. 2013-01-08 18:37:01 -05:00
4a47b1d04a remove support: command line, client 2013-01-06 20:13:57 -05:00
80da937cb7 cmdline: return error when start > end (extract, list, remove) 2013-01-06 20:13:28 -05:00
c81972e66e Minor testsuite and commandline fixes.
Now supports "list /foo/bar" in addition to the older "list --path /foo/bar"
2013-01-06 19:25:07 -05:00
b09362fde1 Full coverage of nilmdb.utils.mustclose 2013-01-05 18:02:53 -05:00
b7688844fa Add a Nosetests plugin that lets me specify a test order within a directory. 2013-01-05 18:02:37 -05:00
3d212e7592 Move test helpers into subdirectory 2013-01-05 15:00:34 -05:00
7aedfdf9c3 Add lower level bulkdata test 2013-01-05 14:55:22 -05:00
ebd4f74959 Remove "pragma: no cover" from things that should get tested 2013-01-05 14:52:06 -05:00
ebe2fbab92 Add wrap_verify option to nilmdb.utils.must_close decorator 2013-01-05 14:51:41 -05:00
4831a0cae1 Small doc updates 2013-01-04 17:27:04 -05:00
07192c6ffb nilmdb.BulkData: Switch to nested subdir/filename layout
Use numbered subdirectories to avoid having too many files in one dir.
Add appropriate tests.

Also fix an issue where the mmap_open LRU cache could inappropriately
open a file twice because it was using the optional "newsize"
parameter as a key -- now lrucache can be given a slice object that
describes which arguments are important.
2013-01-04 16:51:05 -05:00
09d325e8ab Rename format -> _format in data dirs 2013-01-03 20:46:15 -05:00
11b0293d5f Clean up BulkData file size calculations, test more thoroughly
Now the goal is 128 MiB files, rather than a specific length.
2013-01-03 20:19:01 -05:00
493bbed82c More coverage and tests 2013-01-03 19:21:12 -05:00
3bc25daaab Trim urllib to get full coverage of the features in use 2013-01-03 17:10:07 -05:00
40a3bc4bc3 Update README with Python 2.7 requirement 2013-01-03 17:09:51 -05:00
46 changed files with 1507 additions and 426 deletions

.gitignore

@@ -2,3 +2,6 @@ db/
 tests/*testdb/
 .coverage
 *.pyc
+design.html
+timeit*out

Makefile

@@ -8,11 +8,14 @@ tool:
 lint:
 	pylint -f parseable nilmdb
 
+%.html: %.md
+	pandoc -s $< > $@
+
 test:
-	nosetests
+	python runtests.py
 
 profile:
-	nosetests --with-profile
+	python runtests.py --with-profile
 
 clean::
 	find . -name '*pyc' | xargs rm -f


@@ -1,4 +1,3 @@
-sudo apt-get install python-nose python-coverage
-sudo apt-get install python-tables python-cherrypy3
+sudo apt-get install python2.7 python-cherrypy3 python-decorator python-nose python-coverage
 sudo apt-get install cython  # 0.17.1-1 or newer

TODO

@@ -1 +1,5 @@
-- Merge adjacent intervals on insert (maybe with client help?)
+- Clean up error responses. Specifically I'd like to be able to add
+  json-formatted data to OverflowError and DB parsing errors. It
+  seems like subclassing cherrypy.HTTPError and overriding
+  set_response is the best thing to do -- it would let me get rid
+  of the _be_ie_unfriendly and other hacks in the server.
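For illustration only (this is not part of the changeset): the approach described in the TODO entry above could look roughly like the sketch below. It assumes CherryPy 3's cherrypy.HTTPError and its set_response() hook; the class name and JSON fields here are hypothetical.

    import cherrypy
    import simplejson as json

    class JsonHTTPError(cherrypy.HTTPError):
        """Hypothetical HTTPError subclass that returns a JSON error body."""
        def __init__(self, status, message):
            super(JsonHTTPError, self).__init__(status, message)
            self.json_message = message

        def set_response(self):
            # Replace the default HTML error page with a JSON document.
            response = cherrypy.serving.response
            response.status = self.status
            body = json.dumps({"status": self.status,
                               "message": self.json_message})
            response.headers['Content-Type'] = 'application/json'
            response.headers['Content-Length'] = str(len(body))
            response.body = body

Raising such an error from a handler would give clients structured error data, which is the kind of change the TODO suggests could replace the _be_ie_unfriendly padding workaround mentioned above.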

design.md

@@ -1,11 +1,12 @@
 Structure
 ---------
-nilmdb.nilmdb is the NILM database interface. It tracks a PyTables
-database holds actual rows of data, and a SQL database tracks metadata
-and ranges.
+nilmdb.nilmdb is the NILM database interface. A nilmdb.BulkData
+interface stores data in flat files, and a SQL database tracks
+metadata and ranges.
 
 Access to the nilmdb must be single-threaded. This is handled with
-the nilmdb.serializer class.
+the nilmdb.serializer class. In the future this could probably
+be turned into a per-path serialization.
 
 nilmdb.server is a HTTP server that provides an interface to talk,
 thorugh the serialization layer, to the nilmdb object.
@@ -18,13 +19,13 @@ Sqlite performance
 Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
 takes about 125msec. sqlite3 will commit transactions at 3 times:
 
-1: explicit con.commit()
+1. explicit con.commit()
 
-2: between a series of DML commands and non-DML commands, e.g.
+2. between a series of DML commands and non-DML commands, e.g.
    after a series of INSERT, SELECT, but before a CREATE TABLE or
    PRAGMA.
 
-3: at the end of an explicit transaction, e.g. "with self.con as con:"
+3. at the end of an explicit transaction, e.g. "with self.con as con:"
 
 To speed up testing, or if this transaction speed becomes an issue,
 the sync=False option to NilmDB will set PRAGMA synchronous=OFF.
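As a small illustration of the sync=False behaviour described above (a sketch, not code from this changeset; the filename and table are made up):

    import sqlite3

    con = sqlite3.connect("data.sql")
    # Default is PRAGMA synchronous=FULL (~125 ms per commit, as noted above).
    # NilmDB's sync=False option corresponds to turning that off:
    con.execute("PRAGMA synchronous=OFF")
    con.execute("CREATE TABLE IF NOT EXISTS example (x INTEGER)")
    con.execute("INSERT INTO example VALUES (1)")
    con.commit()   # case 1 above: an explicit con.commit()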
@@ -47,101 +48,105 @@ transfer?
everything still gets buffered. Just a tradeoff of buffer size.

Before timestamps are added:
- Raw data is about 440 kB/s (9 channels)
- Prep data is about 12.5 kB/s (1 phase)
- How do we know how much data to send?
- Remember that we can only do maybe 8-50 transactions per second on
  the sqlite database. So if one block of inserted data is one
  transaction, we'd need the raw case to be around 64kB per request,
  ideally more.
- Maybe use a range, based on how long it's taking to read the data
  - If no more data, send it
  - If data > 1 MB, send it
  - If more than 10 seconds have elapsed, send it
- Should those numbers come from the server?

Converting from ASCII to PyTables:
- For each row getting added, we need to set attributes on a PyTables
  Row object and call table.append(). This means that there isn't a
  particularly efficient way of converting from ascii.
- Could create a function like nilmdb.layout.Layout("foo".fillRow(asciiline)
  - But this means we're doing parsing on the serialized side
- Let's keep parsing on the threaded server side so we can detect
  errors better, and not block the serialized nilmdb for a slow
  parsing process.
- Client sends ASCII data
- Server converts this ACSII data to a list of values
- Maybe:
    # threaded side creates this object
    parser = nilmdb.layout.Parser("layout_name")
    # threaded side parses and fills it with data
    parser.parse(textdata)
    # serialized side pulls out rows
    for n in xrange(parser.nrows):
        parser.fill_row(rowinstance, n)
        table.append()

Inserting streams, inside nilmdb
--------------------------------
- First check that the new stream doesn't overlap.
- Get minimum timestamp, maximum timestamp from data parser.
  - (extend parser to verify monotonicity and track extents)
- Get all intervals for this stream in the database
- See if new interval overlaps any existing ones
- If so, bail
- Question: should we cache intervals inside NilmDB?
  - Assume database is fast for now, and always rebuild fom DB.
  - Can add a caching layer later if we need to.
- `stream_get_ranges(path)` -> return IntervalSet?

Speed
-----
- First approach was quadratic. Adding four hours of data:
    $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-110000 /bpnilm/1/raw
    real 24m31.093s
    $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-120001 /bpnilm/1/raw
    real 43m44.528s
    $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-130002 /bpnilm/1/raw
    real 93m29.713s
    $ time zcat /home/jim/bpnilm-data/snapshot-1-20110513-110002.raw.gz | ./nilmtool.py insert -s 20110513-140003 /bpnilm/1/raw
    real 166m53.007s
- Disabling pytables indexing didn't help:
    real 31m21.492s
    real 52m51.963s
    real 102m8.151s
    real 176m12.469s
- Server RAM usage is constant.
- Speed problems were due to IntervalSet speed, of parsing intervals
  from the database and adding the new one each time.
- First optimization is to cache result of `nilmdb:_get_intervals`,
  which gives the best speedup.
- Also switched to internally using bxInterval from bx-python package.
  Speed of `tests/test_interval:TestIntervalSpeed` is pretty decent
  and seems to be growing logarithmically now. About 85μs per insertion
  for inserting 131k entries.
- Storing the interval data in SQL might be better, with a scheme like:
  http://www.logarithmic.net/pfh/blog/01235197474
- Next slowdown target is nilmdb.layout.Parser.parse().
- Rewrote parsers using cython and sscanf
- Stats (rev 10831), with _add_interval disabled
    layout.pyx.Parser.parse:128 6303 sec, 262k calls
    layout.pyx.parse:63 13913 sec, 5.1g calls
    numpy:records.py.fromrecords:569 7410 sec, 262k calls
- Probably OK for now.
- After all updates, now takes about 8.5 minutes to insert an hour of
  data, constant after adding 171 hours (4.9 billion data points)
@@ -156,12 +161,12 @@ IntervalSet speed
sorted list
- Replaced with bxInterval; now takes about log n time for an insertion
- TestIntervalSpeed with range(17,18) and profiling
  - 85 μs each
  - 131072 calls to `__iadd__`
  - 131072 to bx.insert_interval
  - 131072 to bx.insert:395
  - 2355835 to bx.insert:106 (18x as many?)
- Tried blist too, worse than bxinterval.
@@ -172,14 +177,14 @@ IntervalSet speed
insert for 2**17 insertions, followed by total wall time and RAM
usage for running "make test" with `test_rbtree` and `test_interval`
with range(5,20):
- old values with bxinterval:
  20.2 μS, total 20 s, 177 MB RAM
- rbtree, plain python:
  97 μS, total 105 s, 846 MB RAM
- rbtree converted to cython:
  26 μS, total 29 s, 320 MB RAM
- rbtree and interval converted to cython:
  8.4 μS, total 12 s, 134 MB RAM
Layouts Layouts
------- -------
@@ -198,3 +203,66 @@ handlers. For compatibility:
"RawData" == "uint16_6" "RawData" == "uint16_6"
"RawNotchedData" == "uint16_9" "RawNotchedData" == "uint16_9"
"PrepData" == "float32_8" "PrepData" == "float32_8"
BulkData design
---------------
BulkData is a custom bulk data storage system that was written to
replace PyTables. The general structure is a `data` subdirectory in
the main NilmDB directory. Within `data`, paths are created for each
created stream. These locations are called tables. For example,
tables might be located at
nilmdb/data/newton/raw/
nilmdb/data/newton/prep/
nilmdb/data/cottage/raw/
Each table contains:
- An unchanging `_format` file (Python pickle format) that describes
parameters of how the data is broken up, like files per directory,
rows per file, and the binary data format
- Hex named subdirectories `("%04x", although more than 65536 can exist)`
- Hex named files within those subdirectories, like:
/nilmdb/data/newton/raw/000b/010a
The data format of these files is raw binary, interpreted by the
Python `struct` module according to the format string in the
`_format` file.
- Same as above, with `.removed` suffix, is an optional file (Python
pickle format) containing a list of row numbers that have been
logically removed from the file. If this range covers the entire
file, the entire file will be removed.
- Note that the `bulkdata.nrows` variable is calculated once in
`BulkData.__init__()`, and only ever incremented during use. Thus,
even if all data is removed, `nrows` can remain high. However, if
the server is restarted, the newly calculated `nrows` may be lower
than in a previous run due to deleted data. To be specific, this
sequence of events:
- insert data
- remove all data
- insert data
will result in having different row numbers in the database, and
differently numbered files on the filesystem, than the sequence:
- insert data
- remove all data
- restart server
- insert data
This is okay! Everything should remain consistent both in the
`BulkData` and `NilmDB`. Not attempting to readjust `nrows` during
deletion makes the code quite a bit simpler.
- Similarly, data files are never truncated shorter. Removing data
from the end of the file will not shorten it; it will only be
deleted when it has been fully filled and all of the data has been
subsequently removed.
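To make the layout described above concrete, here is a short worked example (the table parameters are hypothetical, not read from a real `_format` file) of how a row number maps to a subdirectory, filename, and byte offset under this scheme:

    import struct

    # Hypothetical table parameters, similar to what a _format pickle holds
    struct_fmt = "<dHHHHHH"             # e.g. a double timestamp + 6 uint16 columns
    packer = struct.Struct(struct_fmt)  # packer.size == 20 bytes per row
    file_size = 128 * 1024 * 1024       # target bytes per file
    files_per_dir = 32768

    rows_per_file = max(file_size // packer.size, 1)   # 6710886 rows here

    def locate(row):
        """Return (subdir, filename, offset) for an absolute row number."""
        filenum = row // rows_per_file
        subdir = "%04x" % (filenum // files_per_dir)
        filename = "%04x" % (filenum % files_per_dir)
        offset = (row % rows_per_file) * packer.size
        return (subdir, filename, offset)

    print(locate(0))           # ('0000', '0000', 0)
    print(locate(20000000))    # ('0000', '0002', 131564560)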


@@ -11,17 +11,32 @@ import cPickle as pickle
 import struct
 import fnmatch
 import mmap
+import re
 
-# Up to 256 open file descriptors at any given time
+# Up to 256 open file descriptors at any given time.
+# These variables are global so they can be used in the decorator arguments.
 table_cache_size = 16
 fd_cache_size = 16
 
-@nilmdb.utils.must_close()
+@nilmdb.utils.must_close(wrap_verify = True)
 class BulkData(object):
-    def __init__(self, basepath):
+    def __init__(self, basepath, **kwargs):
         self.basepath = basepath
         self.root = os.path.join(self.basepath, "data")
 
+        # Tuneables
+        if "file_size" in kwargs:
+            self.file_size = kwargs["file_size"]
+        else:
+            # Default to approximately 128 MiB per file
+            self.file_size = 128 * 1024 * 1024
+
+        if "files_per_dir" in kwargs:
+            self.files_per_dir = kwargs["files_per_dir"]
+        else:
+            # 32768 files per dir should work even on FAT32
+            self.files_per_dir = 32768
+
         # Make root path
         if not os.path.isdir(self.root):
             os.mkdir(self.root)
@@ -55,7 +70,8 @@ class BulkData(object):
             raise ValueError("paths must start with /")
         [ group, node ] = path.rsplit("/", 1)
         if group == '':
-            raise ValueError("invalid path")
+            raise ValueError("invalid path; path must contain at least one "
+                             "folder")
 
         # Get layout, and build format string for struct module
         try:
@@ -81,26 +97,24 @@ class BulkData(object):
# Create the table. Note that we make a distinction here # Create the table. Note that we make a distinction here
# between NilmDB paths (always Unix style, split apart # between NilmDB paths (always Unix style, split apart
# manually) and OS paths (built up with os.path.join) # manually) and OS paths (built up with os.path.join)
try:
# Make directories leading up to this one
elements = path.lstrip('/').split('/')
for i in range(len(elements)):
ospath = os.path.join(self.root, *elements[0:i])
if Table.exists(ospath):
raise ValueError("path is subdir of existing node")
if not os.path.isdir(ospath):
os.mkdir(ospath)
# Make the final dir # Make directories leading up to this one
ospath = os.path.join(self.root, *elements) elements = path.lstrip('/').split('/')
if os.path.isdir(ospath): for i in range(len(elements)):
raise ValueError("subdirs of this path already exist") ospath = os.path.join(self.root, *elements[0:i])
os.mkdir(ospath) if Table.exists(ospath):
raise ValueError("path is subdir of existing node")
if not os.path.isdir(ospath):
os.mkdir(ospath)
# Write format string to file # Make the final dir
Table.create(ospath, struct_fmt) ospath = os.path.join(self.root, *elements)
except OSError as e: if os.path.isdir(ospath):
raise ValueError("error creating table at that path: " + e.strerror) raise ValueError("subdirs of this path already exist")
os.mkdir(ospath)
# Write format string to file
Table.create(ospath, struct_fmt, self.file_size, self.files_per_dir)
# Open and cache it # Open and cache it
self.getnode(unicodepath) self.getnode(unicodepath)
@@ -118,13 +132,16 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements) ospath = os.path.join(self.root, *elements)
# Remove Table object from cache # Remove Table object from cache
self.getnode.cache_remove(self, ospath) self.getnode.cache_remove(self, unicodepath)
# Remove the contents of the target directory # Remove the contents of the target directory
if not os.path.isfile(os.path.join(ospath, "format")): if not Table.exists(ospath):
raise ValueError("nothing at that path") raise ValueError("nothing at that path")
for file in os.listdir(ospath): for (root, dirs, files) in os.walk(ospath, topdown = False):
os.remove(os.path.join(ospath, file)) for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
# Remove empty parent directories # Remove empty parent directories
for i in reversed(range(len(elements))): for i in reversed(range(len(elements))):
@@ -145,23 +162,32 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements) ospath = os.path.join(self.root, *elements)
return Table(ospath) return Table(ospath)
@nilmdb.utils.must_close() @nilmdb.utils.must_close(wrap_verify = True)
class Table(object): class Table(object):
"""Tools to help access a single table (data at a specific OS path)""" """Tools to help access a single table (data at a specific OS path)."""
# See design.md for design details
# Class methods, to help keep format details in this class. # Class methods, to help keep format details in this class.
@classmethod @classmethod
def exists(cls, root): def exists(cls, root):
"""Return True if a table appears to exist at this OS path""" """Return True if a table appears to exist at this OS path"""
return os.path.isfile(os.path.join(root, "format")) return os.path.isfile(os.path.join(root, "_format"))
@classmethod @classmethod
def create(cls, root, struct_fmt): def create(cls, root, struct_fmt, file_size, files_per_dir):
"""Initialize a table at the given OS path. """Initialize a table at the given OS path.
'struct_fmt' is a Struct module format description""" 'struct_fmt' is a Struct module format description"""
format = { "rows_per_file": 4 * 1024 * 1024,
"struct_fmt": struct_fmt } # Calculate rows per file so that each file is approximately
with open(os.path.join(root, "format"), "wb") as f: # file_size bytes.
packer = struct.Struct(struct_fmt)
rows_per_file = max(file_size // packer.size, 1)
format = { "rows_per_file": rows_per_file,
"files_per_dir": files_per_dir,
"struct_fmt": struct_fmt,
"version": 1 }
with open(os.path.join(root, "_format"), "wb") as f:
pickle.dump(format, f, 2) pickle.dump(format, f, 2)
# Normal methods # Normal methods
@@ -170,64 +196,101 @@ class Table(object):
self.root = root self.root = root
# Load the format and build packer # Load the format and build packer
with open(self._fullpath("format"), "rb") as f: with open(os.path.join(self.root, "_format"), "rb") as f:
format = pickle.load(f) format = pickle.load(f)
if format["version"] != 1: # pragma: no cover (just future proofing)
raise NotImplementedError("version " + format["version"] +
" bulk data store not supported")
self.rows_per_file = format["rows_per_file"] self.rows_per_file = format["rows_per_file"]
self.files_per_dir = format["files_per_dir"]
self.packer = struct.Struct(format["struct_fmt"]) self.packer = struct.Struct(format["struct_fmt"])
self.file_size = self.packer.size * self.rows_per_file self.file_size = self.packer.size * self.rows_per_file
# Find nrows by locating the lexicographically last filename # Find nrows
# and using its size. self.nrows = self._get_nrows()
pattern = '[0-9a-f]' * 8
allfiles = fnmatch.filter(os.listdir(self.root), pattern)
if allfiles:
filename = max(allfiles)
offset = os.path.getsize(self._fullpath(filename))
self.nrows = self._row_from_fnoffset(filename, offset)
else:
self.nrows = 0
def close(self): def close(self):
self.mmap_open.cache_remove_all() self.mmap_open.cache_remove_all()
# Internal helpers # Internal helpers
def _fullpath(self, filename): def _get_nrows(self):
return os.path.join(self.root, filename) """Find nrows by locating the lexicographically last filename
and using its size"""
# Note that this just finds a 'nrows' that is guaranteed to be
# greater than the row number of any piece of data that
# currently exists, not necessarily all data that _ever_
# existed.
regex = re.compile("^[0-9a-f]{4,}$")
def _fnoffset_from_row(self, row): # Find the last directory. We sort and loop through all of them,
"""Return a (filename, offset, count) tuple: # starting with the numerically greatest, because the dirs could be
# empty if something was deleted.
subdirs = sorted(filter(regex.search, os.listdir(self.root)),
key = lambda x: int(x, 16), reverse = True)
for subdir in subdirs:
# Now find the last file in that dir
path = os.path.join(self.root, subdir)
files = filter(regex.search, os.listdir(path))
if not files: # pragma: no cover (shouldn't occur)
# Empty dir: try the next one
continue
# Find the numerical max
filename = max(files, key = lambda x: int(x, 16))
offset = os.path.getsize(os.path.join(self.root, subdir, filename))
# Convert to row number
return self._row_from_offset(subdir, filename, offset)
# No files, so no data
return 0
def _offset_from_row(self, row):
"""Return a (subdir, filename, offset, count) tuple:
subdir: subdirectory for the file
filename: the filename that contains the specified row filename: the filename that contains the specified row
offset: byte offset of the specified row within the file offset: byte offset of the specified row within the file
count: number of rows (starting at offste) that fit in the file count: number of rows (starting at offset) that fit in the file
""" """
filenum = row // self.rows_per_file filenum = row // self.rows_per_file
filename = sprintf("%08x", filenum) # It's OK if these format specifiers are too short; the filenames
# will just get longer but will still sort correctly.
dirname = sprintf("%04x", filenum // self.files_per_dir)
filename = sprintf("%04x", filenum % self.files_per_dir)
offset = (row % self.rows_per_file) * self.packer.size offset = (row % self.rows_per_file) * self.packer.size
count = self.rows_per_file - (row % self.rows_per_file) count = self.rows_per_file - (row % self.rows_per_file)
return (filename, offset, count) return (dirname, filename, offset, count)
def _row_from_fnoffset(self, filename, offset): def _row_from_offset(self, subdir, filename, offset):
"""Return the row number that corresponds to the given """Return the row number that corresponds to the given
filename and byte-offset within that file.""" 'subdir/filename' and byte-offset within that file."""
filenum = int(filename, 16) if (offset % self.packer.size) != 0: # pragma: no cover; shouldn't occur
if (offset % self.packer.size) != 0:
raise ValueError("file offset is not a multiple of data size") raise ValueError("file offset is not a multiple of data size")
filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
row = (filenum * self.rows_per_file) + (offset // self.packer.size) row = (filenum * self.rows_per_file) + (offset // self.packer.size)
return row return row
# Cache open files # Cache open files
@nilmdb.utils.lru_cache(size = fd_cache_size, @nilmdb.utils.lru_cache(size = fd_cache_size,
keys = slice(0,3), # exclude newsize
onremove = lambda x: x.close()) onremove = lambda x: x.close())
def mmap_open(self, file, newsize = None): def mmap_open(self, subdir, filename, newsize = None):
"""Open and map a given filename (relative to self.root). """Open and map a given 'subdir/filename' (relative to self.root).
Will be automatically closed when evicted from the cache. Will be automatically closed when evicted from the cache.
If 'newsize' is provided, the file is truncated to the given If 'newsize' is provided, the file is truncated to the given
size before the mapping is returned. (Note that the LRU cache size before the mapping is returned. (Note that the LRU cache
on this function means the truncate will only happen if the on this function means the truncate will only happen if the
object isn't already cached; mmap.resize should be used too)""" object isn't already cached; mmap.resize should be used too.)"""
f = open(os.path.join(self.root, file), "a+", 0) try:
os.mkdir(os.path.join(self.root, subdir))
except OSError:
pass
f = open(os.path.join(self.root, subdir, filename), "a+", 0)
if newsize is not None: if newsize is not None:
# mmap can't map a zero-length file, so this allows the # mmap can't map a zero-length file, so this allows the
# caller to set the filesize between file creation and # caller to set the filesize between file creation and
@@ -236,6 +299,15 @@ class Table(object):
mm = mmap.mmap(f.fileno(), 0) mm = mmap.mmap(f.fileno(), 0)
return mm return mm
def mmap_open_resize(self, subdir, filename, newsize):
"""Open and map a given 'subdir/filename' (relative to self.root).
The file is resized to the given size."""
# Pass new size to mmap_open
mm = self.mmap_open(subdir, filename, newsize)
# In case we got a cached copy, need to call mm.resize too.
mm.resize(newsize)
return mm
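The `keys = slice(0,3)` argument on `mmap_open` above is the mechanism commit 07192c6ffb describes: only part of the argument tuple is used as the cache key, so the optional `newsize` argument can no longer cause the same file to be mapped twice. The actual decorator lives in `nilmdb.utils.lru_cache`; the following is only a rough sketch of the idea, not that implementation:

    import collections
    import functools

    def lru_cache(size = 10, keys = None, onremove = None):
        """Sketch: LRU-cache a function, keying only on args[keys] when given."""
        def decorator(func):
            cache = collections.OrderedDict()
            @functools.wraps(func)
            def wrapper(*args):
                key = args[keys] if keys is not None else args
                if key in cache:
                    value = cache.pop(key)   # re-insert as most recently used
                    cache[key] = value
                    return value
                value = func(*args)          # e.g. mmap_open(self, subdir, fname, newsize)
                cache[key] = value
                if len(cache) > size:
                    evicted = cache.popitem(last = False)[1]
                    if onremove is not None:
                        onremove(evicted)    # e.g. close the evicted mmap
                return value
            return wrapper
        return decorator

With `keys = slice(0,3)`, a call that passes `newsize` and a later call that omits it both hash to the same `(self, subdir, filename)` key and get the cached mapping back.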
def append(self, data): def append(self, data):
"""Append the data and flush it to disk. """Append the data and flush it to disk.
data is a nested Python list [[row],[row],[...]]""" data is a nested Python list [[row],[row],[...]]"""
@@ -243,18 +315,13 @@ class Table(object):
dataiter = iter(data) dataiter = iter(data)
while remaining: while remaining:
# See how many rows we can fit into the current file, and open it # See how many rows we can fit into the current file, and open it
(filename, offset, count) = self._fnoffset_from_row(self.nrows) (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
if count > remaining: if count > remaining:
count = remaining count = remaining
newsize = offset + count * self.packer.size newsize = offset + count * self.packer.size
mm = self.mmap_open(filename, newsize) mm = self.mmap_open_resize(subdir, fname, newsize)
mm.seek(offset) mm.seek(offset)
# Extend the file to the target length. We specified
# newsize when opening, but that may have been ignored if
# the mmap_open returned a cached object.
mm.resize(newsize)
# Write the data # Write the data
for i in xrange(count): for i in xrange(count):
row = dataiter.next() row = dataiter.next()
@@ -282,10 +349,10 @@ class Table(object):
row = key.start row = key.start
remaining = key.stop - key.start remaining = key.stop - key.start
while remaining: while remaining:
(filename, offset, count) = self._fnoffset_from_row(row) (subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining: if count > remaining:
count = remaining count = remaining
mm = self.mmap_open(filename) mm = self.mmap_open(subdir, filename)
for i in xrange(count): for i in xrange(count):
ret.append(list(self.packer.unpack_from(mm, offset))) ret.append(list(self.packer.unpack_from(mm, offset)))
offset += self.packer.size offset += self.packer.size
@@ -296,10 +363,93 @@ class Table(object):
# Handle single points # Handle single points
if key < 0 or key >= self.nrows: if key < 0 or key >= self.nrows:
raise IndexError("Index out of range") raise IndexError("Index out of range")
(filename, offset, count) = self._fnoffset_from_row(key) (subdir, filename, offset, count) = self._offset_from_row(key)
mm = self.mmap_open(filename) mm = self.mmap_open(subdir, filename)
# unpack_from ignores the mmap object's current seek position # unpack_from ignores the mmap object's current seek position
return self.packer.unpack_from(mm, offset) return list(self.packer.unpack_from(mm, offset))
def _remove_rows(self, subdir, filename, start, stop):
"""Helper to mark specific rows as being removed from a
file, and potentially removing or truncating the file itself."""
# Import an existing list of deleted rows for this file
datafile = os.path.join(self.root, subdir, filename)
cachefile = datafile + ".removed"
try:
with open(cachefile, "rb") as f:
ranges = pickle.load(f)
cachefile_present = True
except:
ranges = []
cachefile_present = False
# Append our new range and sort
ranges.append((start, stop))
ranges.sort()
# Merge adjacent ranges into "out"
merged = []
prev = None
for new in ranges:
if prev is None:
# No previous range, so remember this one
prev = new
elif prev[1] == new[0]:
# Previous range connected to this new one; extend prev
prev = (prev[0], new[1])
else:
# Not connected; append previous and start again
merged.append(prev)
prev = new
if prev is not None:
merged.append(prev)
# If the range covered the whole file, we can delete it now.
# Note that the last file in a table may be only partially
# full (smaller than self.rows_per_file). We purposely leave
# those files around rather than deleting them, because the
# remainder will be filled on a subsequent append(), and things
# are generally easier if we don't have to special-case that.
if (len(merged) == 1 and
merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
# Close potentially open file in mmap_open LRU cache
self.mmap_open.cache_remove(self, subdir, filename)
# Delete files
os.remove(datafile)
if cachefile_present:
os.remove(cachefile)
# Try deleting subdir, too
try:
os.rmdir(os.path.join(self.root, subdir))
except:
pass
else:
# Update cache. Try to do it atomically.
nilmdb.utils.atomic.replace_file(cachefile,
pickle.dumps(merged, 2))
def remove(self, start, stop):
"""Remove specified rows [start, stop) from this table.
If a file is left empty, it is fully removed. Otherwise, a
parallel data file is used to remember which rows have been
removed, and the file is otherwise untouched."""
if start < 0 or start > stop or stop > self.nrows:
raise IndexError("Index out of range")
row = start
remaining = stop - start
while remaining:
# Loop through each file that we need to touch
(subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining:
count = remaining
row_offset = offset // self.packer.size
# Mark the rows as being removed
self._remove_rows(subdir, filename, row_offset, row_offset + count)
remaining -= count
row += count
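A brief illustration of the `.removed` sidecar behaviour implemented above (the path and numbers are hypothetical): removing rows 5-10 and then 10-15 from one file leaves a single merged range in the pickle; the data file itself is only unlinked once the merged list covers every row in the file.

    import cPickle as pickle

    # Hypothetical: after two adjacent remove() calls touching the same file,
    # the sidecar holds the merged range rather than two separate entries.
    with open("nilmdb/data/newton/raw/0000/0000.removed", "rb") as f:
        print(pickle.load(f))    # e.g. [(5, 15)]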
class TimestampOnlyTable(object): class TimestampOnlyTable(object):
"""Helper that lets us pass a Tables object into bisect, by """Helper that lets us pass a Tables object into bisect, by


@@ -12,6 +12,7 @@ import os
import simplejson as json import simplejson as json
import itertools import itertools
import nilmdb.utils
import nilmdb.httpclient import nilmdb.httpclient
# Other functions expect to see these in the nilmdb.client namespace # Other functions expect to see these in the nilmdb.client namespace
@@ -96,6 +97,17 @@ class Client(object):
params = { "path": path } params = { "path": path }
return self.http.get("stream/destroy", params) return self.http.get("stream/destroy", params)
def stream_remove(self, path, start = None, end = None):
"""Remove data from the specified time range"""
params = {
"path": path
}
if start is not None:
params["start"] = float_to_string(start)
if end is not None:
params["end"] = float_to_string(end)
return self.http.get("stream/remove", params)
def stream_insert(self, path, data, start = None, end = None): def stream_insert(self, path, data, start = None, end = None):
"""Insert data into a stream. data should be a file-like object """Insert data into a stream. data should be a file-like object
that provides ASCII data that matches the database layout for path. that provides ASCII data that matches the database layout for path.
@@ -114,11 +126,6 @@ class Client(object):
max_time = 30 max_time = 30
end_epsilon = 1e-6 end_epsilon = 1e-6
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), ..., (sn,None)"
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip_longest(a, b)
def extract_timestamp(line): def extract_timestamp(line):
return float(line.split()[0]) return float(line.split()[0])
@@ -148,7 +155,7 @@ class Client(object):
block_data = "" block_data = ""
block_start = start block_start = start
result = None result = None
for (line, nextline) in pairwise(data): for (line, nextline) in nilmdb.utils.misc.pairwise(data):
# If we don't have a starting time, extract it from the first line # If we don't have a starting time, extract it from the first line
if block_start is None: if block_start is None:
block_start = extract_timestamp(line) block_start = extract_timestamp(line)
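For context, the new `stream_remove()` call added above might be used from client code like this (the URL and timestamps are made up for the example):

    import nilmdb

    client = nilmdb.Client("http://localhost:12380/")
    # Remove data in [start, end) from the stream; returns a count of
    # removed data points, matching the new "nilmtool remove -c" option.
    count = client.stream_remove("/newton/raw",
                                 start = 1234567890.0,
                                 end = 1234567900.0)
    print(count)
    client.close()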


@@ -11,12 +11,12 @@ import re
import argparse import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form from argparse import ArgumentDefaultsHelpFormatter as def_form
version = "0.1" version = "1.0"
# Valid subcommands. Defined in separate files just to break # Valid subcommands. Defined in separate files just to break
# things up -- they're still called with Cmdline as self. # things up -- they're still called with Cmdline as self.
subcommands = [ "info", "create", "list", "metadata", "insert", "extract", subcommands = [ "info", "create", "list", "metadata", "insert", "extract",
"destroy" ] "remove", "destroy" ]
# Import the subcommand modules. Equivalent way of doing this would be # Import the subcommand modules. Equivalent way of doing this would be
# from . import info as cmd_info # from . import info as cmd_info
@@ -24,10 +24,16 @@ subcmd_mods = {}
for cmd in subcommands: for cmd in subcommands:
subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ]) subcmd_mods[cmd] = __import__("nilmdb.cmdline." + cmd, fromlist = [ cmd ])
class JimArgumentParser(argparse.ArgumentParser):
def error(self, message):
self.print_usage(sys.stderr)
self.exit(2, sprintf("error: %s\n", message))
class Cmdline(object): class Cmdline(object):
def __init__(self, argv): def __init__(self, argv):
self.argv = argv self.argv = argv
self.client = None
def arg_time(self, toparse): def arg_time(self, toparse):
"""Parse a time string argument""" """Parse a time string argument"""
@@ -93,8 +99,8 @@ class Cmdline(object):
version_string = sprintf("nilmtool %s, client library %s", version_string = sprintf("nilmtool %s, client library %s",
version, nilmdb.Client.client_version) version, nilmdb.Client.client_version)
self.parser = argparse.ArgumentParser(add_help = False, self.parser = JimArgumentParser(add_help = False,
formatter_class = def_form) formatter_class = def_form)
group = self.parser.add_argument_group("General options") group = self.parser.add_argument_group("General options")
group.add_argument("-h", "--help", action='help', group.add_argument("-h", "--help", action='help',
@@ -119,7 +125,8 @@ class Cmdline(object):
def die(self, formatstr, *args): def die(self, formatstr, *args):
fprintf(sys.stderr, formatstr + "\n", *args) fprintf(sys.stderr, formatstr + "\n", *args)
self.client.close() if self.client:
self.client.close()
sys.exit(-1) sys.exit(-1)
def run(self): def run(self):
@@ -131,13 +138,17 @@ class Cmdline(object):
self.parser_setup() self.parser_setup()
self.args = self.parser.parse_args(self.argv) self.args = self.parser.parse_args(self.argv)
# Run arg verify handler if there is one
if "verify" in self.args:
self.args.verify(self)
self.client = nilmdb.Client(self.args.url) self.client = nilmdb.Client(self.args.url)
# Make a test connection to make sure things work # Make a test connection to make sure things work
try: try:
server_version = self.client.version() server_version = self.client.version()
except nilmdb.client.Error as e: except nilmdb.client.Error as e:
self.die("Error connecting to server: %s", str(e)) self.die("error connecting to server: %s", str(e))
# Now dispatch client request to appropriate function. Parser # Now dispatch client request to appropriate function. Parser
# should have ensured that we don't have any unknown commands # should have ensured that we don't have any unknown commands


@@ -24,4 +24,4 @@ def cmd_create(self):
try: try:
self.client.stream_create(self.args.path, self.args.layout) self.client.stream_create(self.args.path, self.args.layout)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("Error creating stream: %s", str(e)) self.die("error creating stream: %s", str(e))


@@ -22,4 +22,4 @@ def cmd_destroy(self):
try: try:
self.client.stream_destroy(self.args.path) self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("Error deleting stream: %s", str(e)) self.die("error destroying stream: %s", str(e))


@@ -9,17 +9,18 @@ def setup(self, sub):
description=""" description="""
Extract data from a stream. Extract data from a stream.
""") """)
cmd.set_defaults(handler = cmd_extract) cmd.set_defaults(verify = cmd_extract_verify,
handler = cmd_extract)
group = cmd.add_argument_group("Data selection") group = cmd.add_argument_group("Data selection")
group.add_argument("path", group.add_argument("path",
help="Path of stream, e.g. /foo/bar") help="Path of stream, e.g. /foo/bar")
group.add_argument("-s", "--start", required=True, group.add_argument("-s", "--start", required=True,
metavar="TIME", type=self.arg_time, metavar="TIME", type=self.arg_time,
help="Starting timestamp (free-form)") help="Starting timestamp (free-form, inclusive)")
group.add_argument("-e", "--end", required=True, group.add_argument("-e", "--end", required=True,
metavar="TIME", type=self.arg_time, metavar="TIME", type=self.arg_time,
help="Ending timestamp (free-form)") help="Ending timestamp (free-form, noninclusive)")
group = cmd.add_argument_group("Output format") group = cmd.add_argument_group("Output format")
group.add_argument("-b", "--bare", action="store_true", group.add_argument("-b", "--bare", action="store_true",
@@ -30,10 +31,15 @@ def setup(self, sub):
group.add_argument("-c", "--count", action="store_true", group.add_argument("-c", "--count", action="store_true",
help="Just output a count of matched data points") help="Just output a count of matched data points")
def cmd_extract_verify(self):
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
def cmd_extract(self): def cmd_extract(self):
streams = self.client.stream_list(self.args.path) streams = self.client.stream_list(self.args.path)
if len(streams) != 1: if len(streams) != 1:
self.die("Error getting stream info for path %s", self.args.path) self.die("error getting stream info for path %s", self.args.path)
layout = streams[0][1] layout = streams[0][1]
if self.args.annotate: if self.args.annotate:


@@ -51,12 +51,12 @@ def cmd_insert(self):
# Find requested stream # Find requested stream
streams = self.client.stream_list(self.args.path) streams = self.client.stream_list(self.args.path)
if len(streams) != 1: if len(streams) != 1:
self.die("Error getting stream info for path %s", self.args.path) self.die("error getting stream info for path %s", self.args.path)
layout = streams[0][1] layout = streams[0][1]
if self.args.start and len(self.args.file) != 1: if self.args.start and len(self.args.file) != 1:
self.die("--start can only be used with one input file, for now") self.die("error: --start can only be used with one input file")
for filename in self.args.file: for filename in self.args.file:
if filename == '-': if filename == '-':
@@ -65,7 +65,7 @@ def cmd_insert(self):
try: try:
infile = open(filename, "r") infile = open(filename, "r")
except IOError: except IOError:
self.die("Error opening input file %s", filename) self.die("error opening input file %s", filename)
# Build a timestamper for this file # Build a timestamper for this file
if self.args.none: if self.args.none:
@@ -77,11 +77,11 @@ def cmd_insert(self):
try: try:
start = self.parse_time(filename) start = self.parse_time(filename)
except ValueError: except ValueError:
self.die("Error extracting time from filename '%s'", self.die("error extracting time from filename '%s'",
filename) filename)
if not self.args.rate: if not self.args.rate:
self.die("Need to specify --rate") self.die("error: --rate is needed, but was not specified")
rate = self.args.rate rate = self.args.rate
ts = nilmdb.timestamper.TimestamperRate(infile, start, rate) ts = nilmdb.timestamper.TimestamperRate(infile, start, rate)
@@ -100,6 +100,6 @@ def cmd_insert(self):
# ugly bracketed ranges of 16-digit numbers and a mangled URL. # ugly bracketed ranges of 16-digit numbers and a mangled URL.
# Need to consider adding something like e.prettyprint() # Need to consider adding something like e.prettyprint()
# that is smarter about the contents of the error. # that is smarter about the contents of the error.
self.die("Error inserting data: %s", str(e)) self.die("error inserting data: %s", str(e))
return return


@@ -3,6 +3,7 @@ from nilmdb.utils.printf import *
import nilmdb.client import nilmdb.client
import fnmatch import fnmatch
import argparse
from argparse import ArgumentDefaultsHelpFormatter as def_form from argparse import ArgumentDefaultsHelpFormatter as def_form
def setup(self, sub): def setup(self, sub):
@@ -13,23 +14,41 @@ def setup(self, sub):
optionally filtering by layout or path. Wildcards optionally filtering by layout or path. Wildcards
are accepted. are accepted.
""") """)
cmd.set_defaults(handler = cmd_list) cmd.set_defaults(verify = cmd_list_verify,
handler = cmd_list)
group = cmd.add_argument_group("Stream filtering") group = cmd.add_argument_group("Stream filtering")
group.add_argument("-p", "--path", metavar="PATH", default="*",
help="Match only this path (-p can be omitted)")
group.add_argument("path_positional", default="*",
nargs="?", help=argparse.SUPPRESS)
group.add_argument("-l", "--layout", default="*", group.add_argument("-l", "--layout", default="*",
help="Match only this stream layout") help="Match only this stream layout")
group.add_argument("-p", "--path", default="*",
help="Match only this path")
group = cmd.add_argument_group("Interval details") group = cmd.add_argument_group("Interval details")
group.add_argument("-d", "--detail", action="store_true", group.add_argument("-d", "--detail", action="store_true",
help="Show available data time intervals") help="Show available data time intervals")
group.add_argument("-s", "--start", group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time, metavar="TIME", type=self.arg_time,
help="Starting timestamp (free-form)") help="Starting timestamp (free-form, inclusive)")
group.add_argument("-e", "--end", group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time, metavar="TIME", type=self.arg_time,
help="Ending timestamp (free-form)") help="Ending timestamp (free-form, noninclusive)")
def cmd_list_verify(self):
# A hidden "path_positional" argument lets the user leave off the
# "-p" when specifying the path. Handle it here.
got_opt = self.args.path != "*"
got_pos = self.args.path_positional != "*"
if got_pos:
if got_opt:
self.parser.error("too many paths specified")
else:
self.args.path = self.args.path_positional
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
def cmd_list(self): def cmd_list(self):
"""List available streams""" """List available streams"""


@@ -43,21 +43,21 @@ def cmd_metadata(self):
for keyval in keyvals: for keyval in keyvals:
kv = keyval.split('=') kv = keyval.split('=')
if len(kv) != 2 or kv[0] == "": if len(kv) != 2 or kv[0] == "":
self.die("Error parsing key=value argument '%s'", keyval) self.die("error parsing key=value argument '%s'", keyval)
data[kv[0]] = kv[1] data[kv[0]] = kv[1]
# Make the call # Make the call
try: try:
handler(self.args.path, data) handler(self.args.path, data)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("Error setting/updating metadata: %s", str(e)) self.die("error setting/updating metadata: %s", str(e))
else: else:
# Get (or unspecified) # Get (or unspecified)
keys = self.args.get or None keys = self.args.get or None
try: try:
data = self.client.stream_get_metadata(self.args.path, keys) data = self.client.stream_get_metadata(self.args.path, keys)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("Error getting metadata: %s", str(e)) self.die("error getting metadata: %s", str(e))
for key, value in sorted(data.items()): for key, value in sorted(data.items()):
# Omit nonexistant keys # Omit nonexistant keys
if value is None: if value is None:

nilmdb/cmdline/remove.py (new file)

@@ -0,0 +1,45 @@
from __future__ import absolute_import
from __future__ import print_function
from nilmdb.utils.printf import *
import nilmdb.client
import sys
def setup(self, sub):
cmd = sub.add_parser("remove", help="Remove data",
description="""
Remove all data from a specified time range within a
stream.
""")
cmd.set_defaults(verify = cmd_remove_verify,
handler = cmd_remove)
group = cmd.add_argument_group("Data selection")
group.add_argument("path",
help="Path of stream, e.g. /foo/bar")
group.add_argument("-s", "--start", required=True,
metavar="TIME", type=self.arg_time,
help="Starting timestamp (free-form, inclusive)")
group.add_argument("-e", "--end", required=True,
metavar="TIME", type=self.arg_time,
help="Ending timestamp (free-form, noninclusive)")
group = cmd.add_argument_group("Output format")
group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed")
def cmd_remove_verify(self):
if self.args.start is not None and self.args.end is not None:
if self.args.start > self.args.end:
self.parser.error("start is after end")
def cmd_remove(self):
try:
count = self.client.stream_remove(self.args.path,
self.args.start, self.args.end)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
if self.args.count:
printf("%d\n", count)
return 0


@@ -26,12 +26,19 @@ class Error(Exception):
self.url = url # URL we were requesting self.url = url # URL we were requesting
self.traceback = traceback # server traceback, if available self.traceback = traceback # server traceback, if available
def __str__(self): def __str__(self):
s = sprintf("[%s]", self.status)
if self.message:
s += sprintf(" %s", self.message)
if self.traceback: # pragma: no cover
s += sprintf("\nServer traceback:\n%s", self.traceback)
return s
def __repr__(self): # pragma: no cover
s = sprintf("[%s]", self.status) s = sprintf("[%s]", self.status)
if self.message: if self.message:
s += sprintf(" %s", self.message) s += sprintf(" %s", self.message)
if self.url: if self.url:
s += sprintf(" (%s)", self.url) s += sprintf(" (%s)", self.url)
if self.traceback: # pragma: no cover if self.traceback:
s += sprintf("\nServer traceback:\n%s", self.traceback) s += sprintf("\nServer traceback:\n%s", self.traceback)
return s return s
class ClientError(Error): class ClientError(Error):
@@ -59,7 +66,7 @@ class HTTPClient(object):
url = urlparse.urljoin(self.baseurl, url) url = urlparse.urljoin(self.baseurl, url)
if params: if params:
url = urlparse.urljoin( url = urlparse.urljoin(
url, "?" + nilmdb.utils.urllib.urlencode(params, True)) url, "?" + nilmdb.utils.urllib.urlencode(params))
self.curl.setopt(pycurl.URL, url) self.curl.setopt(pycurl.URL, url)
self.url = url self.url = url


@@ -37,6 +37,7 @@ cdef class Interval:
'start' and 'end' are arbitrary floats that represent time 'start' and 'end' are arbitrary floats that represent time
""" """
if start > end: if start > end:
# Explicitly disallow zero-width intervals (since they're half-open)
raise IntervalError("start %s must precede end %s" % (start, end)) raise IntervalError("start %s must precede end %s" % (start, end))
self.start = float(start) self.start = float(start)
self.end = float(end) self.end = float(end)
@@ -278,7 +279,7 @@ cdef class IntervalSet:
return out return out
def intersection(self, Interval interval not None): def intersection(self, Interval interval not None, orig = False):
""" """
Compute a sequence of intervals that correspond to the Compute a sequence of intervals that correspond to the
intersection between `self` and the provided interval. intersection between `self` and the provided interval.
@@ -287,6 +288,10 @@ cdef class IntervalSet:
Output intervals are built as subsets of the intervals in the Output intervals are built as subsets of the intervals in the
first argument (self). first argument (self).
If orig = True, also return the original interval that was
(potentially) subsetted to make the one that is being
returned.
""" """
if not isinstance(interval, Interval): if not isinstance(interval, Interval):
raise TypeError("bad type") raise TypeError("bad type")
@@ -294,11 +299,17 @@ cdef class IntervalSet:
i = n.obj i = n.obj
if i: if i:
if i.start >= interval.start and i.end <= interval.end: if i.start >= interval.start and i.end <= interval.end:
yield i if orig:
yield (i, i)
else:
yield i
else: else:
subset = i.subset(max(i.start, interval.start), subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end)) min(i.end, interval.end))
yield subset if orig:
yield (subset, i)
else:
yield subset
cpdef intersects(self, Interval other): cpdef intersects(self, Interval other):
"""Return True if this IntervalSet intersects another interval""" """Return True if this IntervalSet intersects another interval"""


@@ -80,7 +80,7 @@ _sql_schema_updates = {
class NilmDBError(Exception): class NilmDBError(Exception):
"""Base exception for NilmDB errors""" """Base exception for NilmDB errors"""
def __init__(self, message = "Unspecified error"): def __init__(self, message = "Unspecified error"):
Exception.__init__(self, self.__class__.__name__ + ": " + message) Exception.__init__(self, message)
class StreamError(NilmDBError): class StreamError(NilmDBError):
pass pass
@@ -92,7 +92,8 @@ class OverlapError(NilmDBError):
class NilmDB(object): class NilmDB(object):
verbose = 0 verbose = 0
def __init__(self, basepath, sync=True, max_results=None): def __init__(self, basepath, sync=True, max_results=None,
bulkdata_args={}):
# set up path # set up path
self.basepath = os.path.abspath(basepath) self.basepath = os.path.abspath(basepath)
@@ -104,7 +105,7 @@ class NilmDB(object):
raise IOError("can't create tree " + self.basepath) raise IOError("can't create tree " + self.basepath)
# Our data goes inside it # Our data goes inside it
self.data = bulkdata.BulkData(self.basepath) self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)
# SQLite database too # SQLite database too
sqlfilename = os.path.join(self.basepath, "data.sql") sqlfilename = os.path.join(self.basepath, "data.sql")
@@ -173,6 +174,20 @@ class NilmDB(object):
return iset return iset
def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
"""Helper that adds interval to the SQL database only"""
self.con.execute("INSERT INTO ranges "
"(stream_id,start_time,end_time,start_pos,end_pos) "
"VALUES (?,?,?,?,?)",
(id, start, end, start_pos, end_pos))
def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
"""Helper that removes interval from the SQL database only"""
self.con.execute("DELETE FROM ranges WHERE "
"stream_id=? AND start_time=? AND "
"end_time=? AND start_pos=? AND end_pos=?",
(id, start, end, start_pos, end_pos))
def _add_interval(self, stream_id, interval, start_pos, end_pos): def _add_interval(self, stream_id, interval, start_pos, end_pos):
""" """
Add interval to the internal interval cache, and to the database. Add interval to the internal interval cache, and to the database.
@@ -191,7 +206,7 @@ class NilmDB(object):
# time range [adjacent.start -> interval.end) # time range [adjacent.start -> interval.end)
# and database rows [ adjacent.start_pos -> end_pos ]. # and database rows [ adjacent.start_pos -> end_pos ].
# Only do this if the resulting interval isn't too large. # Only do this if the resulting interval isn't too large.
-        max_merged_rows = 30000000 # a bit more than 1 hour at 8 KHz
+        max_merged_rows = 8000 * 60 * 60 * 1.05 # 1.05 hours at 8 KHz
adjacent = iset.find_end(interval.start) adjacent = iset.find_end(interval.start)
if (adjacent is not None and if (adjacent is not None and
start_pos == adjacent.db_endpos and start_pos == adjacent.db_endpos and
@@ -199,14 +214,9 @@ class NilmDB(object):
# First delete the old one, both from our iset and the # First delete the old one, both from our iset and the
# database # database
             iset -= adjacent
-            self.con.execute("DELETE FROM ranges WHERE "
-                             "stream_id=? AND start_time=? AND "
-                             "end_time=? AND start_pos=? AND "
-                             "end_pos=?", (stream_id,
-                                           adjacent.db_start,
-                                           adjacent.db_end,
-                                           adjacent.db_startpos,
-                                           adjacent.db_endpos))
+            self._sql_interval_delete(stream_id,
+                                      adjacent.db_start, adjacent.db_end,
+                                      adjacent.db_startpos, adjacent.db_endpos)
# Now update our interval so the fallthrough add is # Now update our interval so the fallthrough add is
# correct. # correct.
@@ -219,14 +229,54 @@ class NilmDB(object):
                                     start_pos, end_pos))

         # Insert into the database
-        self.con.execute("INSERT INTO ranges "
-                         "(stream_id,start_time,end_time,start_pos,end_pos) "
-                         "VALUES (?,?,?,?,?)",
-                         (stream_id, interval.start, interval.end,
-                          int(start_pos), int(end_pos)))
+        self._sql_interval_insert(stream_id, interval.start, interval.end,
+                                  int(start_pos), int(end_pos))
         self.con.commit()
def _remove_interval(self, stream_id, original, remove):
"""
Remove an interval from the internal cache and the database.
stream_id: id of stream
original: original DBInterval; must be already present in DB
to_remove: DBInterval to remove; must be subset of 'original'
"""
# Just return if we have nothing to remove
if remove.start == remove.end: # pragma: no cover
return
# Load this stream's intervals
iset = self._get_intervals(stream_id)
# Remove existing interval from the cached set and the database
iset -= original
self._sql_interval_delete(stream_id,
original.db_start, original.db_end,
original.db_startpos, original.db_endpos)
# Add back the intervals that would be left over if the
# requested interval is removed. There may be two of them, if
# the removed piece was in the middle.
def add(iset, start, end, start_pos, end_pos):
iset += DBInterval(start, end, start, end, start_pos, end_pos)
self._sql_interval_insert(stream_id, start, end, start_pos, end_pos)
if original.start != remove.start:
# Interval before the removed region
add(iset, original.start, remove.start,
original.db_startpos, remove.db_startpos)
if original.end != remove.end:
# Interval after the removed region
add(iset, remove.end, original.end,
remove.db_endpos, original.db_endpos)
# Commit SQL changes
self.con.commit()
return
def stream_list(self, path = None, layout = None): def stream_list(self, path = None, layout = None):
"""Return list of [path, layout] lists of all streams """Return list of [path, layout] lists of all streams
in the database. in the database.
@@ -341,7 +391,7 @@ class NilmDB(object):
No way to undo it! Metadata is removed.""" No way to undo it! Metadata is removed."""
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
-        # Delete the cached interval data
+        # Delete the cached interval data (if it was cached)
self._get_intervals.cache_remove(self, stream_id) self._get_intervals.cache_remove(self, stream_id)
# Delete the data # Delete the data
@@ -381,7 +431,7 @@ class NilmDB(object):
# And that's all # And that's all
return "ok" return "ok"
def _find_start(self, table, interval): def _find_start(self, table, dbinterval):
""" """
Given a DBInterval, find the row in the database that Given a DBInterval, find the row in the database that
corresponds to the start time. Return the first database corresponds to the start time. Return the first database
@@ -389,14 +439,14 @@ class NilmDB(object):
equal to 'start'. equal to 'start'.
""" """
# Optimization for the common case where an interval wasn't truncated # Optimization for the common case where an interval wasn't truncated
if interval.start == interval.db_start: if dbinterval.start == dbinterval.db_start:
return interval.db_startpos return dbinterval.db_startpos
return bisect.bisect_left(bulkdata.TimestampOnlyTable(table), return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
interval.start, dbinterval.start,
interval.db_startpos, dbinterval.db_startpos,
interval.db_endpos) dbinterval.db_endpos)
def _find_end(self, table, interval): def _find_end(self, table, dbinterval):
""" """
Given a DBInterval, find the row in the database that follows Given a DBInterval, find the row in the database that follows
the end time. Return the first database position after the the end time. Return the first database position after the
@@ -404,16 +454,16 @@ class NilmDB(object):
to 'end'. to 'end'.
""" """
# Optimization for the common case where an interval wasn't truncated # Optimization for the common case where an interval wasn't truncated
if interval.end == interval.db_end: if dbinterval.end == dbinterval.db_end:
return interval.db_endpos return dbinterval.db_endpos
# Note that we still use bisect_left here, because we don't # Note that we still use bisect_left here, because we don't
# want to include the given timestamp in the results. This is # want to include the given timestamp in the results. This is
# so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return # so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
# non-overlapping data. # non-overlapping data.
return bisect.bisect_left(bulkdata.TimestampOnlyTable(table), return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
interval.end, dbinterval.end,
interval.db_startpos, dbinterval.db_startpos,
interval.db_endpos) dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, count = False): def stream_extract(self, path, start = None, end = None, count = False):
""" """
@@ -434,8 +484,8 @@ class NilmDB(object):
than actually fetching the data. It is not limited by than actually fetching the data. It is not limited by
max_results. max_results.
""" """
table = self.data.getnode(path)
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
requested = Interval(start or 0, end or 1e12) requested = Interval(start or 0, end or 1e12)
result = [] result = []
@@ -472,3 +522,45 @@ class NilmDB(object):
if count: if count:
return matched return matched
return (result, restart) return (result, restart)
def stream_remove(self, path, start = None, end = None):
"""
Remove data from the specified time interval within a stream.
Removes all data in the interval [start, end), and intervals
are truncated or split appropriately. Returns the number of
data points removed.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
intervals = self._get_intervals(stream_id)
to_remove = Interval(start or 0, end or 1e12)
removed = 0
if start == end:
return 0
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True))
for (dbint, orig) in all_candidates:
# Find row start and end
row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint)
# Adjust the DBInterval to match the newly found ends
dbint.db_start = dbint.start
dbint.db_end = dbint.end
dbint.db_startpos = row_start
dbint.db_endpos = row_end
# Remove interval from the database
self._remove_interval(stream_id, orig, dbint)
# Remove data from the underlying table storage
table.remove(row_start, row_end)
# Count how many were removed
removed += row_end - row_start
return removed
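The bookkeeping in _remove_interval boils down to: delete the original interval row, then re-add up to two leftover pieces, one on each side of the removed span. A small self-contained sketch of that split, with made-up numbers, to make the row arithmetic concrete:

# Hedged sketch of the interval-splitting step (illustrative numbers only):
# removing [10, 20) from an interval covering [0, 30) with rows 0..3000
# leaves one piece on each side.
def split_after_removal(orig_start, orig_end, rm_start, rm_end,
                        orig_startpos, orig_endpos, rm_startpos, rm_endpos):
    pieces = []
    if orig_start != rm_start:    # piece before the removed region
        pieces.append((orig_start, rm_start, orig_startpos, rm_startpos))
    if orig_end != rm_end:        # piece after the removed region
        pieces.append((rm_end, orig_end, rm_endpos, orig_endpos))
    return pieces

# split_after_removal(0, 30, 10, 20, 0, 3000, 1000, 2000)
#   -> [(0, 10, 0, 1000), (20, 30, 2000, 3000)]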

View File

@@ -11,7 +11,10 @@ import sys
import time import time
import os import os
import simplejson as json import simplejson as json
import functools import decorator
import traceback
from nilmdb.nilmdb import NilmDBError
try: try:
import cherrypy import cherrypy
@@ -24,46 +27,51 @@ class NilmApp(object):
def __init__(self, db): def __init__(self, db):
self.db = db self.db = db
version = "1.1" version = "1.2"
# Decorators # Decorators
def chunked_response(func): def chunked_response(func):
"""Decorator to enable chunked responses""" """Decorator to enable chunked responses."""
# Set this to False to get better tracebacks from some requests # Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals). # (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': True } func._cp_config = { 'response.stream': True }
return func return func
-def workaround_cp_bug_1200(func): # pragma: no cover (just a workaround)
-    """Decorator to work around CherryPy bug #1200 in a response
-    generator"""
-    # Even if chunked responses are disabled, you may still miss
-    # LookupError, or UnicodeError exceptions due to CherryPy bug
-    # #1200.  This throws them as generic Exceptions instead.
-    import traceback
-    @functools.wraps(func)
-    def wrapper(*args, **kwargs):
-        try:
-            for val in func(*args, **kwargs):
-                yield val
-        except (LookupError, UnicodeError) as e:
-            raise Exception("bug workaround; real exception is:\n" +
-                            traceback.format_exc())
-    return wrapper
-
-def exception_to_httperror(response = "400 Bad Request"):
-    """Return a decorator that catches Exception and throws
-    a HTTPError describing it instead"""
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            try:
-                return func(*args, **kwargs)
-            except Exception as e:
-                message = sprintf("%s: %s", type(e).__name__, str(e))
-                raise cherrypy.HTTPError(response, message)
-        return wrapper
-    return decorator
+@decorator.decorator
+def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
+    """Decorator to work around CherryPy bug #1200 in a response
+    generator.
+
+    Even if chunked responses are disabled, LookupError or
+    UnicodeError exceptions may still be swallowed by CherryPy due to
+    bug #1200.  This throws them as generic Exceptions instead so that
+    they make it through.
+    """
+    try:
+        for val in func(*args, **kwargs):
+            yield val
+    except (LookupError, UnicodeError) as e:
+        raise Exception("bug workaround; real exception is:\n" +
+                        traceback.format_exc())
+
+def exception_to_httperror(*expected):
+    """Return a decorator-generating function that catches expected
+    errors and throws a HTTPError describing it instead.
+
+        @exception_to_httperror(NilmDBError, ValueError)
+        def foo():
+            pass
+    """
+    def wrapper(func, *args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except expected as e:
+            message = sprintf("%s", str(e))
+            raise cherrypy.HTTPError("400 Bad Request", message)
+    # We need to preserve the function's argspecs for CherryPy to
+    # handle argument errors correctly.  Decorator.decorator takes
+    # care of that.
+    return decorator.decorator(wrapper)
# CherryPy apps # CherryPy apps
class Root(NilmApp): class Root(NilmApp):
@@ -118,7 +126,7 @@ class Stream(NilmApp):
# /stream/create?path=/newton/prep&layout=PrepData # /stream/create?path=/newton/prep&layout=PrepData
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror() @exception_to_httperror(NilmDBError, ValueError)
def create(self, path, layout): def create(self, path, layout):
"""Create a new stream in the database. Provide path """Create a new stream in the database. Provide path
and one of the nilmdb.layout.layouts keys. and one of the nilmdb.layout.layouts keys.
@@ -128,7 +136,7 @@ class Stream(NilmApp):
# /stream/destroy?path=/newton/prep # /stream/destroy?path=/newton/prep
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror() @exception_to_httperror(NilmDBError)
def destroy(self, path): def destroy(self, path):
"""Delete a stream and its associated data.""" """Delete a stream and its associated data."""
return self.db.stream_destroy(path) return self.db.stream_destroy(path)
@@ -160,7 +168,7 @@ class Stream(NilmApp):
# /stream/set_metadata?path=/newton/prep&data=<json> # /stream/set_metadata?path=/newton/prep&data=<json>
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror() @exception_to_httperror(NilmDBError, LookupError, TypeError)
def set_metadata(self, path, data): def set_metadata(self, path, data):
"""Set metadata for the named stream, replacing any """Set metadata for the named stream, replacing any
existing metadata. Data should be a json-encoded existing metadata. Data should be a json-encoded
@@ -172,7 +180,7 @@ class Stream(NilmApp):
# /stream/update_metadata?path=/newton/prep&data=<json> # /stream/update_metadata?path=/newton/prep&data=<json>
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror() @exception_to_httperror(NilmDBError, LookupError, TypeError)
def update_metadata(self, path, data): def update_metadata(self, path, data):
"""Update metadata for the named stream. Data """Update metadata for the named stream. Data
should be a json-encoded dictionary""" should be a json-encoded dictionary"""
@@ -210,7 +218,7 @@ class Stream(NilmApp):
parser.parse(body) parser.parse(body)
except nilmdb.layout.ParserError as e: except nilmdb.layout.ParserError as e:
raise cherrypy.HTTPError("400 Bad Request", raise cherrypy.HTTPError("400 Bad Request",
"Error parsing input data: " + "error parsing input data: " +
e.message) e.message)
if (not parser.min_timestamp or not parser.max_timestamp or if (not parser.min_timestamp or not parser.max_timestamp or
@@ -239,6 +247,27 @@ class Stream(NilmApp):
# Done # Done
return "ok" return "ok"
# /stream/remove?path=/newton/prep
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@cherrypy.tools.json_out()
@exception_to_httperror(NilmDBError)
def remove(self, path, start = None, end = None):
"""
Remove data from the backend database. Removes all data in
the interval [start, end). Returns the number of data points
removed.
"""
if start is not None:
start = float(start)
if end is not None:
end = float(end)
if start is not None and end is not None:
if end < start:
raise cherrypy.HTTPError("400 Bad Request",
"end before start")
return self.db.stream_remove(path, start, end)
# /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose @cherrypy.expose
@@ -366,6 +395,11 @@ class Server(object):
cherrypy.config.update({ 'request.show_tracebacks' : True }) cherrypy.config.update({ 'request.show_tracebacks' : True })
self.force_traceback = force_traceback self.force_traceback = force_traceback
# Patch CherryPy error handler to never pad out error messages.
# This isn't necessary, but then again, neither is padding the
# error messages.
cherrypy._cperror._ie_friendly_error_sizes = {}
cherrypy.tree.apps = {} cherrypy.tree.apps = {}
cherrypy.tree.mount(Root(self.db, self.version), "/") cherrypy.tree.mount(Root(self.db, self.version), "/")
cherrypy.tree.mount(Stream(self.db), "/stream") cherrypy.tree.mount(Stream(self.db), "/stream")
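The reason for routing the handlers through decorator.decorator is that CherryPy maps query parameters by inspecting the exposed handler's signature; a plain functools.wraps wrapper hides that signature, so a bad parameter used to surface as a 500 instead of a clean 400/404. A hedged, standalone sketch of the same pattern, outside of CherryPy; the names errors_to_none and parse_range are illustrative, not part of the server:

# Sketch of the argspec-preserving decorator pattern used above.
import decorator
import inspect

def errors_to_none(*expected):
    def wrapper(func, *args, **kwargs):
        try:
            return func(*args, **kwargs)
        except expected:
            return None
    # decorator.decorator builds a decorator whose wrapped functions
    # keep their original signatures, so introspection still works.
    return decorator.decorator(wrapper)

@errors_to_none(ValueError)
def parse_range(start, end):
    return (float(start), float(end))

# The two-argument signature survives the wrapping:
assert inspect.getargspec(parse_range).args == ["start", "end"]
# parse_range("1.0", "bogus") returns None instead of raising ValueError.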

View File

@@ -7,3 +7,5 @@ from .lrucache import lru_cache
from .diskusage import du from .diskusage import du
from .mustclose import must_close from .mustclose import must_close
from .urllib import urlencode from .urllib import urlencode
from . import misc
from . import atomic

nilmdb/utils/atomic.py Normal file
View File

@@ -0,0 +1,26 @@
# Atomic file writing helper.
import os
def replace_file(filename, content):
"""Attempt to atomically and durably replace the filename with the
given contents. This is intended to be 'pretty good on most
OSes', but not necessarily bulletproof."""
newfilename = filename + ".new"
# Write to new file, flush it
with open(newfilename, "wb") as f:
f.write(content)
f.flush()
os.fsync(f.fileno())
# Move new file over old one
try:
os.rename(newfilename, filename)
except OSError: # pragma: no cover
# Some OSes might not support renaming over an existing file.
# This is definitely NOT atomic!
os.remove(filename)
os.rename(newfilename, filename)
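A possible use of replace_file (illustrative only, not taken from the repository): persist a small JSON state blob so that a crash mid-write can't leave a truncated file behind.

# Illustrative use: write-to-temp, fsync, rename keeps the file whole.
import simplejson as json
import nilmdb.utils.atomic

state = { "version": 1, "nrows": 12345 }
nilmdb.utils.atomic.replace_file("state.json", json.dumps(state))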

View File

@@ -4,17 +4,19 @@
# with added 'destructor' functionality. # with added 'destructor' functionality.
 import collections
-import functools
+import decorator
+import warnings

-def lru_cache(size = 10, onremove = None):
+def lru_cache(size = 10, onremove = None, keys = slice(None)):
     """Least-recently-used cache decorator.

     @lru_cache(size = 10, onevict = None)
     def f(...):
         pass

-    Given a function and arguments, memoize its return value.
-    Up to 'size' elements are cached.
+    Given a function and arguments, memoize its return value.  Up to
+    'size' elements are cached.  'keys' is a slice object that
+    represents which arguments are used as the cache key.

     When evicting a value from the cache, call the function
     'onremove' with the value that's being evicted.
@@ -24,43 +26,52 @@ def lru_cache(size = 10, onremove = None):
     f.cache_hits and f.cache_misses give statistics.
     """
-    def decorator(func):
+    def decorate(func):
         cache = collections.OrderedDict()  # order: least- to most-recent

         def evict(value):
             if onremove:
                 onremove(value)

-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            key = args + tuple(sorted(kwargs.items()))
+        def wrapper(orig, *args, **kwargs):
+            if kwargs:
+                raise NotImplementedError("kwargs not supported")
+            key = args[keys]
             try:
                 value = cache.pop(key)
-                wrapper.cache_hits += 1
+                orig.cache_hits += 1
             except KeyError:
-                value = func(*args, **kwargs)
-                wrapper.cache_misses += 1
+                value = orig(*args)
+                orig.cache_misses += 1
                 if len(cache) >= size:
                     evict(cache.popitem(0)[1])  # evict LRU cache entry
             cache[key] = value                  # (re-)insert this key at end
             return value

-        def cache_remove(*args, **kwargs):
-            """Remove the described key from this cache, if present.
-            Note that if the original wrapped function was implicitly
-            passed 'self', you need to pass it as an argument here too."""
-            key = args + tuple(sorted(kwargs.items()))
+        def cache_remove(*args):
+            """Remove the described key from this cache, if present."""
+            key = args
             if key in cache:
                 evict(cache.pop(key))
+            else:
+                if len(cache) > 0 and len(args) != len(cache.iterkeys().next()):
+                    raise KeyError("trying to remove from LRU cache, but "
+                                   "number of arguments doesn't match the "
+                                   "cache key length")

         def cache_remove_all():
             for key in cache:
                 evict(cache.pop(key))

-        wrapper.cache_hits = 0
-        wrapper.cache_misses = 0
-        wrapper.cache_remove = cache_remove
-        wrapper.cache_remove_all = cache_remove_all
-        return wrapper
-    return decorator
+        def cache_info():
+            return (func.cache_hits, func.cache_misses)
+
+        new = decorator.decorator(wrapper, func)
+        func.cache_hits = 0
+        func.cache_misses = 0
+        new.cache_info = cache_info
+        new.cache_remove = cache_remove
+        new.cache_remove_all = cache_remove_all
+        return new
+    return decorate
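The keys slice selects which positional arguments make up the cache key, so an optional argument can't cause the same resource to be cached (and opened) twice. A hedged usage sketch with a hypothetical function, not taken from the codebase:

# Cache open file handles keyed only on the filename; the optional
# second argument is ignored when forming the cache key.
import nilmdb.utils

@nilmdb.utils.lru_cache(size = 4,
                        onremove = lambda f: f.close(),
                        keys = slice(0, 1))
def open_log(filename, hint = None):
    return open(filename, "a")

f1 = open_log("/tmp/example.log")
f2 = open_log("/tmp/example.log", "append")
assert f1 is f2                              # one cache entry, not two
open_log.cache_remove("/tmp/example.log")    # evicts it and closes the file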

nilmdb/utils/misc.py Normal file
View File

@@ -0,0 +1,8 @@
import itertools
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), ..., (sn,None)"
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip_longest(a, b)
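pairwise() pairs each element with its successor and marks the end with None, which is handy when walking rows or intervals while peeking one step ahead. For example (Python 2, like the rest of the codebase, since izip_longest is used):

from nilmdb.utils.misc import pairwise

list(pairwise([1, 2, 3]))      # -> [(1, 2), (2, 3), (3, None)]
list(pairwise([]))             # -> []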

View File

@@ -1,42 +1,63 @@
-# Class decorator that warns on stderr at deletion time if the class's
-# close() member wasn't called.
-
 from nilmdb.utils.printf import *
 import sys
+import inspect
+import decorator

-def must_close(errorfile = sys.stderr):
-    def decorator(cls):
-        def dummy(*args, **kwargs):
-            pass
-        if "__init__" not in cls.__dict__:
-            cls.__init__ = dummy
-        if "__del__" not in cls.__dict__:
-            cls.__del__ = dummy
-        if "close" not in cls.__dict__:
-            cls.close = dummy
-
-        orig_init = cls.__init__
-        orig_del = cls.__del__
-        orig_close = cls.close
-
-        def __init__(self, *args, **kwargs):
-            ret = orig_init(self, *args, **kwargs)
+def must_close(errorfile = sys.stderr, wrap_verify = False):
+    """Class decorator that warns on 'errorfile' at deletion time if
+    the class's close() member wasn't called.
+
+    If 'wrap_verify' is True, every class method is wrapped with a
+    verifier that will raise AssertionError if the .close() method has
+    already been called."""
+    def class_decorator(cls):
+
+        # Helper to replace a class method with a wrapper function,
+        # while maintaining argument specs etc.
+        def wrap_class_method(wrapper_func):
+            method = wrapper_func.__name__
+            if method in cls.__dict__:
+                orig = getattr(cls, method).im_func
+            else:
+                orig = lambda self: None
+            setattr(cls, method, decorator.decorator(wrapper_func, orig))
+
+        @wrap_class_method
+        def __init__(orig, self, *args, **kwargs):
+            ret = orig(self, *args, **kwargs)
             self.__dict__["_must_close"] = True
+            self.__dict__["_must_close_initialized"] = True
             return ret

-        def __del__(self):
+        @wrap_class_method
+        def __del__(orig, self, *args, **kwargs):
             if "_must_close" in self.__dict__:
                 fprintf(errorfile, "error: %s.close() wasn't called!\n",
                         self.__class__.__name__)
-            return orig_del(self)
+            return orig(self, *args, **kwargs)

-        def close(self, *args, **kwargs):
+        @wrap_class_method
+        def close(orig, self, *args, **kwargs):
             del self._must_close
-            return orig_close(self)
+            return orig(self, *args, **kwargs)

-        cls.__init__ = __init__
-        cls.__del__ = __del__
-        cls.close = close
+        # Optionally wrap all other functions
+        def verifier(orig, self, *args, **kwargs):
+            if ("_must_close" not in self.__dict__ and
+                "_must_close_initialized" in self.__dict__):
+                raise AssertionError("called " + str(orig) + " after close")
+            return orig(self, *args, **kwargs)
+        if wrap_verify:
+            for (name, method) in inspect.getmembers(cls, inspect.ismethod):
+                # Skip class methods
+                if method.__self__ is not None:
+                    continue
+                # Skip some methods
+                if name in [ "__del__", "__init__" ]:
+                    continue
+                # Set up wrapper
+                setattr(cls, name, decorator.decorator(verifier,
+                                                       method.im_func))
         return cls
-    return decorator
+    return class_decorator
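A hedged sketch of how a decorated class behaves; the Writer class here is hypothetical, not from the codebase. A forgotten close() is reported at garbage-collection time, and with wrap_verify=True any later method call fails loudly:

import nilmdb.utils

@nilmdb.utils.must_close(wrap_verify = True)
class Writer(object):
    def __init__(self, path):
        self.f = open(path, "w")
    def write(self, data):
        self.f.write(data)
    def close(self):
        self.f.close()

w = Writer("/tmp/example.txt")
w.write("hello\n")
w.close()
try:
    w.write("too late\n")        # raises AssertionError after close()
except AssertionError:
    pass
# A Writer that is garbage-collected without close() instead prints
# "error: Writer.close() wasn't called!" on stderr.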

View File

@@ -1,68 +1,40 @@
from __future__ import absolute_import from __future__ import absolute_import
from urllib import quote_plus, _is_unicode from urllib import quote_plus, _is_unicode
# urllib.urlencode insists on encoding Unicode as ASCII. This is an # urllib.urlencode insists on encoding Unicode as ASCII. This is based
# exact copy of that function, except we encode it as UTF-8 instead. # on that function, except we always encode it as UTF-8 instead.
def urlencode(query, doseq=0): def urlencode(query):
"""Encode a sequence of two-element tuples or dictionary into a URL query string. """Encode a dictionary into a URL query string.
If any values in the query arg are sequences and doseq is true, each If any values in the query arg are sequences, each sequence
sequence element is converted to a separate parameter. element is converted to a separate parameter.
If the query arg is a sequence of two-element tuples, the order of the
parameters in the output will match the order of parameters in the
input.
""" """
if hasattr(query,"items"): query = query.items()
# mapping objects
query = query.items()
else:
# it's a bother at times that strings and string-like objects are
# sequences...
try:
# non-sequence items should not work with len()
# non-empty strings will fail this
if len(query) and not isinstance(query[0], tuple):
raise TypeError
# zero-length sequences of all types will get here and succeed,
# but that's a minor nit - since the original implementation
# allowed empty dicts that type of behavior probably should be
# preserved for consistency
except TypeError:
ty,va,tb = sys.exc_info()
raise TypeError, "not a valid non-string sequence or mapping object", tb
l = [] l = []
if not doseq: for k, v in query:
# preserve old behavior k = quote_plus(str(k))
for k, v in query: if isinstance(v, str):
k = quote_plus(str(k)) v = quote_plus(v)
v = quote_plus(str(v))
l.append(k + '=' + v) l.append(k + '=' + v)
else: elif _is_unicode(v):
for k, v in query: # is there a reasonable way to convert to ASCII?
k = quote_plus(str(k)) # encode generates a string, but "replace" or "ignore"
if isinstance(v, str): # lose information and "strict" can raise UnicodeError
v = quote_plus(v) v = quote_plus(v.encode("utf-8","strict"))
l.append(k + '=' + v) l.append(k + '=' + v)
elif _is_unicode(v): else:
# is there a reasonable way to convert to ASCII? try:
# encode generates a string, but "replace" or "ignore" # is this a sufficient test for sequence-ness?
# lose information and "strict" can raise UnicodeError len(v)
v = quote_plus(v.encode("utf-8","strict")) except TypeError:
# not a sequence
v = quote_plus(str(v))
l.append(k + '=' + v) l.append(k + '=' + v)
else: else:
try: # loop over the sequence
# is this a sufficient test for sequence-ness? for elt in v:
len(v) l.append(k + '=' + quote_plus(str(elt)))
except TypeError:
# not a sequence
v = quote_plus(str(v))
l.append(k + '=' + v)
else:
# loop over the sequence
for elt in v:
l.append(k + '=' + quote_plus(str(elt)))
return '&'.join(l) return '&'.join(l)
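The simplified urlencode only has to handle dicts coming from the HTTP client, and always UTF-8-encodes unicode values instead of failing on non-ASCII input. A rough illustration (Python 2; result shown in a comment, and dict ordering may vary):

from nilmdb.utils import urlencode

urlencode({ "path": u"/d\u00fcsseldorf/raw", "start": 123.45 })
# -> 'path=%2Fd%C3%BCsseldorf%2Fraw&start=123.45'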

runtests.py Executable file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/python
import nose
import os
import sys
import glob
from collections import OrderedDict
class JimOrderPlugin(nose.plugins.Plugin):
"""When searching for tests and encountering a directory that
contains a 'test.order' file, run tests listed in that file, in the
order that they're listed. Globs are OK in that file and duplicates
are removed."""
name = 'jimorder'
score = 10000
def prepareTestLoader(self, loader):
def wrap(func):
def wrapper(name, *args, **kwargs):
addr = nose.selector.TestAddress(
name, workingDir=loader.workingDir)
try:
order = os.path.join(addr.filename, "test.order")
except:
order = None
if order and os.path.exists(order):
files = []
for line in open(order):
line = line.split('#')[0].strip()
if not line:
continue
fn = os.path.join(addr.filename, line.strip())
files.extend(sorted(glob.glob(fn)) or [fn])
files = list(OrderedDict.fromkeys(files))
tests = [ wrapper(fn, *args, **kwargs) for fn in files ]
return loader.suiteClass(tests)
return func(name, *args, **kwargs)
return wrapper
loader.loadTestsFromName = wrap(loader.loadTestsFromName)
return loader
# Use setup.cfg for most of the test configuration. Adding
# --with-jimorder here means that a normal "nosetests" run will
# still work, it just won't support test.order.
nose.main(addplugins = [ JimOrderPlugin() ],
argv = sys.argv + ["--with-jimorder"])
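The plugin's ordering step is just glob expansion plus order-preserving de-duplication over the lines of test.order. A small illustration of that step on its own (the filenames are placeholders):

import glob
from collections import OrderedDict

lines = ["test_printf.py", "test_interval.py", "test_*.py"]
files = []
for fn in lines:
    files.extend(sorted(glob.glob(fn)) or [fn])
files = list(OrderedDict.fromkeys(files))
# Explicitly named tests stay first; the glob fills in the rest once,
# without repeating the files already listed.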

View File

@@ -8,8 +8,12 @@ cover-package=nilmdb
cover-erase= cover-erase=
##cover-html= # this works, puts html output in cover/ dir ##cover-html= # this works, puts html output in cover/ dir
##cover-branches= # need nose 1.1.3 for this ##cover-branches= # need nose 1.1.3 for this
#debug=nose
#debug-log=nose.log
stop= stop=
verbosity=2 verbosity=2
tests=tests
#tests=tests/test_bulkdata.py
#tests=tests/test_mustclose.py #tests=tests/test_mustclose.py
#tests=tests/test_lrucache.py #tests=tests/test_lrucache.py
#tests=tests/test_cmdline.py #tests=tests/test_cmdline.py
@@ -23,6 +27,7 @@ verbosity=2
#tests=tests/test_serializer.py #tests=tests/test_serializer.py
#tests=tests/test_iteratorizer.py #tests=tests/test_iteratorizer.py
#tests=tests/test_client.py:TestClient.test_client_nilmdb #tests=tests/test_client.py:TestClient.test_client_nilmdb
#tests=tests/test_nilmdb.py
#with-profile= #with-profile=
#profile-sort=time #profile-sort=time
##profile-restrict=10 # doesn't work right, treated as string or something ##profile-restrict=10 # doesn't work right, treated as string or something

View File

@@ -0,0 +1,19 @@
2.56437e+05 2.24430e+05 4.01161e+03 3.47534e+03 7.49589e+03 3.38894e+03 2.61397e+02 3.73126e+03
2.53963e+05 2.24167e+05 5.62107e+03 1.54801e+03 9.16517e+03 3.52293e+03 1.05893e+03 2.99696e+03
2.58508e+05 2.24930e+05 6.01140e+03 8.18866e+02 9.03995e+03 4.48244e+03 2.49039e+03 2.67934e+03
2.59627e+05 2.26022e+05 4.47450e+03 2.42302e+03 7.41419e+03 5.07197e+03 2.43938e+03 2.96296e+03
2.55187e+05 2.24632e+05 4.73857e+03 3.39804e+03 7.39512e+03 4.72645e+03 1.83903e+03 3.39353e+03
2.57102e+05 2.21623e+05 6.14413e+03 1.44109e+03 8.75648e+03 3.49532e+03 1.86994e+03 3.75253e+03
2.63653e+05 2.21770e+05 6.22177e+03 7.38962e+02 9.54760e+03 2.66682e+03 1.46266e+03 3.33257e+03
2.63613e+05 2.25256e+05 4.47712e+03 2.43745e+03 8.51021e+03 3.85563e+03 9.59442e+02 2.38718e+03
2.55350e+05 2.26264e+05 4.28372e+03 3.92394e+03 7.91247e+03 5.46652e+03 1.28499e+03 2.09372e+03
2.52727e+05 2.24609e+05 5.85193e+03 2.49198e+03 8.54063e+03 5.62305e+03 2.33978e+03 3.00714e+03
2.58475e+05 2.23578e+05 5.92487e+03 1.39448e+03 8.77962e+03 4.54418e+03 2.13203e+03 3.84976e+03
2.61563e+05 2.24609e+05 4.33614e+03 2.45575e+03 8.05538e+03 3.46911e+03 6.27873e+02 3.66420e+03
2.56401e+05 2.24441e+05 4.18715e+03 3.45717e+03 7.90669e+03 3.53355e+03 -5.84482e+00 2.96687e+03
2.54745e+05 2.22644e+05 6.02005e+03 1.94721e+03 9.28939e+03 3.80020e+03 1.34820e+03 2.37785e+03
2.60723e+05 2.22660e+05 6.69719e+03 1.03048e+03 9.26124e+03 4.34917e+03 2.84530e+03 2.73619e+03
2.63089e+05 2.25711e+05 4.77887e+03 2.60417e+03 7.39660e+03 4.59811e+03 2.17472e+03 3.40729e+03
2.55843e+05 2.27128e+05 4.02413e+03 4.39323e+03 6.79336e+03 4.62535e+03 7.52009e+02 3.44647e+03
2.51904e+05 2.24868e+05 5.82289e+03 3.02127e+03 8.46160e+03 3.80298e+03 8.07212e+02 3.53468e+03
2.57670e+05 2.22974e+05 6.73436e+03 1.60956e+03 9.92960e+03 2.98028e+03 1.44168e+03 3.05351e+03

tests/test.order Normal file
View File

@@ -0,0 +1,18 @@
test_printf.py
test_lrucache.py
test_mustclose.py
test_serializer.py
test_iteratorizer.py
test_timestamper.py
test_layout.py
test_rbtree.py
test_interval.py
test_bulkdata.py
test_nilmdb.py
test_client.py
test_cmdline.py
test_*.py

tests/test_bulkdata.py Normal file
View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.utils.printf import *
import nilmdb.bulkdata
from nose.tools import *
from nose.tools import assert_raises
import itertools
from testutil.helpers import *
testdb = "tests/bulkdata-testdb"
from nilmdb.bulkdata import BulkData
class TestBulkData(object):
def test_bulkdata(self):
for (size, files, db) in [ ( 0, 0, testdb ),
( 25, 1000, testdb ),
( 1000, 3, testdb.decode("utf-8") ) ]:
recursive_unlink(db)
os.mkdir(db)
self.do_basic(db, size, files)
def do_basic(self, db, size, files):
"""Do the basic test with variable file_size and files_per_dir"""
if not size or not files:
data = BulkData(db)
else:
data = BulkData(db, file_size = size, files_per_dir = files)
# create empty
with assert_raises(ValueError):
data.create("/foo", "uint16_8")
with assert_raises(ValueError):
data.create("foo/bar", "uint16_8")
with assert_raises(ValueError):
data.create("/foo/bar", "uint8_8")
data.create("/foo/bar", "uint16_8")
data.create(u"/foo/baz/quux", "float64_16")
with assert_raises(ValueError):
data.create("/foo/bar/baz", "uint16_8")
with assert_raises(ValueError):
data.create("/foo/baz", "float64_16")
# get node -- see if caching works
nodes = []
for i in range(5000):
nodes.append(data.getnode("/foo/bar"))
nodes.append(data.getnode("/foo/baz/quux"))
del nodes
# Test node
node = data.getnode("/foo/bar")
with assert_raises(IndexError):
x = node[0]
raw = []
for i in range(1000):
raw.append([10000+i, 1, 2, 3, 4, 5, 6, 7, 8 ])
node.append(raw[0:1])
node.append(raw[1:100])
node.append(raw[100:])
misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
slice(5,10), slice(3,None), slice(3,-3),
slice(20,10), slice(200,100,-1), slice(None,0,-1),
slice(100,500,5) ]
# Extract slices
for s in misc_slices:
eq_(node[s], raw[s])
# Get some coverage of remove; remove is more fully tested
# in cmdline
with assert_raises(IndexError):
node.remove(9999,9998)
# close, reopen
# reopen
data.close()
if not size or not files:
data = BulkData(db)
else:
data = BulkData(db, file_size = size, files_per_dir = files)
node = data.getnode("/foo/bar")
# Extract slices
for s in misc_slices:
eq_(node[s], raw[s])
# destroy
with assert_raises(ValueError):
data.destroy("/foo")
with assert_raises(ValueError):
data.destroy("/foo/baz")
with assert_raises(ValueError):
data.destroy("/foo/qwerty")
data.destroy("/foo/baz/quux")
data.destroy("/foo/bar")
# close
data.close()

View File

@@ -17,8 +17,9 @@ import cStringIO
import simplejson as json import simplejson as json
import unittest import unittest
import warnings import warnings
import resource
from test_helpers import * from testutil.helpers import *
testdb = "tests/client-testdb" testdb = "tests/client-testdb"
@@ -69,7 +70,11 @@ class TestClient(object):
eq_(distutils.version.StrictVersion(version), eq_(distutils.version.StrictVersion(version),
distutils.version.StrictVersion(test_server.version)) distutils.version.StrictVersion(test_server.version))
def test_client_2_nilmdb(self): # Bad URLs should give 404, not 500
with assert_raises(ClientError):
client.http.get("/stream/create")
def test_client_2_createlist(self):
# Basic stream tests, like those in test_nilmdb:test_stream # Basic stream tests, like those in test_nilmdb:test_stream
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = "http://localhost:12380/")
@@ -99,6 +104,20 @@ class TestClient(object):
eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ]) eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ])
eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ]) eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ])
# Try messing with resource limits to trigger errors and get
# more coverage. Here, make it so we can only create files 1
# byte in size, which will trigger an IOError in the server when
# we create a table.
limit = resource.getrlimit(resource.RLIMIT_FSIZE)
resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
with assert_raises(ServerError) as e:
client.stream_create("/newton/hello", "RawData")
resource.setrlimit(resource.RLIMIT_FSIZE, limit)
def test_client_3_metadata(self):
client = nilmdb.Client(url = "http://localhost:12380/")
# Set / get metadata # Set / get metadata
eq_(client.stream_get_metadata("/newton/prep"), {}) eq_(client.stream_get_metadata("/newton/prep"), {})
eq_(client.stream_get_metadata("/newton/raw"), {}) eq_(client.stream_get_metadata("/newton/raw"), {})
@@ -128,7 +147,7 @@ class TestClient(object):
with assert_raises(ClientError): with assert_raises(ClientError):
client.stream_update_metadata("/newton/prep", [1,2,3]) client.stream_update_metadata("/newton/prep", [1,2,3])
def test_client_3_insert(self): def test_client_4_insert(self):
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = "http://localhost:12380/")
datetime_tz.localtz_set("America/New_York") datetime_tz.localtz_set("America/New_York")
@@ -201,16 +220,19 @@ class TestClient(object):
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
result = client.stream_insert("/newton/prep", data) result = client.stream_insert("/newton/prep", data)
in_("400 Bad Request", str(e.exception)) in_("400 Bad Request", str(e.exception))
in_("OverlapError", str(e.exception)) in_("verlap", str(e.exception))
def test_client_4_extract(self): def test_client_5_extractremove(self):
# Misc tests for extract. Most of them are in test_cmdline. # Misc tests for extract and remove. Most of them are in test_cmdline.
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = "http://localhost:12380/")
for x in client.stream_extract("/newton/prep", 123, 123): for x in client.stream_extract("/newton/prep", 123, 123):
raise Exception("shouldn't be any data for this request") raise Exception("shouldn't be any data for this request")
def test_client_5_generators(self): with assert_raises(ClientError) as e:
client.stream_remove("/newton/prep", 123, 120)
def test_client_6_generators(self):
# A lot of the client functionality is already tested by test_cmdline, # A lot of the client functionality is already tested by test_cmdline,
# but this gets a bit more coverage that cmdline misses. # but this gets a bit more coverage that cmdline misses.
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = "http://localhost:12380/")
@@ -259,7 +281,7 @@ class TestClient(object):
in_("404 Not Found", str(e.exception)) in_("404 Not Found", str(e.exception))
in_("No such stream", str(e.exception)) in_("No such stream", str(e.exception))
def test_client_6_chunked(self): def test_client_7_chunked(self):
# Make sure that /stream/intervals and /stream/extract # Make sure that /stream/intervals and /stream/extract
# properly return streaming, chunked response. Pokes around # properly return streaming, chunked response. Pokes around
# in client.http internals a bit to look at the response # in client.http internals a bit to look at the response
@@ -282,7 +304,7 @@ class TestClient(object):
if "transfer-encoding: chunked" not in client.http._headers.lower(): if "transfer-encoding: chunked" not in client.http._headers.lower():
warnings.warn("Non-chunked HTTP response for /stream/extract") warnings.warn("Non-chunked HTTP response for /stream/extract")
def test_client_7_unicode(self): def test_client_8_unicode(self):
# Basic Unicode tests # Basic Unicode tests
client = nilmdb.Client(url = "http://localhost:12380/") client = nilmdb.Client(url = "http://localhost:12380/")

View File

@@ -4,11 +4,13 @@ import nilmdb
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
import nilmdb.cmdline import nilmdb.cmdline
import unittest
from nose.tools import * from nose.tools import *
from nose.tools import assert_raises from nose.tools import assert_raises
import itertools import itertools
import datetime_tz import datetime_tz
import os import os
import re
import shutil import shutil
import sys import sys
import threading import threading
@@ -18,14 +20,16 @@ import Queue
import StringIO import StringIO
import shlex import shlex
from test_helpers import * from testutil.helpers import *
testdb = "tests/cmdline-testdb" testdb = "tests/cmdline-testdb"
def server_start(max_results = None): def server_start(max_results = None, bulkdata_args = {}):
global test_server, test_db global test_server, test_db
# Start web app on a custom port # Start web app on a custom port
test_db = nilmdb.NilmDB(testdb, sync = False, max_results = max_results) test_db = nilmdb.NilmDB(testdb, sync = False,
max_results = max_results,
bulkdata_args = bulkdata_args)
test_server = nilmdb.Server(test_db, host = "127.0.0.1", test_server = nilmdb.Server(test_db, host = "127.0.0.1",
port = 12380, stoppable = False, port = 12380, stoppable = False,
fast_shutdown = True, fast_shutdown = True,
@@ -94,14 +98,24 @@ class TestCmdline(object):
self.dump() self.dump()
eq_(self.exitcode, 0) eq_(self.exitcode, 0)
def fail(self, arg_string, infile = None, exitcode = None): def fail(self, arg_string, infile = None,
exitcode = None, require_error = True):
self.run(arg_string, infile) self.run(arg_string, infile)
if exitcode is not None and self.exitcode != exitcode: if exitcode is not None and self.exitcode != exitcode:
# Wrong exit code
self.dump() self.dump()
eq_(self.exitcode, exitcode) eq_(self.exitcode, exitcode)
if self.exitcode == 0: if self.exitcode == 0:
# Success, when we wanted failure
self.dump() self.dump()
ne_(self.exitcode, 0) ne_(self.exitcode, 0)
# Make sure the output contains the word "error" at the
# beginning of a line, but only if an exitcode wasn't
# specified.
if require_error and not re.search("^error",
self.captured, re.MULTILINE):
raise AssertionError("command failed, but output doesn't "
"contain the string 'error'")
def contain(self, checkstring): def contain(self, checkstring):
in_(checkstring, self.captured) in_(checkstring, self.captured)
@@ -131,7 +145,7 @@ class TestCmdline(object):
def dump(self): def dump(self):
printf("-----dump start-----\n%s-----dump end-----\n", self.captured) printf("-----dump start-----\n%s-----dump end-----\n", self.captured)
def test_cmdline_01_basic(self): def test_01_basic(self):
# help # help
self.ok("--help") self.ok("--help")
@@ -177,14 +191,14 @@ class TestCmdline(object):
self.fail("extract --start 2000-01-01 --start 2001-01-02") self.fail("extract --start 2000-01-01 --start 2001-01-02")
self.contain("duplicated argument") self.contain("duplicated argument")
def test_cmdline_02_info(self): def test_02_info(self):
self.ok("info") self.ok("info")
self.contain("Server URL: http://localhost:12380/") self.contain("Server URL: http://localhost:12380/")
self.contain("Server version: " + test_server.version) self.contain("Server version: " + test_server.version)
self.contain("Server database path") self.contain("Server database path")
self.contain("Server database size") self.contain("Server database size")
def test_cmdline_03_createlist(self): def test_03_createlist(self):
# Basic stream tests, like those in test_client. # Basic stream tests, like those in test_client.
# No streams # No streams
@@ -201,6 +215,10 @@ class TestCmdline(object):
# Bad layout type # Bad layout type
self.fail("create /newton/prep NoSuchLayout") self.fail("create /newton/prep NoSuchLayout")
self.contain("no such layout") self.contain("no such layout")
self.fail("create /newton/prep float32_0")
self.contain("no such layout")
self.fail("create /newton/prep float33_1")
self.contain("no such layout")
# Create a few streams # Create a few streams
self.ok("create /newton/zzz/rawnotch RawNotchedData") self.ok("create /newton/zzz/rawnotch RawNotchedData")
@@ -224,10 +242,17 @@ class TestCmdline(object):
"/newton/raw RawData\n" "/newton/raw RawData\n"
"/newton/zzz/rawnotch RawNotchedData\n") "/newton/zzz/rawnotch RawNotchedData\n")
# Match just one type or one path # Match just one type or one path. Also check
# that --path is optional
self.ok("list --path /newton/raw") self.ok("list --path /newton/raw")
self.match("/newton/raw RawData\n") self.match("/newton/raw RawData\n")
self.ok("list /newton/raw")
self.match("/newton/raw RawData\n")
self.fail("list -p /newton/raw /newton/raw")
self.contain("too many paths")
self.ok("list --layout RawData") self.ok("list --layout RawData")
self.match("/newton/raw RawData\n") self.match("/newton/raw RawData\n")
@@ -239,10 +264,17 @@ class TestCmdline(object):
self.ok("list --path *zzz* --layout Raw*") self.ok("list --path *zzz* --layout Raw*")
self.match("/newton/zzz/rawnotch RawNotchedData\n") self.match("/newton/zzz/rawnotch RawNotchedData\n")
self.ok("list *zzz* --layout Raw*")
self.match("/newton/zzz/rawnotch RawNotchedData\n")
self.ok("list --path *zzz* --layout Prep*") self.ok("list --path *zzz* --layout Prep*")
self.match("") self.match("")
def test_cmdline_04_metadata(self): # reversed range
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
def test_04_metadata(self):
# Set / get metadata # Set / get metadata
self.fail("metadata") self.fail("metadata")
self.fail("metadata --get") self.fail("metadata --get")
@@ -299,7 +331,7 @@ class TestCmdline(object):
self.fail("metadata /newton/nosuchpath") self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath") self.contain("No stream at path /newton/nosuchpath")
def test_cmdline_05_parsetime(self): def test_05_parsetime(self):
os.environ['TZ'] = "America/New_York" os.environ['TZ'] = "America/New_York"
cmd = nilmdb.cmdline.Cmdline(None) cmd = nilmdb.cmdline.Cmdline(None)
test = datetime_tz.datetime_tz.now() test = datetime_tz.datetime_tz.now()
@@ -314,17 +346,17 @@ class TestCmdline(object):
eq_(cmd.parse_time("snapshot-20120405-140000.raw.gz"), test) eq_(cmd.parse_time("snapshot-20120405-140000.raw.gz"), test)
eq_(cmd.parse_time("prep-20120405T1400"), test) eq_(cmd.parse_time("prep-20120405T1400"), test)
def test_cmdline_06_insert(self): def test_06_insert(self):
self.ok("insert --help") self.ok("insert --help")
self.fail("insert /foo/bar baz qwer") self.fail("insert /foo/bar baz qwer")
self.contain("Error getting stream info") self.contain("error getting stream info")
self.fail("insert /newton/prep baz qwer") self.fail("insert /newton/prep baz qwer")
self.match("Error opening input file baz\n") self.match("error opening input file baz\n")
self.fail("insert /newton/prep") self.fail("insert /newton/prep")
self.contain("Error extracting time") self.contain("error extracting time")
self.fail("insert --start 19801205 /newton/prep 1 2 3 4") self.fail("insert --start 19801205 /newton/prep 1 2 3 4")
self.contain("--start can only be used with one input file") self.contain("--start can only be used with one input file")
@@ -365,7 +397,7 @@ class TestCmdline(object):
os.environ['TZ'] = "UTC" os.environ['TZ'] = "UTC"
self.fail("insert --rate 120 /newton/raw " self.fail("insert --rate 120 /newton/raw "
"tests/data/prep-20120323T1004") "tests/data/prep-20120323T1004")
self.contain("Error parsing input data") self.contain("error parsing input data")
# empty data does nothing # empty data does nothing
self.ok("insert --rate 120 --start '03/23/2012 06:05:00' /newton/prep " self.ok("insert --rate 120 --start '03/23/2012 06:05:00' /newton/prep "
@@ -374,7 +406,7 @@ class TestCmdline(object):
# bad start time # bad start time
self.fail("insert --rate 120 --start 'whatever' /newton/prep /dev/null") self.fail("insert --rate 120 --start 'whatever' /newton/prep /dev/null")
def test_cmdline_07_detail(self): def test_07_detail(self):
# Just count the number of lines, it's probably fine # Just count the number of lines, it's probably fine
self.ok("list --detail") self.ok("list --detail")
lines_(self.captured, 8) lines_(self.captured, 8)
@@ -408,23 +440,30 @@ class TestCmdline(object):
self.ok("list --detail") self.ok("list --detail")
lines_(self.captured, 8) lines_(self.captured, 8)
def test_cmdline_08_extract(self): def test_08_extract(self):
# nonexistent stream # nonexistent stream
self.fail("extract /no/such/foo --start 2000-01-01 --end 2020-01-01") self.fail("extract /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("Error getting stream info") self.contain("error getting stream info")
# empty ranges return an error # reversed range
self.fail("extract -a /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
# empty ranges return error 2
self.fail("extract -a /newton/prep " + self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30' " + "--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'", exitcode = 2) "--end '23 Mar 2012 10:00:30'",
exitcode = 2, require_error = False)
self.contain("no data") self.contain("no data")
self.fail("extract -a /newton/prep " + self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " + "--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'", exitcode = 2) "--end '23 Mar 2012 10:00:30.000001'",
exitcode = 2, require_error = False)
self.contain("no data") self.contain("no data")
self.fail("extract -a /newton/prep " + self.fail("extract -a /newton/prep " +
"--start '23 Mar 2022 10:00:30' " + "--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'", exitcode = 2) "--end '23 Mar 2022 10:00:30'",
exitcode = 2, require_error = False)
self.contain("no data") self.contain("no data")
# but are ok if we're just counting results # but are ok if we're just counting results
@@ -463,7 +502,7 @@ class TestCmdline(object):
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n") self.match("43200\n")
def test_cmdline_09_truncated(self): def test_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results # Test truncated responses by overriding the nilmdb max_results
server_stop() server_stop()
server_start(max_results = 2) server_start(max_results = 2)
@@ -472,7 +511,102 @@ class TestCmdline(object):
server_stop() server_stop()
server_start() server_start()
def test_cmdline_10_destroy(self): def test_10_remove(self):
# Removing data
# Try nonexistent stream
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("No stream at path")
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
self.contain("start is after end")
# empty ranges return success, backwards ranges return error
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.match("")
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
self.match("")
self.ok("remove /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'")
self.match("")
# Verbose
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.match("0\n")
self.ok("remove --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.match("0\n")
# Make sure we have the data we expect
self.ok("list --detail /newton/prep")
self.match("/newton/prep PrepData\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:03:59.991668 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:04:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:05:59.991668 +0000 ]\n")
# Remove various chunks of prep data and make sure
# they're gone.
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:40'")
self.match("1200\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:10' " +
"--end '23 Mar 2012 10:00:20'")
self.match("1200\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:05' " +
"--end '23 Mar 2012 10:00:25'")
self.match("1200\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:03:50' " +
"--end '23 Mar 2012 10:06:50'")
self.match("15600\n")
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("24000\n")
# See the missing chunks in list output
self.ok("list --detail /newton/prep")
self.match("/newton/prep PrepData\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:00:25.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:30.000000 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:00:40.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n")
# Remove all data, verify it's missing
self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("") # no count requested this time
self.ok("list --detail /newton/prep")
self.match("/newton/prep PrepData\n" +
" (no intervals)\n")
# Reinsert some data, to verify that no overlaps with deleted
# data are reported
os.environ['TZ'] = "UTC"
self.ok("insert --rate 120 /newton/prep "
"tests/data/prep-20120323T1000 "
"tests/data/prep-20120323T1002")
def test_11_destroy(self):
# Delete records # Delete records
self.ok("destroy --help") self.ok("destroy --help")
@@ -493,7 +627,7 @@ class TestCmdline(object):
# Notice how they're not empty # Notice how they're not empty
self.ok("list --detail") self.ok("list --detail")
lines_(self.captured, 8) lines_(self.captured, 7)
# Delete some # Delete some
self.ok("destroy /newton/prep") self.ok("destroy /newton/prep")
@@ -523,7 +657,7 @@ class TestCmdline(object):
self.ok("list --detail --path " + path) self.ok("list --detail --path " + path)
self.contain("(no intervals)") self.contain("(no intervals)")
def test_cmdline_11_unicode(self): def test_12_unicode(self):
# Unicode paths. # Unicode paths.
self.ok("destroy /newton/asdf/qwer") self.ok("destroy /newton/asdf/qwer")
self.ok("destroy /newton/prep") self.ok("destroy /newton/prep")
@@ -540,3 +674,149 @@ class TestCmdline(object):
self.ok(u"metadata /düsseldorf/raw --update 'α=β ε τ α'") self.ok(u"metadata /düsseldorf/raw --update 'α=β ε τ α'")
self.ok(u"metadata /düsseldorf/raw") self.ok(u"metadata /düsseldorf/raw")
self.match(u"α=β ε τ α\nγ\n") self.match(u"α=β ε τ α\nγ\n")
self.ok(u"destroy /düsseldorf/raw")
def test_13_files(self):
# Test BulkData's ability to split into multiple files,
# by forcing the file size to be really small.
server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
# Fill data
self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC"
with open("tests/data/prep-20120323T1004-timestamped") as input:
self.ok("insert --none /newton/prep", input)
# Extract it
self.ok("extract /newton/prep --start '2000-01-01' " +
"--end '2012-03-23 10:04:01'")
lines_(self.captured, 120)
self.ok("extract /newton/prep --start '2000-01-01' " +
"--end '2022-03-23 10:04:01'")
lines_(self.captured, 14400)
# Make sure there were lots of files generated in the database
# dir
nfiles = 0
for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames)
assert(nfiles > 500)
# Make sure we can restart the server with a different file
# size and have it still work
server_stop()
server_start()
self.ok("extract /newton/prep --start '2000-01-01' " +
"--end '2022-03-23 10:04:01'")
lines_(self.captured, 14400)
# Now recreate the data one more time and make sure there are
# fewer files.
self.ok("destroy /newton/prep")
self.fail("destroy /newton/prep") # already destroyed
self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC"
with open("tests/data/prep-20120323T1004-timestamped") as input:
self.ok("insert --none /newton/prep", input)
nfiles = 0
for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames)
lt_(nfiles, 50)
self.ok("destroy /newton/prep") # destroy again
def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
# Insert data. Just for fun, insert out of order
self.ok("create /newton/prep PrepData")
os.environ['TZ'] = "UTC"
self.ok("insert --rate 120 /newton/prep "
"tests/data/prep-20120323T1002 "
"tests/data/prep-20120323T1000")
# Should take up about 2.8 MB here (including directory entries)
du_before = nilmdb.utils.diskusage.du_bytes(testdb)
# Make sure we have the data we expect
self.ok("list --detail")
self.match("/newton/prep PrepData\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:02:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:03:59.991668 +0000 ]\n")
# Remove various chunks of prep data and make sure
# they're gone.
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("28800\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:03:30'")
self.match("21600\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:10' " +
"--end '23 Mar 2012 10:00:20'")
self.match("1200\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:00:05' " +
"--end '23 Mar 2012 10:00:25'")
self.match("1200\n")
self.ok("remove -c /newton/prep " +
"--start '23 Mar 2012 10:03:50' " +
"--end '23 Mar 2012 10:06:50'")
self.match("1200\n")
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("3600\n")
# See the missing chunks in list output
self.ok("list --detail")
self.match("/newton/prep PrepData\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:00:25.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:30.000000 +0000 ]\n"
" [ Fri, 23 Mar 2012 10:03:30.000000 +0000"
" -> Fri, 23 Mar 2012 10:03:50.000000 +0000 ]\n")
# We have 1/8 of the data that we had before, so the file size
# should have dropped below 1/4 of what it used to be
du_after = nilmdb.utils.diskusage.du_bytes(testdb)
lt_(du_after, (du_before / 4))
# Remove anything that came from the 10:02 data file
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:02:00' --end '2020-01-01'")
# Re-insert 19 lines from that file, then remove them again.
# With the specific file_size above, this will cause the last
# file in the bulk data storage to be exactly file_size large,
# so removing the data should also remove that last file.
self.ok("insert --rate 120 /newton/prep " +
"tests/data/prep-20120323T1002-first19lines")
self.ok("remove /newton/prep " +
"--start '23 Mar 2012 10:02:00' --end '2020-01-01'")
# Shut down and restart server, to force nrows to get refreshed.
server_stop()
server_start()
# Re-add the full 10:02 data file. This tests adding new data once
# we removed data near the end.
self.ok("insert --rate 120 /newton/prep tests/data/prep-20120323T1002")
# See if we can extract it all
self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 15600)
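The constants in test_13_files and test_14_remove_files follow from simple arithmetic. As an illustration only (it assumes each float32_8 row is stored as an 8-byte timestamp plus eight 4-byte floats, which is consistent with the "23 rows per file" comment, and that each prep file holds two minutes of data at --rate 120), the numbers check out like this:

```python
# Hypothetical back-of-the-envelope check; not code from the repository.
row_size = 8 + 8 * 4            # assumed layout: 8-byte timestamp + eight float32 values
print(920 // row_size)          # -> 23 rows per bulkdata file, as in bulkdata_args

total = 2 * 120 * 120           # two 2-minute prep files at --rate 120 = 28800 rows
removed = 21600 + 1200 + 1200 + 1200   # the four "remove -c" counts above
print(total - removed)          # -> 3600 rows left, matching the final "extract -c"
```

That remainder is one eighth of the original data, which is why the test only requires the on-disk usage to fall below a quarter of du_before rather than hitting an exact figure: partially filled files and directory entries keep the ratio from being precise.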


@@ -10,13 +10,13 @@ import itertools
from nilmdb.interval import Interval, DBInterval, IntervalSet, IntervalError
-from test_helpers import *
+from testutil.helpers import *
import unittest
# set to False to skip live renders
do_live_renders = False
def render(iset, description = "", live = True):
-import renderdot
+import testutil.renderdot as renderdot
r = renderdot.RBTreeRenderer(iset.tree)
return r.render(description, live and do_live_renders)
@@ -345,14 +345,15 @@ class TestIntervalSpeed:
def test_interval_speed(self):
import yappi
import time
-import aplotter
+import testutil.aplotter as aplotter
import random
import math
print
yappi.start()
speeds = {}
-for j in [ 2**x for x in range(5,20) ]:
+limit = 10 # was 20
+for j in [ 2**x for x in range(5,limit) ]:
start = time.time()
iset = IntervalSet()
for i in random.sample(xrange(j),j):


@@ -7,7 +7,7 @@ from nose.tools import assert_raises
import threading
import time
-from test_helpers import *
+from testutil.helpers import *
def func_with_callback(a, b, callback):
callback(a)


@@ -20,7 +20,7 @@ import cStringIO
import random
import unittest
-from test_helpers import *
+from testutil.helpers import *
from nilmdb.layout import *


@@ -6,8 +6,9 @@ from nose.tools import *
from nose.tools import assert_raises
import threading
import time
import inspect
-from test_helpers import *
+from testutil.helpers import *
@nilmdb.utils.lru_cache(size = 3)
def foo1(n):
@@ -24,30 +25,59 @@ foo3d.destructed = []
def foo3(n):
return n
class Foo:
def __init__(self):
self.calls = 0
@nilmdb.utils.lru_cache(size = 3, keys = slice(1, 2))
def foo(self, n, **kwargs):
self.calls += 1
class TestLRUCache(object):
def test(self):
[ foo1(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
-eq_((foo1.cache_hits, foo1.cache_misses), (6, 3))
+eq_(foo1.cache_info(), (6, 3))
[ foo1(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
-eq_((foo1.cache_hits, foo1.cache_misses), (15, 3))
+eq_(foo1.cache_info(), (15, 3))
[ foo1(n) for n in [ 4, 2, 1, 1, 4 ] ]
-eq_((foo1.cache_hits, foo1.cache_misses), (18, 5))
+eq_(foo1.cache_info(), (18, 5))
[ foo2(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
-eq_((foo2.cache_hits, foo2.cache_misses), (6, 3))
+eq_(foo2.cache_info(), (6, 3))
[ foo2(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
-eq_((foo2.cache_hits, foo2.cache_misses), (15, 3))
+eq_(foo2.cache_info(), (15, 3))
[ foo2(n) for n in [ 4, 2, 1, 1, 4 ] ]
-eq_((foo2.cache_hits, foo2.cache_misses), (19, 4))
+eq_(foo2.cache_info(), (19, 4))
[ foo3(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
-eq_((foo3.cache_hits, foo3.cache_misses), (6, 3))
+eq_(foo3.cache_info(), (6, 3))
[ foo3(n) for n in [ 1, 2, 3, 1, 2, 3, 1, 2, 3 ] ]
-eq_((foo3.cache_hits, foo3.cache_misses), (15, 3))
+eq_(foo3.cache_info(), (15, 3))
[ foo3(n) for n in [ 4, 2, 1, 1, 4 ] ]
-eq_((foo3.cache_hits, foo3.cache_misses), (18, 5))
+eq_(foo3.cache_info(), (18, 5))
eq_(foo3d.destructed, [1, 3])
with assert_raises(KeyError):
foo3.cache_remove(1,2,3)
foo3.cache_remove(1)
eq_(foo3d.destructed, [1, 3, 1])
foo3.cache_remove_all()
eq_(foo3d.destructed, [1, 3, 1, 2, 4 ])
foo = Foo()
foo.foo(5)
foo.foo(6)
foo.foo(7)
foo.foo(5)
eq_(foo.calls, 3)
# Can't handle keyword arguments right now
with assert_raises(NotImplementedError):
foo.foo(3, asdf = 7)
# Verify that argspecs were maintained
eq_(inspect.getargspec(foo1),
inspect.ArgSpec(args=['n'],
varargs=None, keywords=None, defaults=None))
eq_(inspect.getargspec(foo.foo),
inspect.ArgSpec(args=['self', 'n'],
varargs=None, keywords="kwargs", defaults=None))


@@ -5,15 +5,29 @@ import nose
from nose.tools import *
from nose.tools import assert_raises
-from test_helpers import *
+from testutil.helpers import *
import sys
import cStringIO
import gc
import inspect
err = cStringIO.StringIO()
@nilmdb.utils.must_close(errorfile = err)
class Foo:
def __init__(self, arg):
fprintf(err, "Init %s\n", arg)
def __del__(self):
fprintf(err, "Deleting\n")
def close(self):
fprintf(err, "Closing\n")
@nilmdb.utils.must_close(errorfile = err, wrap_verify = True)
class Bar:
def __init__(self):
fprintf(err, "Init\n")
@@ -23,8 +37,11 @@ class Foo:
def close(self):
fprintf(err, "Closing\n")
def blah(self, arg):
fprintf(err, "Blah %s\n", arg)
@nilmdb.utils.must_close(errorfile = err)
-class Bar:
+class Baz:
pass
class TestMustClose(object):
@@ -34,26 +51,60 @@ class TestMustClose(object):
# garbage collect the object (and call its __del__ function)
# right after a "del x".
-x = Foo()
+# Trigger error
err.truncate()
x = Foo("hi")
# Verify that the arg spec was maintained
eq_(inspect.getargspec(x.__init__),
inspect.ArgSpec(args = ['self', 'arg'],
varargs = None, keywords = None, defaults = None))
del x
gc.collect()
eq_(err.getvalue(),
-"Init\n"
+"Init hi\n"
"error: Foo.close() wasn't called!\n"
"Deleting\n")
# No error
err.truncate(0)
-y = Foo()
+y = Foo("bye")
y.close()
del y
gc.collect()
eq_(err.getvalue(),
-"Init\n"
+"Init bye\n"
"Closing\n"
"Deleting\n")
# Verify function calls when wrap_verify is True
err.truncate(0)
z = Bar()
eq_(inspect.getargspec(z.blah),
inspect.ArgSpec(args = ['self', 'arg'],
varargs = None, keywords = None, defaults = None))
z.blah("boo")
z.close()
with assert_raises(AssertionError) as e:
z.blah("hello")
in_("called <function blah at 0x", str(e.exception))
in_("> after close", str(e.exception))
# Since the most recent assertion references 'z',
# we need to raise another assertion here so that
# 'z' will get properly deleted.
with assert_raises(AssertionError):
raise AssertionError()
del z
gc.collect()
eq_(err.getvalue(),
"Init\n"
"Blah boo\n"
"Closing\n"
"Deleting\n")
# Class with missing methods
err.truncate(0)
w = Baz()
w.close()
del w
eq_(err.getvalue(), "") eq_(err.getvalue(), "")


@@ -22,7 +22,7 @@ testdb = "tests/testdb"
#def cleanup():
# os.unlink(testdb)
-from test_helpers import *
+from testutil.helpers import *
class Test00Nilmdb(object): # named 00 so it runs first
def test_NilmDB(self):


@@ -6,7 +6,7 @@ from nose.tools import assert_raises
from cStringIO import StringIO
import sys
-from test_helpers import *
+from testutil.helpers import *
class TestPrintf(object):
def test_printf(self):


@@ -8,13 +8,13 @@ from nose.tools import assert_raises
from nilmdb.rbtree import RBTree, RBNode
-from test_helpers import *
+from testutil.helpers import *
import unittest
# set to False to skip live renders
do_live_renders = False
def render(tree, description = "", live = True):
-import renderdot
+import testutil.renderdot as renderdot
r = renderdot.RBTreeRenderer(tree)
return r.render(description, live and do_live_renders)


@@ -7,7 +7,7 @@ from nose.tools import assert_raises
import threading
import time
-from test_helpers import *
+from testutil.helpers import *
#raise nose.exc.SkipTest("Skip these")


@@ -9,7 +9,7 @@ import os
import sys
import cStringIO
-from test_helpers import *
+from testutil.helpers import *
class TestTimestamper(object):


@@ -0,0 +1 @@
# empty


@@ -12,6 +12,10 @@ def eq_(a, b):
if not a == b:
raise AssertionError("%s != %s" % (myrepr(a), myrepr(b)))
def lt_(a, b):
if not a < b:
raise AssertionError("%s is not less than %s" % (myrepr(a), myrepr(b)))
def in_(a, b):
if a not in b:
raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b)))
@@ -23,6 +27,8 @@ def ne_(a, b):
def lines_(a, n):
l = a.count('\n')
if not l == n:
if len(a) > 5000:
a = a[0:5000] + " ... truncated"
raise AssertionError("wanted %d lines, got %d in output: '%s'" raise AssertionError("wanted %d lines, got %d in output: '%s'"
% (n, l, a)) % (n, l, a))
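Two small testutil.helpers additions round out this change set: lt_ gives ordering assertions a readable failure message, and lines_ now truncates very long captured output before quoting it. A quick usage sketch with hypothetical values, assuming the tests directory is on the import path so testutil.helpers resolves as in the diffs above:

```python
from testutil.helpers import lt_, lines_

lt_(3, 50)                        # passes silently
output = "x\n" * 14400            # stand-in for captured "extract" output
lines_(output, 14400)             # passes: exactly 14400 newlines
try:
    lines_(output, 120)           # wrong expectation -> AssertionError
except AssertionError as exc:
    # the quoted output is cut at 5000 characters plus "... truncated"
    print(str(exc)[:60])
```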