Compare commits

...

41 Commits

Author SHA1 Message Date
ae9fe89759 Parse timestamps with '@' before any other checks 2013-04-04 14:43:18 -04:00
04def60021 Include stream path in "no such stream" errors 2013-04-02 21:06:49 -04:00
9ce0f69dff Add "--delete" option to "nilmtool metadata" tool
This is the same as "--update" with an empty string as the value.
2013-04-02 16:07:28 -04:00
90c3be91c4 Natural sort for streams in client.stream_list 2013-04-02 14:37:32 -04:00
ebccfb3531 Fix stream renaming when the new path is a parent of the old 2013-04-01 19:25:17 -04:00
e006f1d02e Change default URL to http://localhost/nilmdb/ 2013-04-01 18:04:31 -04:00
5292319802 server: consolidate time processing and checks 2013-03-30 21:16:40 -04:00
173121ca87 Switch URL to one that should definitely not resolve 2013-03-30 17:31:35 -04:00
26bab031bd Add StreamInserter.send() to trigger intermediate block send 2013-03-30 17:30:43 -04:00
b5fefffa09 Use a global cached server object for WSGI app
This is instead of caching it inside nilmdb.server.wsgi_application.
Might make things work a bit better in case the web server decides
to call wsgi_application multiple times.
2013-03-30 15:56:57 -04:00
dccb3e370a WSGI config needs to specify application group
This ensures that the same Python sub-instance handles the request,
even if it's coming in from two different virtual hosts.
2013-03-30 15:56:02 -04:00
95ca55aa7e Print out WSGI environment on DB init failure 2013-03-30 15:55:41 -04:00
e01813f29d Fix wsgi documentation 2013-03-25 13:52:32 -04:00
7f41e117a2 Fix tabs 2013-03-25 13:44:03 -04:00
dd5fc806e5 Restructure WSGI app to regenerate error on each call, if needed
This way, errors like "database already locked" can be fixed and the
page reloaded, without needing to restart Apache.
2013-03-24 21:52:11 -04:00
f8ca8d31e6 Remove Iteratorizer, as it's no longer needed 2013-03-24 21:31:03 -04:00
ed89d803f0 Remove aplotter code 2013-03-24 21:29:09 -04:00
3d24092cd2 Replace bare 'except:' with 'except: Exception'
Otherwise we might inadvertently catch SystemExit or KeyboardExit or
something we don't want to catch.
2013-03-24 21:28:01 -04:00
304bb43d85 Move lockfile out of data dir, to avoid stream tree conflicts 2013-03-24 21:23:45 -04:00
59a79a30a5 Remove lockfile when done.
This isn't necessary for correct behavior: if the database is killed,
the old flock() will go away when the file descriptor gets closed.
2013-03-24 21:20:47 -04:00
c0d450d39e Add locking mechanism to avoid multiple servers on one DB 2013-03-24 21:20:20 -04:00
6f14d609b2 Fix issue where bulkdata was accidentally closed 2013-03-24 21:16:18 -04:00
77ef87456f Improve WSGI application support, fix docs 2013-03-24 21:16:03 -04:00
32d6af935c Improve wsgi docs 2013-03-22 19:17:36 -04:00
6af3a6fc41 Add WSGI application support and documentation 2013-03-22 19:14:34 -04:00
f8a06fb3b7 Clarify default DB path in nilmdb_server.py help text 2013-03-22 15:09:37 -04:00
e790bb9e8a Fix test failure when tests are run as root 2013-03-21 14:33:02 -04:00
89be6f5931 Add option to include interval start/end markup on extract
When enabled, lines like "# interval-start 1234567890123456" and "#
interval-end 1234567890123456" will be added to the data output.  Note
that there may be an "interval-end" timestamp followed by an identical
"interval-start" timestamp, if the response at the nilmdb level was
split up into multiple chunks.

In general, assume contiguous data if previous_interval_end ==
new_interval_start.
2013-03-19 14:23:33 -04:00
4cdef3285d Destroy now requires that all data has been previously removed.
Added new flag "-R" to command line to perform an automatic removal.
This should be the last of the ways in which a single command could
block the nilmdb thread for a long time.
2013-03-18 19:39:03 -04:00
bcd82c4d59 Limit the number of rows removed per call to nilmdb.stream_remove
Server class will retry as needed, as with stream_extract and
stream_intervals.
2013-03-18 18:22:45 -04:00
caf63ab01f Fix stream_extract/stream_intervals restart around timestamp == 0. 2013-03-18 18:20:25 -04:00
2d72891162 Accept "min" and "max" as timestamps on command line 2013-03-18 18:19:24 -04:00
cda2ac3e77 Don't return a mutable interval from IntervalSet.intersection()
Instead, always take the subset, which creates a new interval.
Also adds a small optimization by moving the 'if orig' check outside the
loop.
2013-03-18 18:16:35 -04:00
57d3d60f6a Fix relative import problems 2013-03-18 16:27:27 -04:00
d6b5befe76 Don't use filenames as default arg completion 2013-03-16 17:27:58 -04:00
7429c1788d Update nilmdb.utils.time 2013-03-15 22:49:59 -04:00
0ef71c193b Remove layout.pyx, since rocket replaced it 2013-03-15 22:32:40 -04:00
4a50dd015e Merge branch 'python-intervals' 2013-03-15 21:39:11 -04:00
22274550ab Test python version of Interval too 2013-03-15 21:37:03 -04:00
4f06d6ae68 Move Interval set_difference inside nilmdb.utils for clients
Clients might need to to Interval math too, so move a simple Interval
class and start putting helpers in there.
2013-03-15 21:37:03 -04:00
c54d8041c3 Update design docs 2013-03-15 21:07:01 -04:00
36 changed files with 635 additions and 1292 deletions

View File

@@ -24,3 +24,5 @@ Usage:
nilmdb-server --help nilmdb-server --help
nilmtool --help nilmtool --help
See docs/wsgi.md for info on setting up a WSGI application in Apache.

View File

@@ -186,6 +186,19 @@ IntervalSet speed
- rbtree and interval converted to cython: - rbtree and interval converted to cython:
8.4 μS, total 12 s, 134 MB RAM 8.4 μS, total 12 s, 134 MB RAM
- Would like to move Interval itself back to Python so other
non-cythonized code like client code can use it more easily.
Testing speed with just `test_interval` being tested, with
`range(5,22)`, using `/usr/bin/time -v python tests/runtests.py`,
times recorded for 2097152:
- 52ae397 (Interval in cython):
12.6133 μs each, ratio 0.866533, total 47 sec, 399 MB RAM
- 9759dcf (Interval in python):
21.2937 μs each, ratio 1.462870, total 83 sec, 1107 MB RAM
That's a huge difference! Instead, will keep Interval and DBInterval
cythonized inside nilmdb, and just have an additional copy in
nilmdb.utils for clients to use.
Layouts Layouts
------- -------
Current/old design has specific layouts: RawData, PrepData, RawNotchedData. Current/old design has specific layouts: RawData, PrepData, RawNotchedData.

32
docs/wsgi.md Normal file
View File

@@ -0,0 +1,32 @@
WSGI Application in Apache
--------------------------
Install `apache2` and `libapache2-mod-wsgi`
We'll set up the database server at URL `http://myhost.com/nilmdb`.
The database will be stored in `/home/nilm/db`, and the process will
run as user `nilm`, group `nilm`.
First, create a WSGI script `/home/nilm/nilmdb.wsgi` containing:
import nilmdb.server
application = nilmdb.server.wsgi_application("/home/nilm/db", "/nilmdb")
The first parameter is the local filesystem path, and the second
parameter is the path part of the URL.
Then, set up Apache with a configuration like:
<VirtualHost>
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
WSGIApplicationGroup nilmdb-appgroup
WSGIProcessGroup nilmdb-procgroup
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmdb>
Order deny,allow
Deny from all
Allow from 1.2.3.4
</Location>
</VirtualHost>

View File

@@ -17,4 +17,4 @@ _nilmtool_argcomplete() {
unset COMPREPLY unset COMPREPLY
fi fi
} }
complete -o nospace -o default -F _nilmtool_argcomplete nilmtool complete -o nospace -F _nilmtool_argcomplete nilmtool

View File

@@ -6,6 +6,7 @@ import nilmdb.utils
import nilmdb.client.httpclient import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError from nilmdb.client.errors import ClientError
import re
import time import time
import simplejson as json import simplejson as json
import contextlib import contextlib
@@ -65,7 +66,12 @@ class Client(object):
params["layout"] = layout params["layout"] = layout
if extended: if extended:
params["extended"] = 1 params["extended"] = 1
return self.http.get("stream/list", params) def sort_streams_nicely(x):
"""Human-friendly sort (/stream/2 before /stream/10)"""
num = lambda t: int(t) if t.isdigit() else t
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
return sorted(x, key = key)
return sort_streams_nicely(self.http.get("stream/list", params))
def stream_get_metadata(self, path, keys = None): def stream_get_metadata(self, path, keys = None):
params = { "path": path } params = { "path": path }
@@ -97,7 +103,7 @@ class Client(object):
return self.http.post("stream/create", params) return self.http.post("stream/create", params)
def stream_destroy(self, path): def stream_destroy(self, path):
"""Delete stream and its contents""" """Delete stream. Fails if any data is still present."""
params = { "path": path } params = { "path": path }
return self.http.post("stream/destroy", params) return self.http.post("stream/destroy", params)
@@ -171,7 +177,8 @@ class Client(object):
params["end"] = timestamp_to_string(end) params["end"] = timestamp_to_string(end)
return self.http.get_gen("stream/intervals", params) return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, count = False): def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
""" """
Extract data from a stream. Returns a generator that yields Extract data from a stream. Returns a generator that yields
lines of ASCII-formatted data that matches the database lines of ASCII-formatted data that matches the database
@@ -179,6 +186,9 @@ class Client(object):
Specify count = True to return a count of matching data points Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged. rather than the actual data. The output format is unchanged.
Specify markup = True to include comments in the returned data
that indicate interval starts and ends.
""" """
params = { params = {
"path": path, "path": path,
@@ -189,6 +199,8 @@ class Client(object):
params["end"] = timestamp_to_string(end) params["end"] = timestamp_to_string(end)
if count: if count:
params["count"] = 1 params["count"] = 1
if markup:
params["markup"] = 1
return self.http.get_gen("stream/extract", params) return self.http.get_gen("stream/extract", params)
def stream_count(self, path, start = None, end = None): def stream_count(self, path, start = None, end = None):
@@ -307,6 +319,11 @@ class StreamInserter(object):
part of a new interval and there may be a gap left in-between.""" part of a new interval and there may be a gap left in-between."""
self._send_block(final = True) self._send_block(final = True)
def send(self):
"""Send any data that we might have buffered up. Does not affect
any other treatment of timestamps or endpoints."""
self._send_block(final = False)
def _get_first_noncomment(self, block): def _get_first_noncomment(self, block):
"""Return the (start, end) indices of the first full line in """Return the (start, end) indices of the first full line in
block that isn't a comment, or raise IndexError if block that isn't a comment, or raise IndexError if

View File

@@ -16,7 +16,7 @@ class HTTPClient(object):
reparsed = urlparse.urlparse(baseurl).geturl() reparsed = urlparse.urlparse(baseurl).geturl()
if '://' not in reparsed: if '://' not in reparsed:
reparsed = urlparse.urlparse("http://" + baseurl).geturl() reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed self.baseurl = reparsed.rstrip('/') + '/'
# Build Requests session object, enable SSL verification # Build Requests session object, enable SSL verification
self.session = requests.Session() self.session = requests.Session()

View File

@@ -81,7 +81,7 @@ class Cmdline(object):
def __init__(self, argv = None): def __init__(self, argv = None):
self.argv = argv or sys.argv[1:] self.argv = argv or sys.argv[1:]
self.client = None self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380") self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
self.subcmd = {} self.subcmd = {}
self.complete = Complete() self.complete = Complete()

View File

@@ -7,11 +7,14 @@ def setup(self, sub):
cmd = sub.add_parser("destroy", help="Delete a stream and all data", cmd = sub.add_parser("destroy", help="Delete a stream and all data",
formatter_class = def_form, formatter_class = def_form,
description=""" description="""
Destroy the stream at the specified path. All Destroy the stream at the specified path.
data and metadata related to the stream is The stream must be empty. All metadata
permanently deleted. related to the stream is permanently deleted.
""") """)
cmd.set_defaults(handler = cmd_destroy) cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream")
group = cmd.add_argument_group("Required arguments") group = cmd.add_argument_group("Required arguments")
group.add_argument("path", group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar", help="Path of the stream to delete, e.g. /foo/bar",
@@ -20,6 +23,11 @@ def setup(self, sub):
def cmd_destroy(self): def cmd_destroy(self):
"""Destroy stream""" """Destroy stream"""
if self.args.remove:
try:
count = self.client.stream_remove(self.args.path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try: try:
self.client.stream_destroy(self.args.path) self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:

View File

@@ -29,6 +29,8 @@ def setup(self, sub):
group.add_argument("-a", "--annotate", action="store_true", group.add_argument("-a", "--annotate", action="store_true",
help="Include comments with some information " help="Include comments with some information "
"about the stream") "about the stream")
group.add_argument("-m", "--markup", action="store_true",
help="Include comments with interval starts and ends")
group.add_argument("-T", "--timestamp-raw", action="store_true", group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in annotated information") help="Show raw timestamps in annotated information")
group.add_argument("-c", "--count", action="store_true", group.add_argument("-c", "--count", action="store_true",
@@ -61,7 +63,8 @@ def cmd_extract(self):
for dataline in self.client.stream_extract(self.args.path, for dataline in self.client.stream_extract(self.args.path,
self.args.start, self.args.start,
self.args.end, self.args.end,
self.args.count): self.args.count,
self.args.markup):
if self.args.bare and not self.args.count: if self.args.bare and not self.args.count:
# Strip timestamp (first element). Doesn't make sense # Strip timestamp (first element). Doesn't make sense
# if we are only returning a count. # if we are only returning a count.

View File

@@ -9,7 +9,8 @@ def setup(self, sub):
a stream. a stream.
""", """,
usage="%(prog)s path [-g [key ...] | " usage="%(prog)s path [-g [key ...] | "
"-s key=value [...] | -u key=value [...]]") "-s key=value [...] | -u key=value [...]] | "
"-d [key ...]")
cmd.set_defaults(handler = cmd_metadata) cmd.set_defaults(handler = cmd_metadata)
group = cmd.add_argument_group("Required arguments") group = cmd.add_argument_group("Required arguments")
@@ -30,6 +31,9 @@ def setup(self, sub):
help="Update metadata using provided " help="Update metadata using provided "
"key=value pairs", "key=value pairs",
).completer = self.complete.meta_keyval ).completer = self.complete.meta_keyval
exc.add_argument("-d", "--delete", nargs="*", metavar="key",
help="Delete metadata for specified keys (default all)",
).completer = self.complete.meta_key
return cmd return cmd
def cmd_metadata(self): def cmd_metadata(self):
@@ -56,6 +60,16 @@ def cmd_metadata(self):
handler(self.args.path, data) handler(self.args.path, data)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error setting/updating metadata: %s", str(e)) self.die("error setting/updating metadata: %s", str(e))
elif self.args.delete is not None:
# Delete (by setting values to empty strings)
keys = self.args.delete or None
try:
data = self.client.stream_get_metadata(self.args.path, keys)
for key in data:
data[key] = ""
self.client.stream_update_metadata(self.args.path, data)
except nilmdb.client.ClientError as e:
self.die("error deleting metadata: %s", str(e))
else: else:
# Get (or unspecified) # Get (or unspecified)
keys = self.args.get or None keys = self.args.get or None
@@ -64,7 +78,7 @@ def cmd_metadata(self):
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error getting metadata: %s", str(e)) self.die("error getting metadata: %s", str(e))
for key, value in sorted(data.items()): for key, value in sorted(data.items()):
# Omit nonexistant keys # Print nonexistant keys as having empty value
if value is None: if value is None:
value = "" value = ""
printf("%s=%s\n", key, value) printf("%s=%s\n", key, value)

View File

@@ -22,7 +22,7 @@ def main():
group.add_argument('-p', '--port', help = 'Listen on the given port', group.add_argument('-p', '--port', help = 'Listen on the given port',
type = int, default = 12380) type = int, default = 12380)
group.add_argument('-d', '--database', help = 'Database directory', group.add_argument('-d', '--database', help = 'Database directory',
default = os.path.join(os.getcwd(), "db")) default = "./db")
group.add_argument('-q', '--quiet', help = 'Silence output', group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true') action = 'store_true')
group.add_argument('-t', '--traceback', group.add_argument('-t', '--traceback',

View File

@@ -17,5 +17,5 @@ except (ImportError, TypeError): # pragma: no cover
pass pass
from nilmdb.server.nilmdb import NilmDB from nilmdb.server.nilmdb import NilmDB
from nilmdb.server.server import Server from nilmdb.server.server import Server, wsgi_application
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

View File

@@ -14,6 +14,7 @@ import re
import sys import sys
import tempfile import tempfile
import nilmdb.utils.lock
from . import rocket from . import rocket
# Up to 256 open file descriptors at any given time. # Up to 256 open file descriptors at any given time.
@@ -26,6 +27,8 @@ class BulkData(object):
def __init__(self, basepath, **kwargs): def __init__(self, basepath, **kwargs):
self.basepath = basepath self.basepath = basepath
self.root = os.path.join(self.basepath, "data") self.root = os.path.join(self.basepath, "data")
self.lock = self.root + ".lock"
self.lockfile = None
# Tuneables # Tuneables
if "file_size" in kwargs: if "file_size" in kwargs:
@@ -44,8 +47,22 @@ class BulkData(object):
if not os.path.isdir(self.root): if not os.path.isdir(self.root):
os.mkdir(self.root) os.mkdir(self.root)
# Create the lock
self.lockfile = open(self.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(self.lockfile):
raise IOError('database at "' + self.basepath +
'" is already locked by another process')
def close(self): def close(self):
self.getnode.cache_remove_all() self.getnode.cache_remove_all()
if self.lockfile:
nilmdb.utils.lock.exclusive_unlock(self.lockfile)
self.lockfile.close()
try:
os.unlink(self.lock)
except OSError: # pragma: no cover
pass
self.lockfile = None
def _encode_filename(self, path): def _encode_filename(self, path):
# Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(), # Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
@@ -62,7 +79,12 @@ class BulkData(object):
if Table.exists(ospath): if Table.exists(ospath):
raise ValueError("stream already exists at this path") raise ValueError("stream already exists at this path")
if os.path.isdir(ospath): if os.path.isdir(ospath):
raise ValueError("subdirs of this path already exist") # Look for any files in subdirectories. Fully empty subdirectories
# are OK; they might be there during a rename
for (root, dirs, files) in os.walk(ospath):
if len(files):
raise ValueError(
"non-empty subdirs of this path already exist")
def _create_parents(self, unicodepath): def _create_parents(self, unicodepath):
"""Verify the path name, and create parent directories if they """Verify the path name, and create parent directories if they
@@ -134,7 +156,7 @@ class BulkData(object):
# Open and cache it # Open and cache it
self.getnode(unicodepath) self.getnode(unicodepath)
except: except Exception:
exc_info = sys.exc_info() exc_info = sys.exc_info()
try: try:
os.rmdir(ospath) os.rmdir(ospath)
@@ -171,7 +193,6 @@ class BulkData(object):
# Basic checks # Basic checks
if oldospath == newospath: if oldospath == newospath:
raise ValueError("old and new paths are the same") raise ValueError("old and new paths are the same")
self._create_check_ospath(newospath)
# Move the table to a temporary location # Move the table to a temporary location
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
@@ -179,6 +200,9 @@ class BulkData(object):
os.rename(oldospath, tmppath) os.rename(oldospath, tmppath)
try: try:
# Check destination path
self._create_check_ospath(newospath)
# Create parent dirs for new location # Create parent dirs for new location
self._create_parents(newunicodepath) self._create_parents(newunicodepath)
@@ -371,7 +395,7 @@ class Table(object):
# Try deleting subdir, too # Try deleting subdir, too
try: try:
os.rmdir(os.path.join(self.root, subdir)) os.rmdir(os.path.join(self.root, subdir))
except: except Exception:
pass pass
# Cache open files # Cache open files
@@ -504,7 +528,7 @@ class Table(object):
with open(cachefile, "rb") as f: with open(cachefile, "rb") as f:
ranges = pickle.load(f) ranges = pickle.load(f)
cachefile_present = True cachefile_present = True
except: except Exception:
ranges = [] ranges = []
cachefile_present = False cachefile_present = False

View File

@@ -1,5 +1,9 @@
"""Interval, IntervalSet """Interval, IntervalSet
The Interval implemented here is just like
nilmdb.utils.interval.Interval, except implemented in Cython for
speed.
Represents an interval of time, and a set of such intervals. Represents an interval of time, and a set of such intervals.
Intervals are half-open, ie. they include data points with timestamps Intervals are half-open, ie. they include data points with timestamps
@@ -23,6 +27,7 @@ from ..utils.time import min_timestamp as nilmdb_min_timestamp
from ..utils.time import max_timestamp as nilmdb_max_timestamp from ..utils.time import max_timestamp as nilmdb_max_timestamp
from ..utils.time import timestamp_to_string from ..utils.time import timestamp_to_string
from ..utils.iterator import imerge from ..utils.iterator import imerge
from ..utils.interval import IntervalError
import itertools import itertools
cimport rbtree cimport rbtree
@@ -30,10 +35,6 @@ from libc.stdint cimport uint64_t, int64_t
ctypedef int64_t timestamp_t ctypedef int64_t timestamp_t
class IntervalError(Exception):
"""Error due to interval overlap, etc"""
pass
cdef class Interval: cdef class Interval:
"""Represents an interval of time.""" """Represents an interval of time."""
@@ -59,17 +60,7 @@ cdef class Interval:
def __cmp__(self, Interval other): def __cmp__(self, Interval other):
"""Compare two intervals. If non-equal, order by start then end""" """Compare two intervals. If non-equal, order by start then end"""
if not isinstance(other, Interval): return cmp(self.start, other.start) or cmp(self.end, other.end)
raise TypeError("bad type")
if self.start == other.start:
if self.end < other.end:
return -1
if self.end > other.end:
return 1
return 0
if self.start < other.start:
return -1
return 1
cpdef intersects(self, Interval other): cpdef intersects(self, Interval other):
"""Return True if two Interval objects intersect""" """Return True if two Interval objects intersect"""
@@ -295,80 +286,18 @@ cdef class IntervalSet:
(potentially) subsetted to make the one that is being (potentially) subsetted to make the one that is being
returned. returned.
""" """
if not isinstance(interval, Interval): if orig:
raise TypeError("bad type") for n in self.tree.intersect(interval.start, interval.end):
for n in self.tree.intersect(interval.start, interval.end): i = n.obj
i = n.obj subset = i.subset(max(i.start, interval.start),
if i: min(i.end, interval.end))
if i.start >= interval.start and i.end <= interval.end: yield (subset, i)
if orig: else:
yield (i, i) for n in self.tree.intersect(interval.start, interval.end):
else: i = n.obj
yield i subset = i.subset(max(i.start, interval.start),
else: min(i.end, interval.end))
subset = i.subset(max(i.start, interval.start), yield subset
min(i.end, interval.end))
if orig:
yield (subset, i)
else:
yield subset
def set_difference(self, IntervalSet other not None,
Interval bounds = None):
"""
Compute the difference (self \\ other) between this
IntervalSet and the given IntervalSet; i.e., the ranges
that are present in 'self' but not 'other'.
If 'bounds' is not None, results are limited to the range
specified by the interval 'bounds'.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (self).
"""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
def decorate(it, key_start, key_end):
for i in it:
yield i.start, key_start, i
yield i.end, key_end, i
if bounds is None:
bounds = Interval(nilmdb_min_timestamp,
nilmdb_max_timestamp)
self_iter = decorate(self.intersection(bounds), 0, 2)
other_iter = decorate(other.intersection(bounds), 1, 3)
# Now iterate over the timestamps of each start and end.
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
self_interval = None
other_interval = None
out_start = None
for (ts, k, i) in imerge(self_iter, other_iter):
if k == 0:
# start self interval
self_interval = i
if other_interval is None:
out_start = ts
elif k == 1:
# start other interval
other_interval = i
if out_start is not None and out_start != ts:
yield self_interval.subset(out_start, ts)
out_start = None
elif k == 2:
# end self interval
if out_start is not None and out_start != ts:
yield self_interval.subset(out_start, ts)
out_start = None
self_interval = None
elif k == 3:
# end other interval
other_interval = None
if self_interval:
out_start = ts
cpdef intersects(self, Interval other): cpdef intersects(self, Interval other):
"""Return True if this IntervalSet intersects another interval""" """Return True if this IntervalSet intersects another interval"""

View File

@@ -1,204 +0,0 @@
# cython: profile=False
import time
import sys
import inspect
import cStringIO
from ..utils.time import min_timestamp as nilmdb_min_timestamp
cdef enum:
max_value_count = 64
cimport cython
cimport libc.stdlib
cimport libc.stdio
cimport libc.string
class ParserError(Exception):
def __init__(self, line, message):
self.message = "line " + str(line) + ": " + message
Exception.__init__(self, self.message)
class FormatterError(Exception):
pass
class Layout:
"""Represents a NILM database layout"""
def __init__(self, typestring):
"""Initialize this Layout object to handle the specified
type string"""
try:
[ datatype, count ] = typestring.split("_")
except:
raise KeyError("invalid layout string")
try:
self.count = int(count)
except ValueError:
raise KeyError("invalid count")
if self.count < 1 or self.count > max_value_count:
raise KeyError("invalid count")
if datatype == 'uint16':
self.parse = self.parse_uint16
self.format_str = "%.6f" + " %d" * self.count
self.format = self.format_generic
elif datatype == 'float32':
self.parse = self.parse_float64
self.format_str = "%.6f" + " %.6e" * self.count
self.format = self.format_generic
elif datatype == 'float64':
self.parse = self.parse_float64
self.format_str = "%.6f" + " %.16e" * self.count
self.format = self.format_generic
else:
raise KeyError("invalid type")
self.datatype = datatype
# Parsers
def parse_float64(self, char *text):
cdef int n
cdef double ts
# Return doubles even in float32 case, since they're going into
# a Python array which would upconvert to double anyway.
result = [0] * (self.count + 1)
cdef char *end
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result[0] = ts
for n in range(self.count):
text = end
result[n+1] = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("wrong number of values")
n = 0
while end[n] == ' ':
n += 1
if end[n] != '\n' and end[n] != '#' and end[n] != '\0':
raise ValueError("extra data on line")
return (ts, result)
def parse_uint16(self, char *text):
cdef int n
cdef double ts
cdef int v
cdef char *end
result = [0] * (self.count + 1)
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result[0] = ts
for n in range(self.count):
text = end
v = libc.stdlib.strtol(text, &end, 10)
if v < 0 or v > 65535:
raise ValueError("value out of range")
result[n+1] = v
if end == text:
raise ValueError("wrong number of values")
n = 0
while end[n] == ' ':
n += 1
if end[n] != '\n' and end[n] != '#' and end[n] != '\0':
raise ValueError("extra data on line")
return (ts, result)
# Formatters
def format_generic(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
return (self.format_str % tuple(d)) + "\n"
# Get a layout by name
def get_named(typestring):
try:
return Layout(typestring)
except KeyError:
compat = { "PrepData": "float32_8",
"RawData": "uint16_6",
"RawNotchedData": "uint16_9" }
return Layout(compat[typestring])
class Parser(object):
"""Object that parses and stores ASCII data for inclusion into the
database"""
def __init__(self, layout):
if issubclass(layout.__class__, Layout):
self.layout = layout
else:
try:
self.layout = get_named(layout)
except KeyError:
raise TypeError("unknown layout")
self.data = []
self.min_timestamp = None
self.max_timestamp = None
def parse(self, textdata):
"""
Parse the data, provided as lines of text, using the current
layout, into an internal data structure suitable for a
pytables 'table.append(parser.data)'.
"""
cdef double last_ts = nilmdb_min_timestamp
cdef double ts
cdef int n = 0, i
cdef char *line
indata = cStringIO.StringIO(textdata)
# Assume any parsing error is a real error.
# In the future we might want to skip completely empty lines,
# or partial lines right before EOF?
try:
self.data = []
for pyline in indata:
line = pyline
n += 1
if line[0] == '\#':
continue
(ts, row) = self.layout.parse(line)
if ts <= last_ts:
raise ValueError("timestamp is not "
"monotonically increasing")
last_ts = ts
self.data.append(row)
except (ValueError, IndexError, TypeError) as e:
raise ParserError(n, "error: " + e.message)
# Mark timestamp ranges
if len(self.data):
self.min_timestamp = self.data[0][0]
self.max_timestamp = self.data[-1][0]
class Formatter(object):
"""Object that formats database data into ASCII"""
def __init__(self, layout):
if issubclass(layout.__class__, Layout):
self.layout = layout
else:
try:
self.layout = get_named(layout)
except KeyError:
raise TypeError("unknown layout")
def format(self, data):
"""
Format raw data from the database, using the current layout,
as lines of ACSII text.
"""
text = cStringIO.StringIO()
try:
for row in data:
text.write(self.layout.format(row))
except (ValueError, IndexError, TypeError) as e:
raise FormatterError("formatting error: " + e.message)
return text.getvalue()

View File

@@ -12,8 +12,11 @@ Manages both the SQL database and the table storage backend.
from __future__ import absolute_import from __future__ import absolute_import
import nilmdb.utils import nilmdb.utils
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
from nilmdb.server.interval import (Interval, DBInterval, from nilmdb.utils.time import timestamp_to_string
IntervalSet, IntervalError)
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
from nilmdb.server import bulkdata from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
@@ -81,7 +84,18 @@ class NilmDB(object):
verbose = 0 verbose = 0
def __init__(self, basepath, max_results=None, def __init__(self, basepath, max_results=None,
bulkdata_args=None): max_removals=None, bulkdata_args=None):
"""Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing:
'max_results' is the max rows to send in a single
stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once
in stream_move.
'bulkdata_args' is kwargs for the bulkdata module.
"""
if bulkdata_args is None: if bulkdata_args is None:
bulkdata_args = {} bulkdata_args = {}
@@ -92,7 +106,9 @@ class NilmDB(object):
try: try:
os.makedirs(self.basepath) os.makedirs(self.basepath)
except OSError as e: except OSError as e:
if e.errno != errno.EEXIST: if e.errno != errno.EEXIST: # pragma: no cover
# (no coverage, because it's hard to trigger this case
# if tests are run as root)
raise IOError("can't create tree " + self.basepath) raise IOError("can't create tree " + self.basepath)
# Our data goes inside it # Our data goes inside it
@@ -103,19 +119,20 @@ class NilmDB(object):
self.con = sqlite3.connect(sqlfilename, check_same_thread = True) self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
try: try:
self._sql_schema_update() self._sql_schema_update()
finally: # pragma: no cover except Exception: # pragma: no cover
self.data.close() self.data.close()
raise
# See big comment at top about the performance implications of this # See big comment at top about the performance implications of this
self.con.execute("PRAGMA synchronous=NORMAL") self.con.execute("PRAGMA synchronous=NORMAL")
self.con.execute("PRAGMA journal_mode=WAL") self.con.execute("PRAGMA journal_mode=WAL")
# Approximate largest number of elements that we want to send # Approximate largest number of elements that we want to send
# in a single reply (for stream_intervals, stream_extract) # in a single reply (for stream_intervals, stream_extract).
if max_results: self.max_results = max_results or 16384
self.max_results = max_results
else: # Remove up to this many rows per call to stream_remove.
self.max_results = 16384 self.max_removals = max_removals or 1048576
def get_basepath(self): def get_basepath(self):
return self.basepath return self.basepath
@@ -328,18 +345,18 @@ class NilmDB(object):
List all intervals in 'path' between 'start' and 'end'. If List all intervals in 'path' between 'start' and 'end'. If
'diffpath' is not none, list instead the set-difference 'diffpath' is not none, list instead the set-difference
between the intervals in the two streams; i.e. all interval between the intervals in the two streams; i.e. all interval
ranges that are present in 'path' but not 'path2'. ranges that are present in 'path' but not 'diffpath'.
Returns (intervals, restart) tuple. Returns (intervals, restart) tuple.
intervals is a list of [start,end] timestamps of all intervals 'intervals' is a list of [start,end] timestamps of all intervals
that exist for path, between start and end. that exist for path, between start and end.
restart, if nonzero, means that there were too many results to 'restart', if not None, means that there were too many results
return in a single request. The data is complete from the to return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, starting timestamp to the point at which it was truncated, and
and a new request with a start time of 'restart' will fetch a new request with a start time of 'restart' will fetch the
the next block of data. next block of data.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
@@ -350,7 +367,9 @@ class NilmDB(object):
requested = Interval(start, end) requested = Interval(start, end)
result = [] result = []
if diffpath: if diffpath:
getter = intervals.set_difference(diffintervals, requested) getter = nilmdb.utils.interval.set_difference(
intervals.intersection(requested),
diffintervals.intersection(requested))
else: else:
getter = intervals.intersection(requested) getter = intervals.intersection(requested)
for n, i in enumerate(getter): for n, i in enumerate(getter):
@@ -359,7 +378,7 @@ class NilmDB(object):
break break
result.append([i.start, i.end]) result.append([i.start, i.end])
else: else:
restart = 0 restart = None
return (result, restart) return (result, restart)
def stream_create(self, path, layout_name): def stream_create(self, path, layout_name):
@@ -435,17 +454,22 @@ class NilmDB(object):
(newpath, stream_id)) (newpath, stream_id))
def stream_destroy(self, path): def stream_destroy(self, path):
"""Fully remove a table and all of its data from the database. """Fully remove a table from the database. Fails if there are
No way to undo it! Metadata is removed.""" any intervals data present; remove them first. Metadata is
also removed."""
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
# Delete the cached interval data (if it was cached) # Verify that no intervals are present, and clear the cache
iset = self._get_intervals(stream_id)
if len(iset):
raise NilmDBError("all intervals must be removed before "
"destroying a stream")
self._get_intervals.cache_remove(self, stream_id) self._get_intervals.cache_remove(self, stream_id)
# Delete the data # Delete the bulkdata storage
self.data.destroy(path) self.data.destroy(path)
# Delete metadata, stream, intervals # Delete metadata, stream, intervals (should be none)
with self.con as con: with self.con as con:
con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,)) con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
@@ -513,23 +537,28 @@ class NilmDB(object):
dbinterval.db_startpos, dbinterval.db_startpos,
dbinterval.db_endpos) dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, count = False): def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
""" """
Returns (data, restart) tuple. Returns (data, restart) tuple.
data is ASCII-formatted data from the database, formatted 'data' is ASCII-formatted data from the database, formatted
according to the layout of the stream. according to the layout of the stream.
restart, if nonzero, means that there were too many results to 'restart', if not None, means that there were too many results to
return in a single request. The data is complete from the return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch and a new request with a start time of 'restart' will fetch
the next block of data. the next block of data.
count, if true, means to not return raw data, but just the count 'count', if true, means to not return raw data, but just the count
of rows that would have been returned. This is much faster of rows that would have been returned. This is much faster
than actually fetching the data. It is not limited by than actually fetching the data. It is not limited by
max_results. max_results.
'markup', if true, indicates that returned data should be
marked with a comment denoting when a particular interval
starts, and another comment when an interval ends.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
@@ -539,7 +568,7 @@ class NilmDB(object):
result = [] result = []
matched = 0 matched = 0
remaining = self.max_results remaining = self.max_results
restart = 0 restart = None
for interval in intervals.intersection(requested): for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so # Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and # we use two bisections to find both the starting and
@@ -558,14 +587,26 @@ class NilmDB(object):
row_end = row_max row_end = row_max
restart = table[row_max] restart = table[row_max]
# Add markup
if markup:
result.append("# interval-start " +
timestamp_to_string(interval.start) + "\n")
# Gather these results up # Gather these results up
result.append(table.get_data(row_start, row_end)) result.append(table.get_data(row_start, row_end))
# Count them # Count them
remaining -= row_end - row_start remaining -= row_end - row_start
if restart: # Add markup, and exit if restart is set.
if restart is not None:
if markup:
result.append("# interval-end " +
timestamp_to_string(restart) + "\n")
break break
if markup:
result.append("# interval-end " +
timestamp_to_string(interval.end) + "\n")
if count: if count:
return matched return matched
@@ -574,9 +615,17 @@ class NilmDB(object):
def stream_remove(self, path, start = None, end = None): def stream_remove(self, path, start = None, end = None):
""" """
Remove data from the specified time interval within a stream. Remove data from the specified time interval within a stream.
Removes all data in the interval [start, end), and intervals
are truncated or split appropriately. Returns the number of Removes data in the interval [start, end), and intervals are
data points removed. truncated or split appropriately.
Returns a (removed, restart) tuple.
'removed' is the number of data points that were removed.
'restart', if not None, means there were too many rows to
remove in a single request. This function should be called
again with a start time of 'restart' to complete the removal.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
@@ -584,6 +633,8 @@ class NilmDB(object):
(start, end) = self._check_user_times(start, end) (start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end) to_remove = Interval(start, end)
removed = 0 removed = 0
remaining = self.max_removals
restart = None
# Can't remove intervals from within the iterator, so we need to # Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now. # remember what's currently in the intersection now.
@@ -594,6 +645,13 @@ class NilmDB(object):
row_start = self._find_start(table, dbint) row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint) row_end = self._find_end(table, dbint)
# Shorten it if we'll hit the maximum number of removals
row_max = row_start + remaining
if row_max < row_end:
row_end = row_max
dbint.end = table[row_max]
restart = dbint.end
# Adjust the DBInterval to match the newly found ends # Adjust the DBInterval to match the newly found ends
dbint.db_start = dbint.start dbint.db_start = dbint.start
dbint.db_end = dbint.end dbint.db_end = dbint.end
@@ -609,4 +667,7 @@ class NilmDB(object):
# Count how many were removed # Count how many were removed
removed += row_end - row_start removed += row_end - row_start
return removed if restart is not None:
break
return (removed, restart)

View File

@@ -11,9 +11,11 @@ from nilmdb.utils.time import string_to_timestamp
import cherrypy import cherrypy
import sys import sys
import os import os
import socket
import simplejson as json import simplejson as json
import decorator import decorator
import psutil import psutil
import traceback
class NilmApp(object): class NilmApp(object):
def __init__(self, db): def __init__(self, db):
@@ -172,6 +174,21 @@ class Root(NilmApp):
class Stream(NilmApp): class Stream(NilmApp):
"""Stream-specific operations""" """Stream-specific operations"""
# Helpers
def _get_times(self, start_param, end_param):
(start, end) = (None, None)
if start_param is not None:
start = string_to_timestamp(start_param)
if end_param is not None:
end = string_to_timestamp(end_param)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError(
"400 Bad Request",
sprintf("start must precede end (%s >= %s)",
start_param, end_param))
return (start, end)
# /stream/list # /stream/list
# /stream/list?layout=float32_8 # /stream/list?layout=float32_8
# /stream/list?path=/newton/prep&extended=1 # /stream/list?path=/newton/prep&extended=1
@@ -210,7 +227,7 @@ class Stream(NilmApp):
@exception_to_httperror(NilmDBError) @exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path): def destroy(self, path):
"""Delete a stream and its associated data.""" """Delete a stream. Fails if any data is still present."""
return self.db.stream_destroy(path) return self.db.stream_destroy(path)
# /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1 # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
@@ -300,16 +317,11 @@ class Stream(NilmApp):
body = cherrypy.request.body.read() body = cherrypy.request.body.read()
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) if len(self.db.stream_list(path = path)) != 1:
if len(streams) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path)
raise cherrypy.HTTPError("404 Not Found", "No such stream")
# Check limits # Check limits
start = string_to_timestamp(start) (start, end) = self._get_times(start, end)
end = string_to_timestamp(end)
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
# Pass the data directly to nilmdb, which will parse it and # Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems. # raise a ValueError if there are any problems.
@@ -331,15 +343,15 @@ class Stream(NilmApp):
the interval [start, end). Returns the number of data points the interval [start, end). Returns the number of data points
removed. removed.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start) total_removed = 0
if end is not None: while True:
end = string_to_timestamp(end) (removed, restart) = self.db.stream_remove(path, start, end)
if start is not None and end is not None: total_removed += removed
if start >= end: if restart is None:
raise cherrypy.HTTPError("400 Bad Request", break
"start must precede end") start = restart
return self.db.stream_remove(path, start, end) return total_removed
# /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -362,15 +374,7 @@ class Stream(NilmApp):
Note that the response type is the non-standard Note that the response type is the non-standard
'application/x-json-stream' for lack of a better option. 'application/x-json-stream' for lack of a better option.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
if len(self.db.stream_list(path = path)) != 1: if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path) raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -386,7 +390,7 @@ class Stream(NilmApp):
diffpath) diffpath)
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ]) response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
yield response yield response
if restart == 0: if restart is None:
break break
start = restart start = restart
return content(start, end) return content(start, end)
@@ -395,47 +399,43 @@ class Stream(NilmApp):
@cherrypy.expose @cherrypy.expose
@chunked_response @chunked_response
@response_type("text/plain") @response_type("text/plain")
def extract(self, path, start = None, end = None, count = False): def extract(self, path, start = None, end = None,
count = False, markup = False):
""" """
Extract data from backend database. Streams the resulting Extract data from backend database. Streams the resulting
entries as ASCII text lines separated by newlines. This may entries as ASCII text lines separated by newlines. This may
make multiple requests to the nilmdb backend to avoid causing make multiple requests to the nilmdb backend to avoid causing
it to block for too long. it to block for too long.
Add count=True to return a count rather than actual data. If 'count' is True, returns a count rather than actual data.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
# Check parameters If 'markup' is True, adds comments to the stream denoting each
if start is not None and end is not None: interval's start and end timestamp.
if start >= end: """
raise cherrypy.HTTPError("400 Bad Request", (start, end) = self._get_times(start, end)
"start must precede end")
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) if len(self.db.stream_list(path = path)) != 1:
if len(streams) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path)
raise cherrypy.HTTPError("404 Not Found", "No such stream")
@workaround_cp_bug_1200 @workaround_cp_bug_1200
def content(start, end, count): def content(start, end):
# Note: disable chunked responses to see tracebacks from here. # Note: disable chunked responses to see tracebacks from here.
if count: if count:
matched = self.db.stream_extract(path, start, end, count) matched = self.db.stream_extract(path, start, end,
count = True)
yield sprintf("%d\n", matched) yield sprintf("%d\n", matched)
return return
while True: while True:
(data, restart) = self.db.stream_extract(path, start, end) (data, restart) = self.db.stream_extract(
path, start, end, count = False, markup = markup)
yield data yield data
if restart == 0: if restart is None:
return return
start = restart start = restart
return content(start, end, count) return content(start, end)
class Exiter(object): class Exiter(object):
"""App that exits the server, for testing""" """App that exits the server, for testing"""
@@ -453,7 +453,8 @@ class Server(object):
stoppable = False, # whether /exit URL exists stoppable = False, # whether /exit URL exists
embedded = True, # hide diagnostics and output, etc embedded = True, # hide diagnostics and output, etc
fast_shutdown = False, # don't wait for clients to disconn. fast_shutdown = False, # don't wait for clients to disconn.
force_traceback = False # include traceback in all errors force_traceback = False, # include traceback in all errors
basepath = '', # base URL path for cherrypy.tree
): ):
# Save server version, just for verification during tests # Save server version, just for verification during tests
self.version = nilmdb.__version__ self.version = nilmdb.__version__
@@ -513,7 +514,7 @@ class Server(object):
if stoppable: if stoppable:
root.exit = Exiter() root.exit = Exiter()
cherrypy.tree.apps = {} cherrypy.tree.apps = {}
cherrypy.tree.mount(root, "/", config = { "/" : app_config }) cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
# Shutdowns normally wait for clients to disconnect. To speed # Shutdowns normally wait for clients to disconnect. To speed
# up tests, set fast_shutdown = True # up tests, set fast_shutdown = True
@@ -523,6 +524,9 @@ class Server(object):
else: else:
cherrypy.server.shutdown_timeout = 5 cherrypy.server.shutdown_timeout = 5
# Set up the WSGI application pointer for external programs
self.wsgi_application = cherrypy.tree
def json_error_page(self, status, message, traceback, version): def json_error_page(self, status, message, traceback, version):
"""Return a custom error page in JSON so the client can parse it""" """Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status, errordata = { "status" : status,
@@ -589,3 +593,55 @@ class Server(object):
def stop(self): def stop(self):
cherrypy.engine.exit() cherrypy.engine.exit()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to
# work, the web server must use only a single process and single
# Python interpreter. Multiple threads are OK.
_wsgi_server = None
def wsgi_application(dbpath, basepath): # pragma: no cover
"""Return a WSGI application object with a database at the
specified path.
'dbpath' is a filesystem location, e.g. /home/nilm/db
'basepath' is the URL path of the application base, which
is the same as the first argument to Apache's WSGIScriptAlias
directive.
"""
def application(environ, start_response):
global _wsgi_server
if _wsgi_server is None:
# Try to start the server
try:
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
_wsgi_server = nilmdb.server.Server(
db, embedded = True,
basepath = basepath.rstrip('/'))
except Exception:
# Build an error message on failure
import pprint
err = sprintf("Initializing database at path '%s' failed:\n\n",
dbpath)
err += traceback.format_exc()
try:
import pwd
import grp
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
"on host %s, pid %d\n",
os.getuid(), pwd.getpwuid(os.getuid())[0],
os.getgid(), grp.getgrgid(os.getgid())[0],
socket.gethostname(), os.getpid())
except ImportError:
pass
err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
if _wsgi_server is None:
# Serve up the error with our own mini WSGI app.
headers = [ ('Content-type', 'text/plain'),
('Content-length', str(len(err))) ]
start_response("500 Internal Server Error", headers)
return [err]
# Call the normal application
return _wsgi_server.wsgi_application(environ, start_response)
return application

View File

@@ -1,7 +1,7 @@
"""NilmDB utilities""" """NilmDB utilities"""
from __future__ import absolute_import
from nilmdb.utils.timer import Timer from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import serializer_proxy from nilmdb.utils.serializer import serializer_proxy
from nilmdb.utils.lrucache import lru_cache from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du, human_size from nilmdb.utils.diskusage import du, human_size
@@ -11,3 +11,5 @@ import nilmdb.utils.threadsafety
import nilmdb.utils.fallocate import nilmdb.utils.fallocate
import nilmdb.utils.time import nilmdb.utils.time
import nilmdb.utils.iterator import nilmdb.utils.iterator
import nilmdb.utils.interval
import nilmdb.utils.lock

106
nilmdb/utils/interval.py Normal file
View File

@@ -0,0 +1,106 @@
"""Interval. Like nilmdb.server.interval, but re-implemented here
in plain Python so clients have easier access to it.
Intervals are half-open, ie. they include data points with timestamps
[start, end)
"""
import nilmdb.utils.time
import nilmdb.utils.iterator
class IntervalError(Exception):
"""Error due to interval overlap, etc"""
pass
# Interval
class Interval:
"""Represents an interval of time."""
def __init__(self, start, end):
"""
'start' and 'end' are arbitrary numbers that represent time
"""
if start >= end:
# Explicitly disallow zero-width intervals (since they're half-open)
raise IntervalError("start %s must precede end %s" % (start, end))
self.start = start
self.end = end
def __repr__(self):
s = repr(self.start) + ", " + repr(self.end)
return self.__class__.__name__ + "(" + s + ")"
def __str__(self):
return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
" -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
def __cmp__(self, other):
"""Compare two intervals. If non-equal, order by start then end"""
return cmp(self.start, other.start) or cmp(self.end, other.end)
def intersects(self, other):
"""Return True if two Interval objects intersect"""
if not isinstance(other, Interval):
raise TypeError("need an Interval")
if self.end <= other.start or self.start >= other.end:
return False
return True
def subset(self, start, end):
"""Return a new Interval that is a subset of this one"""
# A subclass that tracks additional data might override this.
if start < self.start or end > self.end:
raise IntervalError("not a subset")
return Interval(start, end)
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'self'
but not 'other'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
def decorate(it, key_start, key_end):
for i in it:
yield i.start, key_start, i
yield i.end, key_end, i
a_iter = decorate(iter(a), 0, 2)
b_iter = decorate(iter(b), 1, 3)
# Now iterate over the timestamps of each start and end.
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
a_interval = None
b_interval = None
out_start = None
for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
if k == 0:
# start a interval
a_interval = i
if b_interval is None:
out_start = ts
elif k == 1:
# start b interval
b_interval = i
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
elif k == 2:
# end a interval
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
a_interval = None
elif k == 3:
# end b interval
b_interval = None
if a_interval:
out_start = ts

View File

@@ -1,100 +0,0 @@
import Queue
import threading
import sys
import contextlib
# This file provides a context manager that converts a function
# that takes a callback into a generator that returns an iterable.
# This is done by running the function in a new thread.
# Based partially on http://stackoverflow.com/questions/9968592/
class IteratorizerThread(threading.Thread):
def __init__(self, queue, function, curl_hack):
"""
function: function to execute, which takes the
callback (provided by this class) as an argument
"""
threading.Thread.__init__(self)
self.name = "Iteratorizer-" + function.__name__ + "-" + self.name
self.function = function
self.queue = queue
self.die = False
self.curl_hack = curl_hack
def callback(self, data):
try:
if self.die:
raise Exception() # trigger termination
self.queue.put((1, data))
except:
if self.curl_hack:
# We can't raise exceptions, because the pycurl
# extension module will unconditionally print the
# exception itself, and not pass it up to the caller.
# Instead, just return a value that tells curl to
# abort. (-1 would be best, in case we were given 0
# bytes, but the extension doesn't support that).
self.queue.put((2, sys.exc_info()))
return 0
raise
def run(self):
try:
result = self.function(self.callback)
except:
self.queue.put((2, sys.exc_info()))
else:
self.queue.put((0, result))
@contextlib.contextmanager
def Iteratorizer(function, curl_hack = False):
"""
Context manager that takes a function expecting a callback,
and provides an iterable that yields the values passed to that
callback instead.
function: function to execute, which takes a callback
(provided by this context manager) as an argument
with iteratorizer(func) as it:
for i in it:
print 'callback was passed:', i
print 'function returned:', it.retval
"""
queue = Queue.Queue(maxsize = 1)
thread = IteratorizerThread(queue, function, curl_hack)
thread.daemon = True
thread.start()
class iteratorizer_gen(object):
def __init__(self, queue):
self.queue = queue
self.retval = None
def __iter__(self):
return self
def next(self):
(typ, data) = self.queue.get()
if typ == 0:
# function has returned
self.retval = data
raise StopIteration
elif typ == 1:
# data is available
return data
else:
# callback raised an exception
raise data[0], data[1], data[2]
try:
yield iteratorizer_gen(queue)
finally:
# Ask the thread to die, if it's still running.
thread.die = True
while thread.isAlive():
try:
queue.get(True, 0.01)
except: # pragma: no cover
pass

33
nilmdb/utils/lock.py Normal file
View File

@@ -0,0 +1,33 @@
# File locking
import warnings
try:
import fcntl
import errno
def exclusive_lock(f):
"""Acquire an exclusive lock. Returns True on successful
lock, or False on error."""
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
return False
else: # pragma: no cover
raise
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except ImportError: # pragma: no cover
def exclusive_lock(f):
"""Dummy lock function -- does not lock!"""
warnings.warn("Pretending to lock " + str(f))
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
return

View File

@@ -15,7 +15,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
def wrap_class_method(wrapper): def wrap_class_method(wrapper):
try: try:
orig = getattr(cls, wrapper.__name__).im_func orig = getattr(cls, wrapper.__name__).im_func
except: except Exception:
orig = lambda x: None orig = lambda x: None
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig)) setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))

View File

@@ -1,5 +1,8 @@
from __future__ import absolute_import
from nilmdb.utils import datetime_tz from nilmdb.utils import datetime_tz
import re import re
import time
# Range # Range
min_timestamp = (-2**63) min_timestamp = (-2**63)
@@ -36,6 +39,7 @@ def unix_to_timestamp(unix):
"""Convert a Unix timestamp (floating point seconds since epoch) """Convert a Unix timestamp (floating point seconds since epoch)
into a NILM timestamp (integer microseconds since epoch)""" into a NILM timestamp (integer microseconds since epoch)"""
return int(round(unix * 1e6)) return int(round(unix * 1e6))
seconds_to_timestamp = unix_to_timestamp
def timestamp_to_unix(timestamp): def timestamp_to_unix(timestamp):
"""Convert a NILM timestamp (integer microseconds since epoch) """Convert a NILM timestamp (integer microseconds since epoch)
@@ -56,6 +60,19 @@ def parse_time(toparse):
timestamp, the current local timezone is assumed (e.g. from the TZ timestamp, the current local timezone is assumed (e.g. from the TZ
env var). env var).
""" """
if toparse == "min":
return min_timestamp
if toparse == "max":
return max_timestamp
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError):
pass
# If string isn't "now" and doesn't contain at least 4 digits, # If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept # consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators. # empty strings and strings with just separators.
@@ -69,14 +86,6 @@ def parse_time(toparse):
except (ValueError, OverflowError): except (ValueError, OverflowError):
pass pass
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError):
pass
# If it's parseable as a float, treat it as a Unix or NILM # If it's parseable as a float, treat it as a Unix or NILM
# timestamp based on its range. # timestamp based on its range.
try: try:
@@ -118,4 +127,4 @@ def parse_time(toparse):
def now(): def now():
"""Return current timestamp""" """Return current timestamp"""
return unix_to_timestamp(datetime_tz.datetime_tz.utcnow().totimestamp()) return unix_to_timestamp(time.time())

View File

@@ -39,11 +39,10 @@ versioneer.parentdir_prefix = 'nilmdb-'
# Hack to workaround logging/multiprocessing issue: # Hack to workaround logging/multiprocessing issue:
# https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ # https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ
try: import multiprocessing try: import multiprocessing
except: pass except Exception: pass
# Use Cython if it's new enough, otherwise use preexisting C files. # Use Cython if it's new enough, otherwise use preexisting C files.
cython_modules = [ 'nilmdb.server.interval', cython_modules = [ 'nilmdb.server.interval',
'nilmdb.server.layout',
'nilmdb.server.rbtree' ] 'nilmdb.server.rbtree' ]
try: try:
import Cython import Cython

28
tests/data/extract-8 Normal file
View File

@@ -0,0 +1,28 @@
# interval-start 1332496919900000
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
# interval-end 1332496919991668
# interval-start 1332496920000000
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
# interval-end 1332496920100000

View File

@@ -24,7 +24,7 @@ class JimOrderPlugin(nose.plugins.Plugin):
name, workingDir=loader.workingDir) name, workingDir=loader.workingDir)
try: try:
order = os.path.join(addr.filename, "test.order") order = os.path.join(addr.filename, "test.order")
except: except Exception:
order = None order = None
if order and os.path.exists(order): if order and os.path.exists(order):
files = [] files = []

View File

@@ -4,10 +4,8 @@ test_lrucache.py
test_mustclose.py test_mustclose.py
test_serializer.py test_serializer.py
test_iteratorizer.py
test_timestamper.py test_timestamper.py
test_layout.py
test_rbtree.py test_rbtree.py
test_interval.py test_interval.py

View File

@@ -30,6 +30,11 @@ class TestBulkData(object):
else: else:
data = BulkData(db, file_size = size, files_per_dir = files) data = BulkData(db, file_size = size, files_per_dir = files)
# Try opening it again (should result in locking error)
with assert_raises(IOError) as e:
data2 = BulkData(db)
in_("already locked by another process", str(e.exception))
# create empty # create empty
with assert_raises(ValueError): with assert_raises(ValueError):
data.create("/foo", "uint16_8") data.create("/foo", "uint16_8")

View File

@@ -311,11 +311,11 @@ class TestClient(object):
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next() client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next() client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams # Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]: for function in [ client.stream_intervals, client.stream_extract ]:
@@ -375,6 +375,7 @@ class TestClient(object):
# Delete streams that exist # Delete streams that exist
for stream in client.stream_list(): for stream in client.stream_list():
client.stream_remove(stream[0])
client.stream_destroy(stream[0]) client.stream_destroy(stream[0])
# Database is empty # Database is empty
@@ -459,6 +460,7 @@ class TestClient(object):
ctx.update_start(109) ctx.update_start(109)
ctx.insert("110 1\n") ctx.insert("110 1\n")
ctx.insert("111 1\n") ctx.insert("111 1\n")
ctx.send()
ctx.insert("112 1\n") ctx.insert("112 1\n")
ctx.insert("113 1\n") ctx.insert("113 1\n")
ctx.insert("114 1\n") ctx.insert("114 1\n")
@@ -506,6 +508,10 @@ class TestClient(object):
[ 109, 118 ], [ 109, 118 ],
[ 200, 300 ] ]) [ 200, 300 ] ])
# destroy stream (try without removing data first)
with assert_raises(ClientError):
client.stream_destroy("/context/test")
client.stream_remove("/context/test")
client.stream_destroy("/context/test") client.stream_destroy("/context/test")
client.close() client.close()
@@ -600,6 +606,7 @@ class TestClient(object):
]) ])
# Clean up # Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test") client.stream_destroy("/empty/test")
client.close() client.close()
@@ -613,7 +620,7 @@ class TestClient(object):
poolmanager = c.http._last_response.connection.poolmanager poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',32180)] pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests) return (pool.num_connections, pool.num_requests)
except: except Exception:
raise SkipTest("can't get connection info") raise SkipTest("can't get connection info")
# First request makes a connection # First request makes a connection
@@ -635,8 +642,9 @@ class TestClient(object):
eq_(connections(), (1, 5)) eq_(connections(), (1, 5))
# Clean up # Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test") c.stream_destroy("/persist/test")
eq_(connections(), (1, 6)) eq_(connections(), (1, 7))
def test_client_13_timestamp_rounding(self): def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point # Test potentially bad timestamps (due to floating point
@@ -661,5 +669,6 @@ class TestClient(object):
# Server will round this and give an error on finalize() # Server will round this and give an error on finalize()
ctx.insert("299999999.99 1\n") ctx.insert("299999999.99 1\n")
client.stream_remove("/rounding/test")
client.stream_destroy("/rounding/test") client.stream_destroy("/rounding/test")
client.close() client.close()

View File

@@ -21,12 +21,13 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb" testdb = "tests/cmdline-testdb"
def server_start(max_results = None, bulkdata_args = {}): def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
global test_server, test_db global test_server, test_db
# Start web app on a custom port # Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb, testdb,
max_results = max_results, max_results = max_results,
max_removals = max_removals,
bulkdata_args = bulkdata_args) bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False, port = 32180, stoppable = False,
@@ -233,6 +234,8 @@ class TestCmdline(object):
eq_(parse_time("1333648800.0"), test) eq_(parse_time("1333648800.0"), test)
eq_(parse_time("1333648800000000"), test) eq_(parse_time("1333648800000000"), test)
eq_(parse_time("@1333648800000000"), test) eq_(parse_time("@1333648800000000"), test)
eq_(parse_time("min"), nilmdb.utils.time.min_timestamp)
eq_(parse_time("max"), nilmdb.utils.time.max_timestamp)
with assert_raises(ValueError): with assert_raises(ValueError):
parse_time("@hashtag12345") parse_time("@hashtag12345")
@@ -366,6 +369,8 @@ class TestCmdline(object):
self.contain("No stream at path") self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --set foo=bar") self.fail("metadata /newton/nosuchstream --set foo=bar")
self.contain("No stream at path") self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --delete")
self.contain("No stream at path")
self.ok("metadata /newton/prep") self.ok("metadata /newton/prep")
self.match("description=The Data\nv_scale=1.234\n") self.match("description=The Data\nv_scale=1.234\n")
@@ -391,6 +396,19 @@ class TestCmdline(object):
self.fail("metadata /newton/nosuchpath") self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath") self.contain("No stream at path /newton/nosuchpath")
self.ok("metadata /newton/prep --delete")
self.ok("metadata /newton/prep --get")
self.match("")
self.ok("metadata /newton/prep --set "
"'description=The Data' "
"v_scale=1.234")
self.ok("metadata /newton/prep --delete v_scale")
self.ok("metadata /newton/prep --get")
self.match("description=The Data\n")
self.ok("metadata /newton/prep --set description=")
self.ok("metadata /newton/prep --get")
self.match("")
def test_06_insert(self): def test_06_insert(self):
self.ok("insert --help") self.ok("insert --help")
@@ -593,6 +611,8 @@ class TestCmdline(object):
test(6, "10:00:30", "10:00:31", extra="-b") test(6, "10:00:30", "10:00:31", extra="-b")
test(7, "10:00:30", "10:00:30.999", extra="-a -T") test(7, "10:00:30", "10:00:30.999", extra="-a -T")
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw") test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests # all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -600,6 +620,11 @@ class TestCmdline(object):
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n") self.match("43200\n")
# markup for 3 intervals, plus extra markup lines whenever we had
# a "restart" from the nilmdb.stream_extract function
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 43210)
def test_09_truncated(self): def test_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results # Test truncated responses by overriding the nilmdb max_results
server_stop() server_stop()
@@ -699,11 +724,9 @@ class TestCmdline(object):
# Reinsert some data, to verify that no overlaps with deleted # Reinsert some data, to verify that no overlaps with deleted
# data are reported # data are reported
os.environ['TZ'] = "UTC" for minute in ["0", "2"]:
self.ok("insert --timestamp -f --rate 120 /newton/prep " self.ok("insert --timestamp -f --rate 120 /newton/prep"
"tests/data/prep-20120323T1000") " tests/data/prep-20120323T100" + minute)
self.ok("insert -t --filename --rate 120 /newton/prep "
"tests/data/prep-20120323T1002")
def test_11_destroy(self): def test_11_destroy(self):
# Delete records # Delete records
@@ -715,6 +738,9 @@ class TestCmdline(object):
self.fail("destroy /no/such/stream") self.fail("destroy /no/such/stream")
self.contain("No stream at path") self.contain("No stream at path")
self.fail("destroy -R /no/such/stream")
self.contain("No stream at path")
self.fail("destroy asdfasdf") self.fail("destroy asdfasdf")
self.contain("No stream at path") self.contain("No stream at path")
@@ -728,8 +754,14 @@ class TestCmdline(object):
self.ok("list --detail") self.ok("list --detail")
lines_(self.captured, 7) lines_(self.captured, 7)
# Delete some # Fail to destroy because intervals still present
self.ok("destroy /newton/prep") self.fail("destroy /newton/prep")
self.contain("all intervals must be removed")
self.ok("list --detail")
lines_(self.captured, 7)
# Destroy for real
self.ok("destroy -R /newton/prep")
self.ok("list") self.ok("list")
self.match("/newton/raw uint16_6\n" self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n") "/newton/zzz/rawnotch uint16_9\n")
@@ -740,7 +772,8 @@ class TestCmdline(object):
self.ok("destroy /newton/raw") self.ok("destroy /newton/raw")
self.ok("create /newton/raw uint16_6") self.ok("create /newton/raw uint16_6")
self.ok("destroy /newton/raw") # Specify --remove with no data
self.ok("destroy --remove /newton/raw")
self.ok("list") self.ok("list")
self.match("") self.match("")
@@ -815,7 +848,7 @@ class TestCmdline(object):
# Now recreate the data one more time and make sure there are # Now recreate the data one more time and make sure there are
# fewer files. # fewer files.
self.ok("destroy /newton/prep") self.ok("destroy --remove /newton/prep")
self.fail("destroy /newton/prep") # already destroyed self.fail("destroy /newton/prep") # already destroyed
self.ok("create /newton/prep float32_8") self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC" os.environ['TZ'] = "UTC"
@@ -826,14 +859,16 @@ class TestCmdline(object):
for (dirpath, dirnames, filenames) in os.walk(testdb): for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames) nfiles += len(filenames)
lt_(nfiles, 50) lt_(nfiles, 50)
self.ok("destroy /newton/prep") # destroy again self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self): def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into # Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of # multiple files. Should be a fairly comprehensive test of
# remove functionality. # remove functionality.
# Also limit max_removals, to cover more functionality.
server_stop() server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 }) "files_per_dir" : 3 })
# Insert data. Just for fun, insert out of order # Insert data. Just for fun, insert out of order
@@ -974,8 +1009,8 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -" self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n") "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy /diff/1") self.ok("destroy -R /diff/1")
self.ok("destroy /diff/2") self.ok("destroy -R /diff/2")
def test_16_rename(self): def test_16_rename(self):
# Test renaming. Force file size smaller so we get more files # Test renaming. Force file size smaller so we get more files
@@ -1018,10 +1053,12 @@ class TestCmdline(object):
self.contain("old and new paths are the same") self.contain("old and new paths are the same")
check_path("newton", "prep") check_path("newton", "prep")
self.fail("rename /newton/prep /newton") self.fail("rename /newton/prep /newton")
self.contain("subdirs of this path already exist") self.contain("path must contain at least one folder")
self.fail("rename /newton/prep /newton/prep/") self.fail("rename /newton/prep /newton/prep/")
self.contain("invalid path") self.contain("invalid path")
self.ok("rename /newton/prep /newton/foo") self.ok("rename /newton/prep /newton/foo/1")
check_path("newton", "foo", "1")
self.ok("rename /newton/foo/1 /newton/foo")
check_path("newton", "foo") check_path("newton", "foo")
self.ok("rename /newton/foo /totally/different/thing") self.ok("rename /newton/foo /totally/different/thing")
check_path("totally", "different", "thing") check_path("totally", "different", "thing")
@@ -1039,7 +1076,7 @@ class TestCmdline(object):
self.fail("rename /foo/bar /xxx/yyy/zzz/www") self.fail("rename /foo/bar /xxx/yyy/zzz/www")
self.contain("path is subdir of existing node") self.contain("path is subdir of existing node")
self.ok("rename /foo/bar /xxx/yyy/mmm") self.ok("rename /foo/bar /xxx/yyy/mmm")
self.ok("destroy /xxx/yyy/zzz") self.ok("destroy -R /xxx/yyy/zzz")
check_path("xxx", "yyy", "mmm") check_path("xxx", "yyy", "mmm")
# Extract it at the final path # Extract it at the final path
@@ -1047,7 +1084,7 @@ class TestCmdline(object):
"--end '2012-03-23 10:04:01'") "--end '2012-03-23 10:04:01'")
eq_(self.captured, extract_before) eq_(self.captured, extract_before)
self.ok("destroy /xxx/yyy/mmm") self.ok("destroy -R /xxx/yyy/mmm")
# Make sure temporary rename dirs weren't left around # Make sure temporary rename dirs weren't left around
for (dirpath, dirnames, filenames) in os.walk(testdb): for (dirpath, dirnames, filenames) in os.walk(testdb):

View File

@@ -8,8 +8,11 @@ from nose.tools import *
from nose.tools import assert_raises from nose.tools import assert_raises
import itertools import itertools
from nilmdb.server.interval import (Interval, DBInterval, from nilmdb.utils.interval import IntervalError
IntervalSet, IntervalError) from nilmdb.server.interval import Interval, DBInterval, IntervalSet
# so we can test them separately
from nilmdb.utils.interval import Interval as UtilsInterval
from testutil.helpers import * from testutil.helpers import *
import unittest import unittest
@@ -47,6 +50,15 @@ def makeset(string):
return iset return iset
class TestInterval: class TestInterval:
def test_client_interval(self):
# Run interval tests against the Python version of Interval.
global Interval
NilmdbInterval = Interval
Interval = UtilsInterval
self.test_interval()
self.test_interval_intersect()
Interval = NilmdbInterval
def test_interval(self): def test_interval(self):
# Test Interval class # Test Interval class
os.environ['TZ'] = "America/New_York" os.environ['TZ'] = "America/New_York"
@@ -222,7 +234,7 @@ class TestInterval:
eq_(ab,c) eq_(ab,c)
# a \ b == d # a \ b == d
eq_(IntervalSet(a.set_difference(b)), d) eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)
# Intersection with intervals # Intersection with intervals
do_test(makeset("[---|---)[)"), do_test(makeset("[---|---)[)"),
@@ -287,10 +299,11 @@ class TestInterval:
b = makeset("[-) [--) [)") b = makeset("[-) [--) [)")
c = makeset("[----) ") c = makeset("[----) ")
d = makeset(" [-) ") d = makeset(" [-) ")
eq_(a.set_difference(b, list(c)[0]), d) eq_(nilmdb.utils.interval.set_difference(
a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)
# Empty second set # Empty second set
eq_(a.set_difference(IntervalSet()), a) eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
class TestIntervalDB: class TestIntervalDB:
def test_dbinterval(self): def test_dbinterval(self):
@@ -372,14 +385,13 @@ class TestIntervalSpeed:
def test_interval_speed(self): def test_interval_speed(self):
import yappi import yappi
import time import time
import testutil.aplotter as aplotter
import random import random
import math import math
print print
yappi.start() yappi.start()
speeds = {} speeds = {}
limit = 10 # was 20 limit = 22 # was 20
for j in [ 2**x for x in range(5,limit) ]: for j in [ 2**x for x in range(5,limit) ]:
start = time.time() start = time.time()
iset = IntervalSet() iset = IntervalSet()
@@ -393,6 +405,5 @@ class TestIntervalSpeed:
speed/j, speed/j,
speed / (j*math.log(j))) # should be constant speed / (j*math.log(j))) # should be constant
speeds[j] = speed speeds[j] = speed
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
yappi.stop() yappi.stop()
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10) yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)

View File

@@ -1,61 +0,0 @@
import nilmdb
from nilmdb.utils.printf import *
import nose
from nose.tools import *
from nose.tools import assert_raises
import threading
import time
from testutil.helpers import *
def func_with_callback(a, b, callback):
callback(a)
callback(b)
callback(a+b)
return "return value"
class TestIteratorizer(object):
def test(self):
# First try it with a normal callback
self.result = ""
def cb(x):
self.result += str(x)
func_with_callback(1, 2, cb)
eq_(self.result, "123")
# Now make it an iterator
result = ""
f = lambda x: func_with_callback(1, 2, x)
with nilmdb.utils.Iteratorizer(f) as it:
for i in it:
result += str(i)
eq_(result, "123")
eq_(it.retval, "return value")
# Make sure things work when an exception occurs
result = ""
with nilmdb.utils.Iteratorizer(
lambda x: func_with_callback(1, "a", x)) as it:
with assert_raises(TypeError) as e:
for i in it:
result += str(i)
eq_(result, "1a")
# Now try to trigger the case where we stop iterating
# mid-generator, and expect the iteratorizer to clean up after
# itself. This doesn't have a particular result in the test,
# but gains coverage.
def foo():
with nilmdb.utils.Iteratorizer(f) as it:
it.next()
foo()
eq_(it.retval, None)
# Do the same thing when the curl hack is applied
def foo():
with nilmdb.utils.Iteratorizer(f, curl_hack = True) as it:
it.next()
foo()
eq_(it.retval, None)

View File

@@ -1,266 +0,0 @@
# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.utils.printf import *
from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import itertools
import os
import sys
import random
import unittest
from testutil.helpers import *
from nilmdb.server.layout import *
class TestLayouts(object):
# Some nilmdb.layout tests. Not complete, just fills in missing
# coverage.
def test_layouts(self):
x = nilmdb.server.layout.get_named("float32_8")
y = nilmdb.server.layout.get_named("float32_8")
eq_(x.count, y.count)
eq_(x.datatype, y.datatype)
y = nilmdb.server.layout.get_named("float32_7")
ne_(x.count, y.count)
eq_(x.datatype, y.datatype)
def test_parsing(self):
self.real_t_parsing("float32_8", "uint16_6", "uint16_9")
self.real_t_parsing("float32_8", "uint16_6", "uint16_9")
def real_t_parsing(self, name_prep, name_raw, name_rawnotch):
# invalid layouts
with assert_raises(TypeError) as e:
parser = Parser("NoSuchLayout")
with assert_raises(TypeError) as e:
parser = Parser("float32")
# too little data
parser = Parser(name_prep)
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5\n")
with assert_raises(ParserError) as e:
parser.parse(data)
in_("error", str(e.exception))
# too much data
parser = Parser(name_prep)
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n")
with assert_raises(ParserError) as e:
parser.parse(data)
in_("error", str(e.exception))
# just right
parser = Parser(name_prep)
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")
parser.parse(data)
eq_(parser.min_timestamp, 1234567890.0)
eq_(parser.max_timestamp, 1234567890.1)
eq_(parser.data, [[1234567890.0,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8],
[1234567890.1,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8]])
# try uint16_6 too, with clamping
parser = Parser(name_raw)
data = ( "1234567890.000000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
parser.parse(data)
eq_(parser.data, [[1234567890.0,1,2,3,4,5,6],
[1234567890.1,1,2,3,4,5,6]])
# pass an instantiated class
parser = Parser(get_named(name_rawnotch))
data = ( "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
"1234567890.100000 1 2 3 4 5 6 7 8 9\n" )
parser.parse(data)
# non-monotonic
parser = Parser(name_raw)
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.099999 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("not monotonically increasing", str(e.exception))
parser = Parser(name_raw)
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("not monotonically increasing", str(e.exception))
parser = Parser(name_raw)
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.100001 1 2 3 4 5 6\n" )
parser.parse(data)
# uint16_6 with values out of bounds
parser = Parser(name_raw)
data = ( "1234567890.000000 1 2 3 4 500000 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("value out of range", str(e.exception))
# Empty data should work but is useless
parser = Parser(name_raw)
data = ""
parser.parse(data)
assert(parser.min_timestamp is None)
assert(parser.max_timestamp is None)
def test_formatting(self):
self.real_t_formatting("float32_8", "uint16_6", "uint16_9")
self.real_t_formatting("float32_8", "uint16_6", "uint16_9")
def real_t_formatting(self, name_prep, name_raw, name_rawnotch):
# invalid layout
with assert_raises(TypeError) as e:
formatter = Formatter("NoSuchLayout")
# too little data
formatter = Formatter(name_prep)
data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5 ],
[ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5 ] ]
with assert_raises(FormatterError) as e:
formatter.format(data)
in_("error", str(e.exception))
# too much data
formatter = Formatter(name_prep)
data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
[ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
with assert_raises(FormatterError) as e:
formatter.format(data)
in_("error", str(e.exception))
# just right
formatter = Formatter(name_prep)
data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ],
[ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ] ]
text = formatter.format(data)
eq_(text,
"1234567890.000000 1.100000e+00 2.200000e+00 3.300000e+00 "
"4.400000e+00 5.500000e+00 6.600000e+00 7.700000e+00 "
"8.800000e+00\n" +
"1234567890.100000 1.100000e+00 2.200000e+00 3.300000e+00 "
"4.400000e+00 5.500000e+00 6.600000e+00 7.700000e+00 "
"8.800000e+00\n")
# try uint16_6 too
formatter = Formatter(name_raw)
data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6 ],
[ 1234567890.100000, 1, 2, 3, 4, 5, 6 ] ]
text = formatter.format(data)
eq_(text,
"1234567890.000000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n")
# pass an instantiated class
formatter = Formatter(get_named(name_rawnotch))
data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
[ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
text = formatter.format(data)
eq_(text,
"1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
"1234567890.100000 1 2 3 4 5 6 7 8 9\n")
# Empty data should work but is useless
formatter = Formatter(name_raw)
data = []
text = formatter.format(data)
eq_(text, "")
def test_roundtrip(self):
self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")
self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")
def real_t_roundtrip(self, name_prep, name_raw, name_rawnotch):
# Verify that textual data passed into the Parser, and then
# back through the Formatter, then back into the Parser,
# gives identical parsed representations
random.seed(12345)
def do_roundtrip(layout, datagen):
for i in range(100):
rows = random.randint(1,100)
data = ""
ts = 1234567890
for r in range(rows):
ts += random.uniform(0,1)
row = sprintf("%f", ts) + " "
row += " ".join(datagen())
row += "\n"
data += row
parser1 = Parser(layout)
formatter = Formatter(layout)
parser2 = Parser(layout)
parser1.parse(data)
parser2.parse(formatter.format(parser1.data))
eq_(parser1.data, parser2.data)
def datagen():
return [ sprintf("%.6e", random.uniform(-1000,1000))
for x in range(8) ]
do_roundtrip(name_prep, datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_roundtrip(name_raw, datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(9) ]
do_roundtrip(name_rawnotch, datagen)
class TestLayoutSpeed:
@unittest.skip("this is slow")
def test_layout_speed(self):
import time
random.seed(54321)
def do_speedtest(layout, datagen, rows = 5000, times = 100):
# Build data once
data = ""
ts = 1234567890
for r in range(rows):
ts += random.uniform(0,1)
row = sprintf("%f", ts) + " "
row += " ".join(datagen())
row += "\n"
data += row
# Do lots of roundtrips
start = time.time()
for i in range(times):
parser = Parser(layout)
formatter = Formatter(layout)
parser.parse(data)
formatter.format(parser.data)
elapsed = time.time() - start
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
layout,
elapsed * 1e3,
(elapsed * 1e6) / (rows * times),
(rows * times) / elapsed)
print ""
def datagen():
return [ sprintf("%.6e", random.uniform(-1000,1000))
for x in range(10) ]
do_speedtest("float32_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(10) ]
do_speedtest("uint16_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_speedtest("uint16_6", datagen)

View File

@@ -28,9 +28,6 @@ class Test00Nilmdb(object): # named 00 so it runs first
def test_NilmDB(self): def test_NilmDB(self):
recursive_unlink(testdb) recursive_unlink(testdb)
with assert_raises(IOError):
nilmdb.server.NilmDB("/nonexistant-db/foo")
db = nilmdb.server.NilmDB(testdb) db = nilmdb.server.NilmDB(testdb)
db.close() db.close()
db = nilmdb.server.NilmDB(testdb) db = nilmdb.server.NilmDB(testdb)

View File

@@ -18,7 +18,7 @@ class TestPrintf(object):
printf("hello, world: %d", 123) printf("hello, world: %d", 123)
fprintf(test2, "hello too: %d", 123) fprintf(test2, "hello too: %d", 123)
test3 = sprintf("hello three: %d", 123) test3 = sprintf("hello three: %d", 123)
except: except Exception:
sys.stdout = old_stdout sys.stdout = old_stdout
raise raise
sys.stdout = old_stdout sys.stdout = old_stdout

View File

@@ -1,419 +0,0 @@
#-----------------------------------------------
#aplotter.py - ascii art function plotter
#Copyright (c) 2006, Imri Goldberg
#All rights reserved.
#
#Redistribution and use in source and binary forms,
#with or without modification, are permitted provided
#that the following conditions are met:
#
# * Redistributions of source code must retain the
# above copyright notice, this list of conditions
# and the following disclaimer.
# * Redistributions in binary form must reproduce the
# above copyright notice, this list of conditions
# and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of
# its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
#THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
#LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
#SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
#CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#-----------------------------------------------
import math
EPSILON = 0.000001
def transposed(mat):
result = []
for i in xrange(len(mat[0])):
result.append([x[i] for x in mat])
return result
def y_reversed(mat):
result = []
for i in range(len(mat)):
result.append(list(reversed(mat[i])))
return result
def sign(x):
if 0<x:
return 1
if 0 == x:
return 0
return -1
class Plotter(object):
class PlotData(object):
def __init__(self, x_size, y_size, min_x, max_x, min_y, max_y, x_mod, y_mod):
self.x_size = x_size
self.y_size = y_size
self.min_x = min_x
self.max_x = max_x
self.min_y = min_y
self.max_y = max_y
self.x_mod = x_mod
self.y_mod = y_mod
self.x_step = float(max_x - min_x)/float(self.x_size)
self.y_step = float(max_y - min_y)/float(self.y_size)
self.inv_x_step = 1/self.x_step
self.inv_y_step = 1/self.y_step
self.ratio = self.y_step / self.x_step
def __repr__(self):
s = "size: %s, bl: %s, tr: %s, step: %s" % ((self.x_size, self.y_size), (self.min_x, self.min_y), (self.max_x, self.max_y),
(self.x_step, self.y_step))
return s
def __init__(self, **kwargs):
self.x_size = kwargs.get("x_size", 80)
self.y_size = kwargs.get("y_size", 20)
self.will_draw_axes = kwargs.get("draw_axes", True)
self.new_line = kwargs.get("newline", "\n")
self.dot = kwargs.get("dot", "*")
self.plot_slope = kwargs.get("plot_slope", True)
self.x_margin = kwargs.get("x_margin", 0.05)
self.y_margin = kwargs.get("y_margin", 0.1)
self.will_plot_labels = kwargs.get("plot_labels", True)
@staticmethod
def get_symbol_by_slope(slope, default_symbol):
draw_symbol = default_symbol
if slope > math.tan(3*math.pi/8):
draw_symbol = "|"
elif slope > math.tan(math.pi/8) and slope < math.tan(3*math.pi/8):
draw_symbol = "/"
elif abs(slope) < math.tan(math.pi/8):
draw_symbol = "-"
elif slope < math.tan(-math.pi/8) and slope > math.tan(-3*math.pi/8):
draw_symbol = "\\"
elif slope < math.tan(-3*math.pi/8):
draw_symbol = "|"
return draw_symbol
def plot_labels(self, output_buffer, plot_data):
if plot_data.y_size < 2:
return
margin_factor = 1
do_plot_x_label = True
do_plot_y_label = True
x_str = "%+g"
if plot_data.x_size < 16:
do_plot_x_label = False
elif plot_data.x_size < 23:
x_str = "%+.2g"
y_str = "%+g"
if plot_data.x_size < 8:
do_plot_y_label = False
elif plot_data.x_size < 11:
y_str = "%+.2g"
act_min_x = (plot_data.min_x + plot_data.x_mod*margin_factor)
act_max_x = (plot_data.max_x - plot_data.x_mod*margin_factor)
act_min_y = (plot_data.min_y + plot_data.y_mod*margin_factor)
act_max_y = (plot_data.max_y - plot_data.y_mod*margin_factor)
if abs(act_min_x) < 1:
min_x_str = "%+.2g" % act_min_x
else:
min_x_str = x_str % act_min_x
if abs(act_max_x) < 1:
max_x_str = "%+.2g" % act_max_x
else:
max_x_str = x_str % act_max_x
if abs(act_min_y) < 1:
min_y_str = "%+.2g" % act_min_y
else:
min_y_str = y_str % act_min_y
if abs(act_max_y) < 1:
max_y_str = "%+.2g" % act_max_y
else:
max_y_str = y_str % act_max_y
min_x_coord = self.get_coord(act_min_x,plot_data.min_x,plot_data.x_step)
max_x_coord = self.get_coord(act_max_x,plot_data.min_x,plot_data.x_step)
min_y_coord = self.get_coord(act_min_y,plot_data.min_y,plot_data.y_step)
max_y_coord = self.get_coord(act_max_y,plot_data.min_y,plot_data.y_step)
#print plot_data
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
#if plot_data.min_x < 0 and plot_data.max_x > 0:
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
#else:
#pass
output_buffer[x_zero_coord][min_y_coord] = "+"
output_buffer[x_zero_coord][max_y_coord] = "+"
output_buffer[min_x_coord][y_zero_coord] = "+"
output_buffer[max_x_coord][y_zero_coord] = "+"
if do_plot_x_label:
for i,c in enumerate(min_x_str):
output_buffer[min_x_coord+i][y_zero_coord-1] = c
for i,c in enumerate(max_x_str):
output_buffer[max_x_coord+i-len(max_x_str)][y_zero_coord-1] = c
if do_plot_y_label:
for i,c in enumerate(max_y_str):
output_buffer[x_zero_coord+i][max_y_coord] = c
for i,c in enumerate(min_y_str):
output_buffer[x_zero_coord+i][min_y_coord] = c
def plot_data(self, xy_seq, output_buffer, plot_data):
if self.plot_slope:
xy_seq = list(xy_seq)
#sort according to the x coord
xy_seq.sort(key = lambda c: c[0])
prev_p = xy_seq[0]
e_xy_seq = enumerate(xy_seq)
e_xy_seq.next()
for i,(x,y) in e_xy_seq:
draw_symbol = self.dot
line_drawn = self.plot_line(prev_p, (x,y), output_buffer, plot_data)
prev_p = (x,y)
if not line_drawn:
if i > 0 and i < len(xy_seq)-1:
px,py = xy_seq[i-1]
nx,ny = xy_seq[i+1]
if abs(nx-px) > EPSILON:
slope = (1.0/plot_data.ratio)*(ny-py)/(nx-px)
draw_symbol = self.get_symbol_by_slope(slope, draw_symbol)
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord >= 0 and y_coord < len(output_buffer[0]):
if self.draw_axes:
if y_coord == self.get_coord(0, plot_data.min_y, plot_data.y_step) and draw_symbol == "-":
draw_symbol = "="
output_buffer[x_coord][y_coord] = draw_symbol
else:
for x,y in xy_seq:
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord > 0 and y_coord < len(output_buffer[0]):
output_buffer[x_coord][y_coord] = self.dot
def plot_line(self, start, end, output_buffer, plot_data):
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
clipped_line = clip_line(start, end, (plot_data.min_x, plot_data.min_y), (plot_data.max_x, plot_data.max_y))
if clipped_line != None:
start,end = clipped_line
else:
return False
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
if start[0]-end[0] == 0:
draw_symbol = "|"
else:
slope = (1.0/plot_data.ratio)*(end[1]-start[1])/(end[0]-start[0])
draw_symbol = self.get_symbol_by_slope(slope, self.dot)
try:
delta = x1-x0, y1-y0
if abs(delta[0])>abs(delta[1]):
s = sign(delta[0])
slope = float(delta[1])/delta[0]
for i in range(0,abs(int(delta[0]))):
cur_draw_symbol = draw_symbol
x = i*s
cur_y = int(y0+slope*x)
if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[x0+x][cur_y] = cur_draw_symbol
else:
s = sign(delta[1])
slope = float(delta[0])/delta[1]
for i in range(0,abs(int(delta[1]))):
y = i*s
cur_draw_symbol = draw_symbol
cur_y = y0+y
if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[int(x0+slope*y)][cur_y] = cur_draw_symbol
except:
print start, end
print start_coord, end_coord
print plot_data
raise
return False
def plot_single(self, seq, min_x = None, max_x = None, min_y = None, max_y = None):
return self.plot_double(range(len(seq)),seq, min_x, max_x, min_y, max_y)
def plot_double(self, x_seq, y_seq, min_x = None, max_x = None, min_y = None, max_y = None):
if min_x == None:
min_x = min(x_seq)
if max_x == None:
max_x = max(x_seq)
if min_y == None:
min_y = min(y_seq)
if max_y == None:
max_y = max(y_seq)
if max_y == min_y:
max_y += 1
x_mod = (max_x-min_x)*self.x_margin
y_mod = (max_y-min_y)*self.y_margin
min_x-=x_mod
max_x+=x_mod
min_y-=y_mod
max_y+=y_mod
plot_data = self.PlotData(self.x_size, self.y_size, min_x, max_x, min_y, max_y, x_mod, y_mod)
output_buffer = [[" "]*self.y_size for i in range(self.x_size)]
if self.will_draw_axes:
self.draw_axes(output_buffer, plot_data)
self.plot_data(zip(x_seq, y_seq), output_buffer, plot_data)
if self.will_plot_labels:
self.plot_labels(output_buffer, plot_data)
trans_result = transposed(y_reversed(output_buffer))
result = self.new_line.join(["".join(row) for row in trans_result])
return result
def draw_axes(self, output_buffer, plot_data):
draw_x = False
draw_y = False
if plot_data.min_x <= 0 and plot_data.max_x > 0:
draw_y = True
zero_x = self.get_coord(0, plot_data.min_x, plot_data.x_step)
for y in xrange(plot_data.y_size):
output_buffer[zero_x][y] = "|"
if plot_data.min_y <= 0 and plot_data.max_y > 0:
draw_x = True
zero_y = self.get_coord(0, plot_data.min_y, plot_data.y_step)
for x in xrange(plot_data.x_size):
output_buffer[x][zero_y] = "-"
if draw_x and draw_y:
output_buffer[zero_x][zero_y] = "+"
@staticmethod
def get_coord(val, min, step):
result = int((val - min)/step)
return result
def clip_line(line_pt_1, line_pt_2, rect_bottom_left, rect_top_right):
ts = [0.0,1.0]
if line_pt_1[0] == line_pt_2[0]:
return ((line_pt_1[0], max(min(line_pt_1[1], line_pt_2[1]), rect_bottom_left[1])),
(line_pt_1[0], min(max(line_pt_1[1], line_pt_2[1]), rect_top_right[1])))
if line_pt_1[1] == line_pt_2[1]:
return ((max(min(line_pt_1[0], line_pt_2[0]), rect_bottom_left[0]), line_pt_1[1]),
(min(max(line_pt_1[0], line_pt_2[0]), rect_top_right[0]), line_pt_1[1]))
if ((rect_bottom_left[0] <= line_pt_1[0] and line_pt_1[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_1[1] and line_pt_1[1] < rect_top_right[1]) and
(rect_bottom_left[0] <= line_pt_2[0] and line_pt_2[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_2[1] and line_pt_2[1] < rect_top_right[1])):
return line_pt_1, line_pt_2
ts.append( float(rect_bottom_left[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_top_right[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_bottom_left[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.append( float(rect_top_right[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.sort()
if ts[2] < 0 or ts[2] >= 1 or ts[3] < 0 or ts[2]>= 1:
return None
result = [(pt_1 + t*(pt_2-pt_1)) for t in (ts[2],ts[3]) for (pt_1, pt_2) in zip(line_pt_1, line_pt_2)]
return (result[0],result[1]), (result[2], result[3])
def plot(*args,**flags):
limit_flags_names = set(["min_x","min_y","max_x","max_y"])
limit_flags = dict([(n,flags[n]) for n in limit_flags_names & set(flags)])
settting_flags = dict([(n,flags[n]) for n in set(flags) - limit_flags_names])
if len(args) == 1:
p = Plotter(**settting_flags)
print p.plot_single(args[0],**limit_flags)
elif len(args) == 2:
p = Plotter(**settting_flags)
print p.plot_double(args[0],args[1],**limit_flags)
else:
raise NotImplementedError("can't draw multiple graphs yet")
__all__ = ["Plotter","plot"]