Compare commits


32 Commits

SHA1 Message Date
b5fefffa09 Use a global cached server object for WSGI app
This is instead of caching it inside nilmdb.server.wsgi_application.
Might make things work a bit better in case the web server decides
to call wsgi_application multiple times.
2013-03-30 15:56:57 -04:00
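In miniature, the caching pattern this commit adopts (a sketch only, with a trivial stand-in for the real NilmDB/Server construction shown in the server.py diff further down):

    _server = None   # module-level cache, shared across calls

    def make_server(dbpath):
        # Hypothetical stand-in for the expensive NilmDB/Server setup.
        def app(environ, start_response):
            start_response("200 OK", [("Content-type", "text/plain")])
            return ["db at " + dbpath]
        return app

    def wsgi_application(dbpath):
        def application(environ, start_response):
            global _server
            if _server is None:
                _server = make_server(dbpath)   # built once, reused after
            return _server(environ, start_response)
        return application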
dccb3e370a WSGI config needs to specify application group
This ensures that the same Python sub-instance handles the request,
even if it's coming in from two different virtual hosts.
2013-03-30 15:56:02 -04:00
95ca55aa7e Print out WSGI environment on DB init failure 2013-03-30 15:55:41 -04:00
e01813f29d Fix wsgi documentation 2013-03-25 13:52:32 -04:00
7f41e117a2 Fix tabs 2013-03-25 13:44:03 -04:00
dd5fc806e5 Restructure WSGI app to regenerate error on each call, if needed
This way, errors like "database already locked" can be fixed and the
page reloaded, without needing to restart Apache.
2013-03-24 21:52:11 -04:00
f8ca8d31e6 Remove Iteratorizer, as it's no longer needed 2013-03-24 21:31:03 -04:00
ed89d803f0 Remove aplotter code 2013-03-24 21:29:09 -04:00
3d24092cd2 Replace bare 'except:' with 'except Exception:'
Otherwise we might inadvertently catch SystemExit or KeyboardInterrupt or
something we don't want to catch.
2013-03-24 21:28:01 -04:00
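A quick illustration of the distinction (plain Python, not from this repository): KeyboardInterrupt derives from BaseException, not Exception, so only the bare form swallows it.

    try:
        try:
            raise KeyboardInterrupt
        except:                 # bare: catches BaseException subclasses too
            print("swallowed Ctrl-C -- usually not what we want")
        try:
            raise KeyboardInterrupt
        except Exception:       # does not catch KeyboardInterrupt
            print("never reached")
    except KeyboardInterrupt:
        print("interrupt propagated, as desired")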
304bb43d85 Move lockfile out of data dir, to avoid stream tree conflicts 2013-03-24 21:23:45 -04:00
59a79a30a5 Remove lockfile when done.
This isn't necessary for correct behavior: if the database is killed,
the old flock() will go away when the file descriptor gets closed.
2013-03-24 21:20:47 -04:00
c0d450d39e Add locking mechanism to avoid multiple servers on one DB 2013-03-24 21:20:20 -04:00
6f14d609b2 Fix issue where bulkdata was accidentally closed 2013-03-24 21:16:18 -04:00
77ef87456f Improve WSGI application support, fix docs 2013-03-24 21:16:03 -04:00
32d6af935c Improve wsgi docs 2013-03-22 19:17:36 -04:00
6af3a6fc41 Add WSGI application support and documentation 2013-03-22 19:14:34 -04:00
f8a06fb3b7 Clarify default DB path in nilmdb_server.py help text 2013-03-22 15:09:37 -04:00
e790bb9e8a Fix test failure when tests are run as root 2013-03-21 14:33:02 -04:00
89be6f5931 Add option to include interval start/end markup on extract
When enabled, lines like "# interval-start 1234567890123456" and "#
interval-end 1234567890123456" will be added to the data output.  Note
that there may be an "interval-end" timestamp followed by an identical
"interval-start" timestamp, if the response at the nilmdb level was
split up into multiple chunks.

In general, assume contiguous data if previous_interval_end ==
new_interval_start.
2013-03-19 14:23:33 -04:00
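A sketch (not part of nilmdb) of how a consumer might apply that rule to the markup output, joining intervals that were split only by response chunking:

    def merged_intervals(lines):
        """Yield (start, end) pairs from markup comments, merging chunks
        where the previous interval-end equals the next interval-start."""
        start = end = None
        for line in lines:
            if line.startswith("# interval-start "):
                ts = int(line.split()[-1])
                if start is None:
                    start = ts
                elif ts != end:
                    yield (start, end)   # real gap: emit finished interval
                    start = ts
                # ts == end: contiguous, keep extending this interval
            elif line.startswith("# interval-end "):
                end = int(line.split()[-1])
        if start is not None:
            yield (start, end)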
4cdef3285d Destroy now requires that all data has been previously removed.
Added new flag "-R" to command line to perform an automatic removal.
This should be the last of the ways in which a single command could
block the nilmdb thread for a long time.
2013-03-18 19:39:03 -04:00
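Client-side, teardown therefore becomes two steps (a sketch assuming a server on the default port; Client, stream_remove, and stream_destroy all appear in the diffs below):

    import nilmdb.client

    client = nilmdb.client.Client("http://localhost:12380/")
    client.stream_remove("/newton/prep")    # remove all data first...
    client.stream_destroy("/newton/prep")   # ...else destroy raises ClientError
    client.close()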
bcd82c4d59 Limit the number of rows removed per call to nilmdb.stream_remove
Server class will retry as needed, as with stream_extract and
stream_intervals.
2013-03-18 18:22:45 -04:00
caf63ab01f Fix stream_extract/stream_intervals restart around timestamp == 0. 2013-03-18 18:20:25 -04:00
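This ties into the restart sentinel change from 0 to None visible in the diffs below: timestamp 0 is a valid restart position, so a truthiness test on it would end the retry loop too early.

    restart = 0              # legitimate restart position at timestamp zero
    print(bool(restart))     # False: an "if restart:" loop stops too early
    print(restart is None)   # False: the "is None" test correctly continues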
2d72891162 Accept "min" and "max" as timestamps on command line 2013-03-18 18:19:24 -04:00
cda2ac3e77 Don't return a mutable interval from IntervalSet.intersection()
Instead, always take the subset, which creates a new interval.
Also adds a small optimization by moving the 'if orig' check outside the
loop.
2013-03-18 18:16:35 -04:00
57d3d60f6a Fix relative import problems 2013-03-18 16:27:27 -04:00
d6b5befe76 Don't use filenames as default arg completion 2013-03-16 17:27:58 -04:00
7429c1788d Update nilmdb.utils.time 2013-03-15 22:49:59 -04:00
0ef71c193b Remove layout.pyx, since rocket replaced it 2013-03-15 22:32:40 -04:00
4a50dd015e Merge branch 'python-intervals' 2013-03-15 21:39:11 -04:00
22274550ab Test python version of Interval too 2013-03-15 21:37:03 -04:00
4f06d6ae68 Move Interval set_difference inside nilmdb.utils for clients
Clients might need to do Interval math too, so move a simple Interval
class and start putting helpers in there.
2013-03-15 21:37:03 -04:00
c54d8041c3 Update design docs 2013-03-15 21:07:01 -04:00
34 changed files with 544 additions and 1236 deletions

View File

@@ -24,3 +24,5 @@ Usage:
nilmdb-server --help
nilmtool --help
See docs/wsgi.md for info on setting up a WSGI application in Apache.

View File

@@ -186,6 +186,19 @@ IntervalSet speed
- rbtree and interval converted to cython:
8.4 μs, total 12 s, 134 MB RAM
- Would like to move Interval itself back to Python so other
non-cythonized code like client code can use it more easily.
Testing speed with just `test_interval` being tested, with
`range(5,22)`, using `/usr/bin/time -v python tests/runtests.py`,
times recorded for 2097152:
- 52ae397 (Interval in cython):
12.6133 μs each, ratio 0.866533, total 47 sec, 399 MB RAM
- 9759dcf (Interval in python):
21.2937 μs each, ratio 1.462870, total 83 sec, 1107 MB RAM
That's a huge difference! Instead, will keep Interval and DBInterval
cythonized inside nilmdb, and just have an additional copy in
nilmdb.utils for clients to use.
Layouts
-------
Current/old design has specific layouts: RawData, PrepData, RawNotchedData.

docs/wsgi.md Normal file
View File

@@ -0,0 +1,32 @@
WSGI Application in Apache
--------------------------

Install `apache2` and `libapache2-mod-wsgi`.

We'll set up the database server at URL `http://myhost.com/nilmdb`.
The database will be stored in `/home/nilm/db`, and the process will
run as user `nilm`, group `nilm`.

First, create a WSGI script `/home/nilm/nilmdb.wsgi` containing:

    import nilmdb.server
    application = nilmdb.server.wsgi_application("/home/nilm/db", "/nilmdb")

The first parameter is the local filesystem path, and the second
parameter is the path part of the URL.

Then, set up Apache with a configuration like:

    <VirtualHost>
        WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
        WSGIApplicationGroup nilmdb-appgroup
        WSGIProcessGroup nilmdb-procgroup
        WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm

        # Access control example:
        <Location /nilmdb>
            Order deny,allow
            Deny from all
            Allow from 1.2.3.4
        </Location>
    </VirtualHost>
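One possible smoke test, assuming the configuration above is live and the client package is installed, is a round-trip through the Python client (with `/nilmdb` matching the `WSGIScriptAlias` path):

    import nilmdb.client

    client = nilmdb.client.Client("http://myhost.com/nilmdb/")
    print(client.version())   # server version, if everything is wired up
    client.close()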

View File

@@ -17,4 +17,4 @@ _nilmtool_argcomplete() {
unset COMPREPLY
fi
}
complete -o nospace -o default -F _nilmtool_argcomplete nilmtool
complete -o nospace -F _nilmtool_argcomplete nilmtool

View File

@@ -97,7 +97,7 @@ class Client(object):
return self.http.post("stream/create", params)
def stream_destroy(self, path):
"""Delete stream and its contents"""
"""Delete stream. Fails if any data is still present."""
params = { "path": path }
return self.http.post("stream/destroy", params)
@@ -171,7 +171,8 @@ class Client(object):
params["end"] = timestamp_to_string(end)
return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, count = False):
def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
"""
Extract data from a stream. Returns a generator that yields
lines of ASCII-formatted data that matches the database
@@ -179,6 +180,9 @@ class Client(object):
Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged.
Specify markup = True to include comments in the returned data
that indicate interval starts and ends.
"""
params = {
"path": path,
@@ -189,6 +193,8 @@ class Client(object):
params["end"] = timestamp_to_string(end)
if count:
params["count"] = 1
if markup:
params["markup"] = 1
return self.http.get_gen("stream/extract", params)
def stream_count(self, path, start = None, end = None):

View File

@@ -16,7 +16,7 @@ class HTTPClient(object):
reparsed = urlparse.urlparse(baseurl).geturl()
if '://' not in reparsed:
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed
self.baseurl = reparsed.rstrip('/') + '/'
# Build Requests session object, enable SSL verification
self.session = requests.Session()
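A minimal illustration (not repository code) of what the normalization guarantees: any spelling of the base URL ends up with exactly one trailing slash, so later path joins behave predictably.

    for base in ["http://localhost:12380", "http://localhost:12380/",
                 "http://localhost:12380//"]:
        print(base.rstrip('/') + '/')   # always "http://localhost:12380/"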

View File

@@ -7,11 +7,14 @@ def setup(self, sub):
cmd = sub.add_parser("destroy", help="Delete a stream and all data",
formatter_class = def_form,
description="""
Destroy the stream at the specified path. All
data and metadata related to the stream is
permanently deleted.
Destroy the stream at the specified path.
The stream must be empty. All metadata
related to the stream is permanently deleted.
""")
cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream")
group = cmd.add_argument_group("Required arguments")
group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar",
@@ -20,6 +23,11 @@ def setup(self, sub):
def cmd_destroy(self):
"""Destroy stream"""
if self.args.remove:
try:
count = self.client.stream_remove(self.args.path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try:
self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e:

View File

@@ -29,6 +29,8 @@ def setup(self, sub):
group.add_argument("-a", "--annotate", action="store_true",
help="Include comments with some information "
"about the stream")
group.add_argument("-m", "--markup", action="store_true",
help="Include comments with interval starts and ends")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in annotated information")
group.add_argument("-c", "--count", action="store_true",
@@ -61,7 +63,8 @@ def cmd_extract(self):
for dataline in self.client.stream_extract(self.args.path,
self.args.start,
self.args.end,
self.args.count):
self.args.count,
self.args.markup):
if self.args.bare and not self.args.count:
# Strip timestamp (first element). Doesn't make sense
# if we are only returning a count.

View File

@@ -22,7 +22,7 @@ def main():
group.add_argument('-p', '--port', help = 'Listen on the given port',
type = int, default = 12380)
group.add_argument('-d', '--database', help = 'Database directory',
default = os.path.join(os.getcwd(), "db"))
default = "./db")
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-t', '--traceback',

View File

@@ -17,5 +17,5 @@ except (ImportError, TypeError): # pragma: no cover
pass
from nilmdb.server.nilmdb import NilmDB
from nilmdb.server.server import Server
from nilmdb.server.server import Server, wsgi_application
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

View File

@@ -14,6 +14,7 @@ import re
import sys
import tempfile
import nilmdb.utils.lock
from . import rocket
# Up to 256 open file descriptors at any given time.
@@ -26,6 +27,8 @@ class BulkData(object):
def __init__(self, basepath, **kwargs):
self.basepath = basepath
self.root = os.path.join(self.basepath, "data")
self.lock = self.root + ".lock"
self.lockfile = None
# Tuneables
if "file_size" in kwargs:
@@ -44,8 +47,22 @@ class BulkData(object):
if not os.path.isdir(self.root):
os.mkdir(self.root)
# Create the lock
self.lockfile = open(self.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(self.lockfile):
raise IOError('database at "' + self.basepath +
'" is already locked by another process')
def close(self):
self.getnode.cache_remove_all()
if self.lockfile:
nilmdb.utils.lock.exclusive_unlock(self.lockfile)
self.lockfile.close()
try:
os.unlink(self.lock)
except OSError: # pragma: no cover
pass
self.lockfile = None
def _encode_filename(self, path):
# Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
@@ -134,7 +151,7 @@ class BulkData(object):
# Open and cache it
self.getnode(unicodepath)
except:
except Exception:
exc_info = sys.exc_info()
try:
os.rmdir(ospath)
@@ -371,7 +388,7 @@ class Table(object):
# Try deleting subdir, too
try:
os.rmdir(os.path.join(self.root, subdir))
except:
except Exception:
pass
# Cache open files
@@ -504,7 +521,7 @@ class Table(object):
with open(cachefile, "rb") as f:
ranges = pickle.load(f)
cachefile_present = True
except:
except Exception:
ranges = []
cachefile_present = False

View File

@@ -1,5 +1,9 @@
"""Interval, IntervalSet
The Interval implemented here is just like
nilmdb.utils.interval.Interval, except implemented in Cython for
speed.
Represents an interval of time, and a set of such intervals.
Intervals are half-open, ie. they include data points with timestamps
@@ -23,6 +27,7 @@ from ..utils.time import min_timestamp as nilmdb_min_timestamp
from ..utils.time import max_timestamp as nilmdb_max_timestamp
from ..utils.time import timestamp_to_string
from ..utils.iterator import imerge
from ..utils.interval import IntervalError
import itertools
cimport rbtree
@@ -30,10 +35,6 @@ from libc.stdint cimport uint64_t, int64_t
ctypedef int64_t timestamp_t
class IntervalError(Exception):
"""Error due to interval overlap, etc"""
pass
cdef class Interval:
"""Represents an interval of time."""
@@ -59,17 +60,7 @@ cdef class Interval:
def __cmp__(self, Interval other):
"""Compare two intervals. If non-equal, order by start then end"""
if not isinstance(other, Interval):
raise TypeError("bad type")
if self.start == other.start:
if self.end < other.end:
return -1
if self.end > other.end:
return 1
return 0
if self.start < other.start:
return -1
return 1
return cmp(self.start, other.start) or cmp(self.end, other.end)
cpdef intersects(self, Interval other):
"""Return True if two Interval objects intersect"""
@@ -295,80 +286,18 @@ cdef class IntervalSet:
(potentially) subsetted to make the one that is being
returned.
"""
if not isinstance(interval, Interval):
raise TypeError("bad type")
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
if i:
if i.start >= interval.start and i.end <= interval.end:
if orig:
yield (i, i)
else:
yield i
else:
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
if orig:
yield (subset, i)
else:
yield subset
def set_difference(self, IntervalSet other not None,
Interval bounds = None):
"""
Compute the difference (self \\ other) between this
IntervalSet and the given IntervalSet; i.e., the ranges
that are present in 'self' but not 'other'.
If 'bounds' is not None, results are limited to the range
specified by the interval 'bounds'.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (self).
"""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
def decorate(it, key_start, key_end):
for i in it:
yield i.start, key_start, i
yield i.end, key_end, i
if bounds is None:
bounds = Interval(nilmdb_min_timestamp,
nilmdb_max_timestamp)
self_iter = decorate(self.intersection(bounds), 0, 2)
other_iter = decorate(other.intersection(bounds), 1, 3)
# Now iterate over the timestamps of each start and end.
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
self_interval = None
other_interval = None
out_start = None
for (ts, k, i) in imerge(self_iter, other_iter):
if k == 0:
# start self interval
self_interval = i
if other_interval is None:
out_start = ts
elif k == 1:
# start other interval
other_interval = i
if out_start is not None and out_start != ts:
yield self_interval.subset(out_start, ts)
out_start = None
elif k == 2:
# end self interval
if out_start is not None and out_start != ts:
yield self_interval.subset(out_start, ts)
out_start = None
self_interval = None
elif k == 3:
# end other interval
other_interval = None
if self_interval:
out_start = ts
if orig:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield (subset, i)
else:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield subset
cpdef intersects(self, Interval other):
"""Return True if this IntervalSet intersects another interval"""

View File

@@ -1,204 +0,0 @@
# cython: profile=False
import time
import sys
import inspect
import cStringIO
from ..utils.time import min_timestamp as nilmdb_min_timestamp
cdef enum:
max_value_count = 64
cimport cython
cimport libc.stdlib
cimport libc.stdio
cimport libc.string
class ParserError(Exception):
def __init__(self, line, message):
self.message = "line " + str(line) + ": " + message
Exception.__init__(self, self.message)
class FormatterError(Exception):
pass
class Layout:
"""Represents a NILM database layout"""
def __init__(self, typestring):
"""Initialize this Layout object to handle the specified
type string"""
try:
[ datatype, count ] = typestring.split("_")
except:
raise KeyError("invalid layout string")
try:
self.count = int(count)
except ValueError:
raise KeyError("invalid count")
if self.count < 1 or self.count > max_value_count:
raise KeyError("invalid count")
if datatype == 'uint16':
self.parse = self.parse_uint16
self.format_str = "%.6f" + " %d" * self.count
self.format = self.format_generic
elif datatype == 'float32':
self.parse = self.parse_float64
self.format_str = "%.6f" + " %.6e" * self.count
self.format = self.format_generic
elif datatype == 'float64':
self.parse = self.parse_float64
self.format_str = "%.6f" + " %.16e" * self.count
self.format = self.format_generic
else:
raise KeyError("invalid type")
self.datatype = datatype
# Parsers
def parse_float64(self, char *text):
cdef int n
cdef double ts
# Return doubles even in float32 case, since they're going into
# a Python array which would upconvert to double anyway.
result = [0] * (self.count + 1)
cdef char *end
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result[0] = ts
for n in range(self.count):
text = end
result[n+1] = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("wrong number of values")
n = 0
while end[n] == ' ':
n += 1
if end[n] != '\n' and end[n] != '#' and end[n] != '\0':
raise ValueError("extra data on line")
return (ts, result)
def parse_uint16(self, char *text):
cdef int n
cdef double ts
cdef int v
cdef char *end
result = [0] * (self.count + 1)
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result[0] = ts
for n in range(self.count):
text = end
v = libc.stdlib.strtol(text, &end, 10)
if v < 0 or v > 65535:
raise ValueError("value out of range")
result[n+1] = v
if end == text:
raise ValueError("wrong number of values")
n = 0
while end[n] == ' ':
n += 1
if end[n] != '\n' and end[n] != '#' and end[n] != '\0':
raise ValueError("extra data on line")
return (ts, result)
# Formatters
def format_generic(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
return (self.format_str % tuple(d)) + "\n"
# Get a layout by name
def get_named(typestring):
try:
return Layout(typestring)
except KeyError:
compat = { "PrepData": "float32_8",
"RawData": "uint16_6",
"RawNotchedData": "uint16_9" }
return Layout(compat[typestring])
class Parser(object):
"""Object that parses and stores ASCII data for inclusion into the
database"""
def __init__(self, layout):
if issubclass(layout.__class__, Layout):
self.layout = layout
else:
try:
self.layout = get_named(layout)
except KeyError:
raise TypeError("unknown layout")
self.data = []
self.min_timestamp = None
self.max_timestamp = None
def parse(self, textdata):
"""
Parse the data, provided as lines of text, using the current
layout, into an internal data structure suitable for a
pytables 'table.append(parser.data)'.
"""
cdef double last_ts = nilmdb_min_timestamp
cdef double ts
cdef int n = 0, i
cdef char *line
indata = cStringIO.StringIO(textdata)
# Assume any parsing error is a real error.
# In the future we might want to skip completely empty lines,
# or partial lines right before EOF?
try:
self.data = []
for pyline in indata:
line = pyline
n += 1
if line[0] == '\#':
continue
(ts, row) = self.layout.parse(line)
if ts <= last_ts:
raise ValueError("timestamp is not "
"monotonically increasing")
last_ts = ts
self.data.append(row)
except (ValueError, IndexError, TypeError) as e:
raise ParserError(n, "error: " + e.message)
# Mark timestamp ranges
if len(self.data):
self.min_timestamp = self.data[0][0]
self.max_timestamp = self.data[-1][0]
class Formatter(object):
"""Object that formats database data into ASCII"""
def __init__(self, layout):
if issubclass(layout.__class__, Layout):
self.layout = layout
else:
try:
self.layout = get_named(layout)
except KeyError:
raise TypeError("unknown layout")
def format(self, data):
"""
Format raw data from the database, using the current layout,
as lines of ASCII text.
"""
text = cStringIO.StringIO()
try:
for row in data:
text.write(self.layout.format(row))
except (ValueError, IndexError, TypeError) as e:
raise FormatterError("formatting error: " + e.message)
return text.getvalue()

View File

@@ -12,8 +12,11 @@ Manages both the SQL database and the table storage backend.
from __future__ import absolute_import
import nilmdb.utils
from nilmdb.utils.printf import *
from nilmdb.server.interval import (Interval, DBInterval,
IntervalSet, IntervalError)
from nilmdb.utils.time import timestamp_to_string
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
@@ -81,7 +84,18 @@ class NilmDB(object):
verbose = 0
def __init__(self, basepath, max_results=None,
bulkdata_args=None):
max_removals=None, bulkdata_args=None):
"""Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing:
'max_results' is the max rows to send in a single
stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once
in stream_move.
'bulkdata_args' is kwargs for the bulkdata module.
"""
if bulkdata_args is None:
bulkdata_args = {}
@@ -92,7 +106,9 @@ class NilmDB(object):
try:
os.makedirs(self.basepath)
except OSError as e:
if e.errno != errno.EEXIST:
if e.errno != errno.EEXIST: # pragma: no cover
# (no coverage, because it's hard to trigger this case
# if tests are run as root)
raise IOError("can't create tree " + self.basepath)
# Our data goes inside it
@@ -103,19 +119,20 @@ class NilmDB(object):
self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
try:
self._sql_schema_update()
finally: # pragma: no cover
except Exception: # pragma: no cover
self.data.close()
raise
# See big comment at top about the performance implications of this
self.con.execute("PRAGMA synchronous=NORMAL")
self.con.execute("PRAGMA journal_mode=WAL")
# Approximate largest number of elements that we want to send
# in a single reply (for stream_intervals, stream_extract)
if max_results:
self.max_results = max_results
else:
self.max_results = 16384
# in a single reply (for stream_intervals, stream_extract).
self.max_results = max_results or 16384
# Remove up to this many rows per call to stream_remove.
self.max_removals = max_removals or 1048576
def get_basepath(self):
return self.basepath
@@ -328,18 +345,18 @@ class NilmDB(object):
List all intervals in 'path' between 'start' and 'end'. If
'diffpath' is not none, list instead the set-difference
between the intervals in the two streams; i.e. all interval
ranges that are present in 'path' but not 'path2'.
ranges that are present in 'path' but not 'diffpath'.
Returns (intervals, restart) tuple.
intervals is a list of [start,end] timestamps of all intervals
'intervals' is a list of [start,end] timestamps of all intervals
that exist for path, between start and end.
restart, if nonzero, means that there were too many results to
return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
the next block of data.
'restart', if not None, means that there were too many results
to return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, and
a new request with a start time of 'restart' will fetch the
next block of data.
"""
stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id)
@@ -350,7 +367,9 @@ class NilmDB(object):
requested = Interval(start, end)
result = []
if diffpath:
getter = intervals.set_difference(diffintervals, requested)
getter = nilmdb.utils.interval.set_difference(
intervals.intersection(requested),
diffintervals.intersection(requested))
else:
getter = intervals.intersection(requested)
for n, i in enumerate(getter):
@@ -359,7 +378,7 @@ class NilmDB(object):
break
result.append([i.start, i.end])
else:
restart = 0
restart = None
return (result, restart)
def stream_create(self, path, layout_name):
@@ -435,17 +454,22 @@ class NilmDB(object):
(newpath, stream_id))
def stream_destroy(self, path):
"""Fully remove a table and all of its data from the database.
No way to undo it! Metadata is removed."""
"""Fully remove a table from the database. Fails if there are
any intervals data present; remove them first. Metadata is
also removed."""
stream_id = self._stream_id(path)
# Delete the cached interval data (if it was cached)
# Verify that no intervals are present, and clear the cache
iset = self._get_intervals(stream_id)
if len(iset):
raise NilmDBError("all intervals must be removed before "
"destroying a stream")
self._get_intervals.cache_remove(self, stream_id)
# Delete the data
# Delete the bulkdata storage
self.data.destroy(path)
# Delete metadata, stream, intervals
# Delete metadata, stream, intervals (should be none)
with self.con as con:
con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
@@ -513,23 +537,28 @@ class NilmDB(object):
dbinterval.db_startpos,
dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, count = False):
def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
"""
Returns (data, restart) tuple.
data is ASCII-formatted data from the database, formatted
'data' is ASCII-formatted data from the database, formatted
according to the layout of the stream.
restart, if nonzero, means that there were too many results to
'restart', if not None, means that there were too many results to
return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
the next block of data.
count, if true, means to not return raw data, but just the count
'count', if true, means to not return raw data, but just the count
of rows that would have been returned. This is much faster
than actually fetching the data. It is not limited by
max_results.
'markup', if true, indicates that returned data should be
marked with a comment denoting when a particular interval
starts, and another comment when an interval ends.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
@@ -539,7 +568,7 @@ class NilmDB(object):
result = []
matched = 0
remaining = self.max_results
restart = 0
restart = None
for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and
@@ -558,14 +587,26 @@ class NilmDB(object):
row_end = row_max
restart = table[row_max]
# Add markup
if markup:
result.append("# interval-start " +
timestamp_to_string(interval.start) + "\n")
# Gather these results up
result.append(table.get_data(row_start, row_end))
# Count them
remaining -= row_end - row_start
if restart:
# Add markup, and exit if restart is set.
if restart is not None:
if markup:
result.append("# interval-end " +
timestamp_to_string(restart) + "\n")
break
if markup:
result.append("# interval-end " +
timestamp_to_string(interval.end) + "\n")
if count:
return matched
@@ -574,9 +615,17 @@ class NilmDB(object):
def stream_remove(self, path, start = None, end = None):
"""
Remove data from the specified time interval within a stream.
Removes all data in the interval [start, end), and intervals
are truncated or split appropriately. Returns the number of
data points removed.
Removes data in the interval [start, end), and intervals are
truncated or split appropriately.
Returns a (removed, restart) tuple.
'removed' is the number of data points that were removed.
'restart', if not None, means there were too many rows to
remove in a single request. This function should be called
again with a start time of 'restart' to complete the removal.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
@@ -584,6 +633,8 @@ class NilmDB(object):
(start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end)
removed = 0
remaining = self.max_removals
restart = None
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
@@ -594,6 +645,13 @@ class NilmDB(object):
row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint)
# Shorten it if we'll hit the maximum number of removals
row_max = row_start + remaining
if row_max < row_end:
row_end = row_max
dbint.end = table[row_max]
restart = dbint.end
# Adjust the DBInterval to match the newly found ends
dbint.db_start = dbint.start
dbint.db_end = dbint.end
@@ -609,4 +667,7 @@ class NilmDB(object):
# Count how many were removed
removed += row_end - row_start
return removed
if restart is not None:
break
return (removed, restart)

View File

@@ -11,9 +11,11 @@ from nilmdb.utils.time import string_to_timestamp
import cherrypy
import sys
import os
import socket
import simplejson as json
import decorator
import psutil
import traceback
class NilmApp(object):
def __init__(self, db):
@@ -210,7 +212,7 @@ class Stream(NilmApp):
@exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path):
"""Delete a stream and its associated data."""
"""Delete a stream. Fails if any data is still present."""
return self.db.stream_destroy(path)
# /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
@@ -339,7 +341,14 @@ class Stream(NilmApp):
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
return self.db.stream_remove(path, start, end)
total_removed = 0
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
total_removed += removed
if restart is None:
break
start = restart
return total_removed
# /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -386,7 +395,7 @@ class Stream(NilmApp):
diffpath)
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
yield response
if restart == 0:
if restart is None:
break
start = restart
return content(start, end)
@@ -395,14 +404,18 @@ class Stream(NilmApp):
@cherrypy.expose
@chunked_response
@response_type("text/plain")
def extract(self, path, start = None, end = None, count = False):
def extract(self, path, start = None, end = None,
count = False, markup = False):
"""
Extract data from backend database. Streams the resulting
entries as ASCII text lines separated by newlines. This may
make multiple requests to the nilmdb backend to avoid causing
it to block for too long.
Add count=True to return a count rather than actual data.
If 'count' is True, returns a count rather than actual data.
If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp.
"""
if start is not None:
start = string_to_timestamp(start)
@@ -421,21 +434,23 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("404 Not Found", "No such stream")
@workaround_cp_bug_1200
def content(start, end, count):
def content(start, end):
# Note: disable chunked responses to see tracebacks from here.
if count:
matched = self.db.stream_extract(path, start, end, count)
matched = self.db.stream_extract(path, start, end,
count = True)
yield sprintf("%d\n", matched)
return
while True:
(data, restart) = self.db.stream_extract(path, start, end)
(data, restart) = self.db.stream_extract(
path, start, end, count = False, markup = markup)
yield data
if restart == 0:
if restart is None:
return
start = restart
return content(start, end, count)
return content(start, end)
class Exiter(object):
"""App that exits the server, for testing"""
@@ -453,7 +468,8 @@ class Server(object):
stoppable = False, # whether /exit URL exists
embedded = True, # hide diagnostics and output, etc
fast_shutdown = False, # don't wait for clients to disconn.
force_traceback = False # include traceback in all errors
force_traceback = False, # include traceback in all errors
basepath = '', # base URL path for cherrypy.tree
):
# Save server version, just for verification during tests
self.version = nilmdb.__version__
@@ -513,7 +529,7 @@ class Server(object):
if stoppable:
root.exit = Exiter()
cherrypy.tree.apps = {}
cherrypy.tree.mount(root, "/", config = { "/" : app_config })
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
# Shutdowns normally wait for clients to disconnect. To speed
# up tests, set fast_shutdown = True
@@ -523,6 +539,9 @@ class Server(object):
else:
cherrypy.server.shutdown_timeout = 5
# Set up the WSGI application pointer for external programs
self.wsgi_application = cherrypy.tree
def json_error_page(self, status, message, traceback, version):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
@@ -589,3 +608,55 @@ class Server(object):
def stop(self):
cherrypy.engine.exit()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to
# work, the web server must use only a single process and single
# Python interpreter. Multiple threads are OK.
_wsgi_server = None
def wsgi_application(dbpath, basepath): # pragma: no cover
"""Return a WSGI application object with a database at the
specified path.
'dbpath' is a filesystem location, e.g. /home/nilm/db
'basepath' is the URL path of the application base, which
is the same as the first argument to Apache's WSGIScriptAlias
directive.
"""
def application(environ, start_response):
global _wsgi_server
if _wsgi_server is None:
# Try to start the server
try:
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
_wsgi_server = nilmdb.server.Server(
db, embedded = True,
basepath = basepath.rstrip('/'))
except Exception:
# Build an error message on failure
import pprint
err = sprintf("Initializing database at path '%s' failed:\n\n",
dbpath)
err += traceback.format_exc()
try:
import pwd
import grp
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
"on host %s, pid %d\n",
os.getuid(), pwd.getpwuid(os.getuid())[0],
os.getgid(), grp.getgrgid(os.getgid())[0],
socket.gethostname(), os.getpid())
except ImportError:
pass
err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
if _wsgi_server is None:
# Serve up the error with our own mini WSGI app.
headers = [ ('Content-type', 'text/plain'),
('Content-length', str(len(err))) ]
start_response("500 Internal Server Error", headers)
return [err]
# Call the normal application
return _wsgi_server.wsgi_application(environ, start_response)
return application

View File

@@ -1,7 +1,7 @@
"""NilmDB utilities"""
from __future__ import absolute_import
from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import serializer_proxy
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du, human_size
@@ -11,3 +11,5 @@ import nilmdb.utils.threadsafety
import nilmdb.utils.fallocate
import nilmdb.utils.time
import nilmdb.utils.iterator
import nilmdb.utils.interval
import nilmdb.utils.lock

nilmdb/utils/interval.py Normal file
View File

@@ -0,0 +1,106 @@
"""Interval. Like nilmdb.server.interval, but re-implemented here
in plain Python so clients have easier access to it.
Intervals are half-open, ie. they include data points with timestamps
[start, end)
"""
import nilmdb.utils.time
import nilmdb.utils.iterator
class IntervalError(Exception):
"""Error due to interval overlap, etc"""
pass
# Interval
class Interval:
"""Represents an interval of time."""
def __init__(self, start, end):
"""
'start' and 'end' are arbitrary numbers that represent time
"""
if start >= end:
# Explicitly disallow zero-width intervals (since they're half-open)
raise IntervalError("start %s must precede end %s" % (start, end))
self.start = start
self.end = end
def __repr__(self):
s = repr(self.start) + ", " + repr(self.end)
return self.__class__.__name__ + "(" + s + ")"
def __str__(self):
return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
" -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
def __cmp__(self, other):
"""Compare two intervals. If non-equal, order by start then end"""
return cmp(self.start, other.start) or cmp(self.end, other.end)
def intersects(self, other):
"""Return True if two Interval objects intersect"""
if not isinstance(other, Interval):
raise TypeError("need an Interval")
if self.end <= other.start or self.start >= other.end:
return False
return True
def subset(self, start, end):
"""Return a new Interval that is a subset of this one"""
# A subclass that tracks additional data might override this.
if start < self.start or end > self.end:
raise IntervalError("not a subset")
return Interval(start, end)
def set_difference(a, b):
"""
Compute the difference (a \\ b) between the intervals in 'a' and
the intervals in 'b'; i.e., the ranges that are present in 'a'
but not 'b'.
'a' and 'b' must both be iterables.
Returns a generator that yields each interval in turn.
Output intervals are built as subsets of the intervals in the
first argument (a).
"""
# Iterate through all starts and ends in sorted order. Add a
# tag to the iterator so that we can figure out which one they
# were, after sorting.
def decorate(it, key_start, key_end):
for i in it:
yield i.start, key_start, i
yield i.end, key_end, i
a_iter = decorate(iter(a), 0, 2)
b_iter = decorate(iter(b), 1, 3)
# Now iterate over the timestamps of each start and end.
# At each point, evaluate which type of end it is, to determine
# how to build up the output intervals.
a_interval = None
b_interval = None
out_start = None
for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
if k == 0:
# start a interval
a_interval = i
if b_interval is None:
out_start = ts
elif k == 1:
# start b interval
b_interval = i
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
elif k == 2:
# end a interval
if out_start is not None and out_start != ts:
yield a_interval.subset(out_start, ts)
out_start = None
a_interval = None
elif k == 3:
# end b interval
b_interval = None
if a_interval:
out_start = ts
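Example usage of the new module (assuming nilmdb is importable): subtracting [3, 5) from [0, 10) yields the two surrounding pieces, each built as a subset of the first argument's interval.

    from nilmdb.utils.interval import Interval, set_difference

    a = [Interval(0, 10)]
    b = [Interval(3, 5)]
    print([(i.start, i.end) for i in set_difference(a, b)])
    # -> [(0, 3), (5, 10)]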

View File

@@ -1,100 +0,0 @@
import Queue
import threading
import sys
import contextlib
# This file provides a context manager that converts a function
# that takes a callback into a generator that returns an iterable.
# This is done by running the function in a new thread.
# Based partially on http://stackoverflow.com/questions/9968592/
class IteratorizerThread(threading.Thread):
def __init__(self, queue, function, curl_hack):
"""
function: function to execute, which takes the
callback (provided by this class) as an argument
"""
threading.Thread.__init__(self)
self.name = "Iteratorizer-" + function.__name__ + "-" + self.name
self.function = function
self.queue = queue
self.die = False
self.curl_hack = curl_hack
def callback(self, data):
try:
if self.die:
raise Exception() # trigger termination
self.queue.put((1, data))
except:
if self.curl_hack:
# We can't raise exceptions, because the pycurl
# extension module will unconditionally print the
# exception itself, and not pass it up to the caller.
# Instead, just return a value that tells curl to
# abort. (-1 would be best, in case we were given 0
# bytes, but the extension doesn't support that).
self.queue.put((2, sys.exc_info()))
return 0
raise
def run(self):
try:
result = self.function(self.callback)
except:
self.queue.put((2, sys.exc_info()))
else:
self.queue.put((0, result))
@contextlib.contextmanager
def Iteratorizer(function, curl_hack = False):
"""
Context manager that takes a function expecting a callback,
and provides an iterable that yields the values passed to that
callback instead.
function: function to execute, which takes a callback
(provided by this context manager) as an argument
with iteratorizer(func) as it:
for i in it:
print 'callback was passed:', i
print 'function returned:', it.retval
"""
queue = Queue.Queue(maxsize = 1)
thread = IteratorizerThread(queue, function, curl_hack)
thread.daemon = True
thread.start()
class iteratorizer_gen(object):
def __init__(self, queue):
self.queue = queue
self.retval = None
def __iter__(self):
return self
def next(self):
(typ, data) = self.queue.get()
if typ == 0:
# function has returned
self.retval = data
raise StopIteration
elif typ == 1:
# data is available
return data
else:
# callback raised an exception
raise data[0], data[1], data[2]
try:
yield iteratorizer_gen(queue)
finally:
# Ask the thread to die, if it's still running.
thread.die = True
while thread.isAlive():
try:
queue.get(True, 0.01)
except: # pragma: no cover
pass

nilmdb/utils/lock.py Normal file
View File

@@ -0,0 +1,33 @@
# File locking
import warnings
try:
import fcntl
import errno
def exclusive_lock(f):
"""Acquire an exclusive lock. Returns True on successful
lock, or False on error."""
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
return False
else: # pragma: no cover
raise
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except ImportError: # pragma: no cover
def exclusive_lock(f):
"""Dummy lock function -- does not lock!"""
warnings.warn("Pretending to lock " + str(f))
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
return
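A small sketch exercising the new module on a platform with fcntl: a second file descriptor for the same path cannot take the exclusive lock, which is the property the BulkData lockfile relies on.

    import tempfile
    import nilmdb.utils.lock as lock

    f1 = tempfile.NamedTemporaryFile()
    f2 = open(f1.name, "w")
    print(lock.exclusive_lock(f1))   # True: lock acquired
    print(lock.exclusive_lock(f2))   # False: flock() treats separate
                                     #   descriptors independently
    lock.exclusive_unlock(f1)
    f1.close()
    f2.close()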

View File

@@ -15,7 +15,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
def wrap_class_method(wrapper):
try:
orig = getattr(cls, wrapper.__name__).im_func
except:
except Exception:
orig = lambda x: None
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))

View File

@@ -1,5 +1,8 @@
from __future__ import absolute_import
from nilmdb.utils import datetime_tz
import re
import time
# Range
min_timestamp = (-2**63)
@@ -36,6 +39,7 @@ def unix_to_timestamp(unix):
"""Convert a Unix timestamp (floating point seconds since epoch)
into a NILM timestamp (integer microseconds since epoch)"""
return int(round(unix * 1e6))
seconds_to_timestamp = unix_to_timestamp
def timestamp_to_unix(timestamp):
"""Convert a NILM timestamp (integer microseconds since epoch)
@@ -56,6 +60,11 @@ def parse_time(toparse):
timestamp, the current local timezone is assumed (e.g. from the TZ
env var).
"""
if toparse == "min":
return min_timestamp
if toparse == "max":
return max_timestamp
# If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators.
@@ -118,4 +127,4 @@ def parse_time(toparse):
def now():
"""Return current timestamp"""
return unix_to_timestamp(datetime_tz.datetime_tz.utcnow().totimestamp())
return unix_to_timestamp(time.time())
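Worked examples of this module's conventions, matching the parse_time tests below (NILM timestamps are integer microseconds since the epoch):

    from nilmdb.utils.time import (unix_to_timestamp, timestamp_to_unix,
                                   parse_time, min_timestamp, max_timestamp)

    assert unix_to_timestamp(1333648800.0) == 1333648800000000
    assert timestamp_to_unix(1333648800000000) == 1333648800.0
    assert parse_time("min") == min_timestamp   # new "min"/"max" shortcuts
    assert parse_time("max") == max_timestamp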

View File

@@ -39,11 +39,10 @@ versioneer.parentdir_prefix = 'nilmdb-'
# Hack to workaround logging/multiprocessing issue:
# https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ
try: import multiprocessing
except: pass
except Exception: pass
# Use Cython if it's new enough, otherwise use preexisting C files.
cython_modules = [ 'nilmdb.server.interval',
'nilmdb.server.layout',
'nilmdb.server.rbtree' ]
try:
import Cython

tests/data/extract-8 Normal file
View File

@@ -0,0 +1,28 @@
# interval-start 1332496919900000
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
# interval-end 1332496919991668
# interval-start 1332496920000000
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
# interval-end 1332496920100000

View File

@@ -24,7 +24,7 @@ class JimOrderPlugin(nose.plugins.Plugin):
name, workingDir=loader.workingDir)
try:
order = os.path.join(addr.filename, "test.order")
except:
except Exception:
order = None
if order and os.path.exists(order):
files = []

View File

@@ -4,10 +4,8 @@ test_lrucache.py
test_mustclose.py
test_serializer.py
test_iteratorizer.py
test_timestamper.py
test_layout.py
test_rbtree.py
test_interval.py

View File

@@ -30,6 +30,11 @@ class TestBulkData(object):
else:
data = BulkData(db, file_size = size, files_per_dir = files)
# Try opening it again (should result in locking error)
with assert_raises(IOError) as e:
data2 = BulkData(db)
in_("already locked by another process", str(e.exception))
# create empty
with assert_raises(ValueError):
data.create("/foo", "uint16_8")

View File

@@ -375,6 +375,7 @@ class TestClient(object):
# Delete streams that exist
for stream in client.stream_list():
client.stream_remove(stream[0])
client.stream_destroy(stream[0])
# Database is empty
@@ -506,6 +507,10 @@ class TestClient(object):
[ 109, 118 ],
[ 200, 300 ] ])
# destroy stream (try without removing data first)
with assert_raises(ClientError):
client.stream_destroy("/context/test")
client.stream_remove("/context/test")
client.stream_destroy("/context/test")
client.close()
@@ -600,6 +605,7 @@ class TestClient(object):
])
# Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test")
client.close()
@@ -613,7 +619,7 @@ class TestClient(object):
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests)
except:
except Exception:
raise SkipTest("can't get connection info")
# First request makes a connection
@@ -635,8 +641,9 @@ class TestClient(object):
eq_(connections(), (1, 5))
# Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test")
eq_(connections(), (1, 6))
eq_(connections(), (1, 7))
def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point
@@ -661,5 +668,6 @@ class TestClient(object):
# Server will round this and give an error on finalize()
ctx.insert("299999999.99 1\n")
client.stream_remove("/rounding/test")
client.stream_destroy("/rounding/test")
client.close()

View File

@@ -21,12 +21,13 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb"
def server_start(max_results = None, bulkdata_args = {}):
def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
global test_server, test_db
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb,
max_results = max_results,
max_removals = max_removals,
bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
@@ -233,6 +234,8 @@ class TestCmdline(object):
eq_(parse_time("1333648800.0"), test)
eq_(parse_time("1333648800000000"), test)
eq_(parse_time("@1333648800000000"), test)
eq_(parse_time("min"), nilmdb.utils.time.min_timestamp)
eq_(parse_time("max"), nilmdb.utils.time.max_timestamp)
with assert_raises(ValueError):
parse_time("@hashtag12345")
@@ -593,6 +596,8 @@ class TestCmdline(object):
test(6, "10:00:30", "10:00:31", extra="-b")
test(7, "10:00:30", "10:00:30.999", extra="-a -T")
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -600,6 +605,11 @@ class TestCmdline(object):
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n")
# markup for 3 intervals, plus extra markup lines whenever we had
# a "restart" from the nilmdb.stream_extract function
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 43210)
def test_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results
server_stop()
@@ -699,11 +709,9 @@ class TestCmdline(object):
# Reinsert some data, to verify that no overlaps with deleted
# data are reported
os.environ['TZ'] = "UTC"
self.ok("insert --timestamp -f --rate 120 /newton/prep "
"tests/data/prep-20120323T1000")
self.ok("insert -t --filename --rate 120 /newton/prep "
"tests/data/prep-20120323T1002")
for minute in ["0", "2"]:
self.ok("insert --timestamp -f --rate 120 /newton/prep"
" tests/data/prep-20120323T100" + minute)
def test_11_destroy(self):
# Delete records
@@ -715,6 +723,9 @@ class TestCmdline(object):
self.fail("destroy /no/such/stream")
self.contain("No stream at path")
self.fail("destroy -R /no/such/stream")
self.contain("No stream at path")
self.fail("destroy asdfasdf")
self.contain("No stream at path")
@@ -728,8 +739,14 @@ class TestCmdline(object):
self.ok("list --detail")
lines_(self.captured, 7)
# Delete some
self.ok("destroy /newton/prep")
# Fail to destroy because intervals still present
self.fail("destroy /newton/prep")
self.contain("all intervals must be removed")
self.ok("list --detail")
lines_(self.captured, 7)
# Destroy for real
self.ok("destroy -R /newton/prep")
self.ok("list")
self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
@@ -740,7 +757,8 @@ class TestCmdline(object):
self.ok("destroy /newton/raw")
self.ok("create /newton/raw uint16_6")
self.ok("destroy /newton/raw")
# Specify --remove with no data
self.ok("destroy --remove /newton/raw")
self.ok("list")
self.match("")
@@ -815,7 +833,7 @@ class TestCmdline(object):
# Now recreate the data one more time and make sure there are
# fewer files.
self.ok("destroy /newton/prep")
self.ok("destroy --remove /newton/prep")
self.fail("destroy /newton/prep") # already destroyed
self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC"
@@ -826,14 +844,16 @@ class TestCmdline(object):
for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames)
lt_(nfiles, 50)
self.ok("destroy /newton/prep") # destroy again
self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Also limit max_removals, to cover more functionality.
server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
# Insert data. Just for fun, insert out of order
@@ -974,8 +994,8 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy /diff/1")
self.ok("destroy /diff/2")
self.ok("destroy -R /diff/1")
self.ok("destroy -R /diff/2")
def test_16_rename(self):
# Test renaming. Force file size smaller so we get more files
@@ -1039,7 +1059,7 @@ class TestCmdline(object):
self.fail("rename /foo/bar /xxx/yyy/zzz/www")
self.contain("path is subdir of existing node")
self.ok("rename /foo/bar /xxx/yyy/mmm")
self.ok("destroy /xxx/yyy/zzz")
self.ok("destroy -R /xxx/yyy/zzz")
check_path("xxx", "yyy", "mmm")
# Extract it at the final path
@@ -1047,7 +1067,7 @@ class TestCmdline(object):
"--end '2012-03-23 10:04:01'")
eq_(self.captured, extract_before)
self.ok("destroy /xxx/yyy/mmm")
self.ok("destroy -R /xxx/yyy/mmm")
# Make sure temporary rename dirs weren't left around
for (dirpath, dirnames, filenames) in os.walk(testdb):

View File

@@ -8,8 +8,11 @@ from nose.tools import *
from nose.tools import assert_raises
import itertools
from nilmdb.server.interval import (Interval, DBInterval,
IntervalSet, IntervalError)
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
# so we can test them separately
from nilmdb.utils.interval import Interval as UtilsInterval
from testutil.helpers import *
import unittest
@@ -47,6 +50,15 @@ def makeset(string):
return iset
class TestInterval:
def test_client_interval(self):
# Run interval tests against the Python version of Interval.
global Interval
NilmdbInterval = Interval
Interval = UtilsInterval
self.test_interval()
self.test_interval_intersect()
Interval = NilmdbInterval
def test_interval(self):
# Test Interval class
os.environ['TZ'] = "America/New_York"
@@ -222,7 +234,7 @@ class TestInterval:
eq_(ab,c)
# a \ b == d
eq_(IntervalSet(a.set_difference(b)), d)
eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)
# Intersection with intervals
do_test(makeset("[---|---)[)"),
@@ -287,10 +299,11 @@ class TestInterval:
b = makeset("[-) [--) [)")
c = makeset("[----) ")
d = makeset(" [-) ")
eq_(a.set_difference(b, list(c)[0]), d)
eq_(nilmdb.utils.interval.set_difference(
a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)
# Empty second set
eq_(a.set_difference(IntervalSet()), a)
eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)
class TestIntervalDB:
def test_dbinterval(self):
@@ -372,14 +385,13 @@ class TestIntervalSpeed:
def test_interval_speed(self):
import yappi
import time
import testutil.aplotter as aplotter
import random
import math
print
yappi.start()
speeds = {}
limit = 10 # was 20
limit = 22 # was 20
for j in [ 2**x for x in range(5,limit) ]:
start = time.time()
iset = IntervalSet()
@@ -393,6 +405,5 @@ class TestIntervalSpeed:
speed/j,
speed / (j*math.log(j))) # should be constant
speeds[j] = speed
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
yappi.stop()
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)

View File

@@ -1,61 +0,0 @@
import nilmdb
from nilmdb.utils.printf import *
import nose
from nose.tools import *
from nose.tools import assert_raises
import threading
import time
from testutil.helpers import *
def func_with_callback(a, b, callback):
callback(a)
callback(b)
callback(a+b)
return "return value"
class TestIteratorizer(object):
def test(self):
# First try it with a normal callback
self.result = ""
def cb(x):
self.result += str(x)
func_with_callback(1, 2, cb)
eq_(self.result, "123")
# Now make it an iterator
result = ""
f = lambda x: func_with_callback(1, 2, x)
with nilmdb.utils.Iteratorizer(f) as it:
for i in it:
result += str(i)
eq_(result, "123")
eq_(it.retval, "return value")
# Make sure things work when an exception occurs
result = ""
with nilmdb.utils.Iteratorizer(
lambda x: func_with_callback(1, "a", x)) as it:
with assert_raises(TypeError) as e:
for i in it:
result += str(i)
eq_(result, "1a")
# Now try to trigger the case where we stop iterating
# mid-generator, and expect the iteratorizer to clean up after
# itself. This doesn't have a particular result in the test,
# but gains coverage.
def foo():
with nilmdb.utils.Iteratorizer(f) as it:
it.next()
foo()
eq_(it.retval, None)
# Do the same thing when the curl hack is applied
def foo():
with nilmdb.utils.Iteratorizer(f, curl_hack = True) as it:
it.next()
foo()
eq_(it.retval, None)
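For reference, this removed test exercised an adapter that turns a callback-style function into an iterator by running it in a worker thread and handing values over a queue. A minimal Python 3 sketch of the technique; it is not the original Iteratorizer, which also captured return values and the curl hack, and this version neither propagates worker exceptions nor unblocks the thread if the iterator is abandoned early:

import threading
import queue

def iteratorize(func):
    # func takes a single callback; every value passed to that
    # callback is yielded here in order.
    done = object()
    q = queue.Queue(maxsize=1)
    def worker():
        try:
            func(q.put)
        finally:
            q.put(done)
    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is done:
            return
        yield item

# e.g. list(iteratorize(lambda cb: func_with_callback(1, 2, cb)))
# yields [1, 2, 3], matching the plain-callback test above.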

View File

@@ -1,266 +0,0 @@
# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.utils.printf import *
from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import itertools
import os
import sys
import random
import unittest
from testutil.helpers import *
from nilmdb.server.layout import *
class TestLayouts(object):
# Some nilmdb.layout tests. Not complete, just fills in missing
# coverage.
def test_layouts(self):
x = nilmdb.server.layout.get_named("float32_8")
y = nilmdb.server.layout.get_named("float32_8")
eq_(x.count, y.count)
eq_(x.datatype, y.datatype)
y = nilmdb.server.layout.get_named("float32_7")
ne_(x.count, y.count)
eq_(x.datatype, y.datatype)
def test_parsing(self):
self.real_t_parsing("float32_8", "uint16_6", "uint16_9")
self.real_t_parsing("float32_8", "uint16_6", "uint16_9")
def real_t_parsing(self, name_prep, name_raw, name_rawnotch):
# invalid layouts
with assert_raises(TypeError) as e:
parser = Parser("NoSuchLayout")
with assert_raises(TypeError) as e:
parser = Parser("float32")
# too little data
parser = Parser(name_prep)
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5\n")
with assert_raises(ParserError) as e:
parser.parse(data)
in_("error", str(e.exception))
# too much data
parser = Parser(name_prep)
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n")
with assert_raises(ParserError) as e:
parser.parse(data)
in_("error", str(e.exception))
# just right
parser = Parser(name_prep)
data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n" +
"1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")
parser.parse(data)
eq_(parser.min_timestamp, 1234567890.0)
eq_(parser.max_timestamp, 1234567890.1)
eq_(parser.data, [[1234567890.0,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8],
[1234567890.1,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8]])
# try uint16_6 too, with clamping
parser = Parser(name_raw)
data = ( "1234567890.000000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
parser.parse(data)
eq_(parser.data, [[1234567890.0,1,2,3,4,5,6],
[1234567890.1,1,2,3,4,5,6]])
# pass an instantiated class
parser = Parser(get_named(name_rawnotch))
data = ( "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
"1234567890.100000 1 2 3 4 5 6 7 8 9\n" )
parser.parse(data)
# non-monotonic
parser = Parser(name_raw)
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.099999 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("not monotonically increasing", str(e.exception))
parser = Parser(name_raw)
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("not monotonically increasing", str(e.exception))
parser = Parser(name_raw)
data = ( "1234567890.100000 1 2 3 4 5 6\n" +
"1234567890.100001 1 2 3 4 5 6\n" )
parser.parse(data)
# uint16_6 with values out of bounds
parser = Parser(name_raw)
data = ( "1234567890.000000 1 2 3 4 500000 6\n" +
"1234567890.100000 1 2 3 4 5 6\n" )
with assert_raises(ParserError) as e:
parser.parse(data)
in_("value out of range", str(e.exception))
# Empty data should work but is useless
parser = Parser(name_raw)
data = ""
parser.parse(data)
assert(parser.min_timestamp is None)
assert(parser.max_timestamp is None)
def test_formatting(self):
self.real_t_formatting("float32_8", "uint16_6", "uint16_9")
self.real_t_formatting("float32_8", "uint16_6", "uint16_9")
def real_t_formatting(self, name_prep, name_raw, name_rawnotch):
# invalid layout
with assert_raises(TypeError) as e:
formatter = Formatter("NoSuchLayout")
# too little data
formatter = Formatter(name_prep)
data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5 ],
[ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5 ] ]
with assert_raises(FormatterError) as e:
formatter.format(data)
in_("error", str(e.exception))
# too much data
formatter = Formatter(name_prep)
data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
[ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
with assert_raises(FormatterError) as e:
formatter.format(data)
in_("error", str(e.exception))
# just right
formatter = Formatter(name_prep)
data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ],
[ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ] ]
text = formatter.format(data)
eq_(text,
"1234567890.000000 1.100000e+00 2.200000e+00 3.300000e+00 "
"4.400000e+00 5.500000e+00 6.600000e+00 7.700000e+00 "
"8.800000e+00\n" +
"1234567890.100000 1.100000e+00 2.200000e+00 3.300000e+00 "
"4.400000e+00 5.500000e+00 6.600000e+00 7.700000e+00 "
"8.800000e+00\n")
# try uint16_6 too
formatter = Formatter(name_raw)
data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6 ],
[ 1234567890.100000, 1, 2, 3, 4, 5, 6 ] ]
text = formatter.format(data)
eq_(text,
"1234567890.000000 1 2 3 4 5 6\n" +
"1234567890.100000 1 2 3 4 5 6\n")
# pass an instantiated class
formatter = Formatter(get_named(name_rawnotch))
data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
[ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
text = formatter.format(data)
eq_(text,
"1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
"1234567890.100000 1 2 3 4 5 6 7 8 9\n")
# Empty data should work but is useless
formatter = Formatter(name_raw)
data = []
text = formatter.format(data)
eq_(text, "")
def test_roundtrip(self):
self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")
self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")
def real_t_roundtrip(self, name_prep, name_raw, name_rawnotch):
# Verify that textual data passed through the Parser, then the
# Formatter, then the Parser again produces identical parsed
# representations
random.seed(12345)
def do_roundtrip(layout, datagen):
for i in range(100):
rows = random.randint(1,100)
data = ""
ts = 1234567890
for r in range(rows):
ts += random.uniform(0,1)
row = sprintf("%f", ts) + " "
row += " ".join(datagen())
row += "\n"
data += row
parser1 = Parser(layout)
formatter = Formatter(layout)
parser2 = Parser(layout)
parser1.parse(data)
parser2.parse(formatter.format(parser1.data))
eq_(parser1.data, parser2.data)
def datagen():
return [ sprintf("%.6e", random.uniform(-1000,1000))
for x in range(8) ]
do_roundtrip(name_prep, datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_roundtrip(name_raw, datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(9) ]
do_roundtrip(name_rawnotch, datagen)
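The roundtrip above asserts that parse -> format -> parse is the identity on the parsed data. The same property in miniature, for a generic whitespace-separated layout (a standalone sketch, not nilmdb's Parser/Formatter; note "%.6e" only roundtrips values already limited to six significant digits, as the generators above are):

def parse(text):
    # One row per line: a timestamp followed by data values.
    return [[float(f) for f in line.split()]
            for line in text.splitlines() if line]

def fmt(rows):
    # "%f" keeps microsecond timestamps exact; "%.6e" matches the
    # six-significant-digit values generated above.
    return "".join("%f " % row[0] +
                   " ".join("%.6e" % v for v in row[1:]) + "\n"
                   for row in rows)

rows = parse("1234567890.000000 1.1 2.2\n1234567890.100000 3.3 4.4\n")
assert parse(fmt(rows)) == rows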
class TestLayoutSpeed:
@unittest.skip("this is slow")
def test_layout_speed(self):
import time
random.seed(54321)
def do_speedtest(layout, datagen, rows = 5000, times = 100):
# Build data once
data = ""
ts = 1234567890
for r in range(rows):
ts += random.uniform(0,1)
row = sprintf("%f", ts) + " "
row += " ".join(datagen())
row += "\n"
data += row
# Do lots of roundtrips
start = time.time()
for i in range(times):
parser = Parser(layout)
formatter = Formatter(layout)
parser.parse(data)
formatter.format(parser.data)
elapsed = time.time() - start
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
layout,
elapsed * 1e3,
(elapsed * 1e6) / (rows * times),
(rows * times) / elapsed)
print ""
def datagen():
return [ sprintf("%.6e", random.uniform(-1000,1000))
for x in range(10) ]
do_speedtest("float32_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(10) ]
do_speedtest("uint16_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_speedtest("uint16_6", datagen)

View File

@@ -28,9 +28,6 @@ class Test00Nilmdb(object): # named 00 so it runs first
def test_NilmDB(self):
recursive_unlink(testdb)
with assert_raises(IOError):
nilmdb.server.NilmDB("/nonexistant-db/foo")
db = nilmdb.server.NilmDB(testdb)
db.close()
db = nilmdb.server.NilmDB(testdb)

View File

@@ -18,7 +18,7 @@ class TestPrintf(object):
printf("hello, world: %d", 123)
fprintf(test2, "hello too: %d", 123)
test3 = sprintf("hello three: %d", 123)
except:
except Exception:
sys.stdout = old_stdout
raise
sys.stdout = old_stdout
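The narrower clause matters because SystemExit and KeyboardInterrupt derive from BaseException rather than Exception, so they now propagate out of this handler instead of being silently swallowed. A minimal illustration (the snippet exits rather than printing):

import sys

try:
    sys.exit(1)
except Exception:
    # Never reached: SystemExit is not an Exception subclass, so it
    # escapes this handler; a bare 'except:' would have caught it.
    print("swallowed")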

View File

@@ -1,419 +0,0 @@
#-----------------------------------------------
#aplotter.py - ascii art function plotter
#Copyright (c) 2006, Imri Goldberg
#All rights reserved.
#
#Redistribution and use in source and binary forms,
#with or without modification, are permitted provided
#that the following conditions are met:
#
# * Redistributions of source code must retain the
# above copyright notice, this list of conditions
# and the following disclaimer.
# * Redistributions in binary form must reproduce the
# above copyright notice, this list of conditions
# and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of
# its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
#THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
#LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
#SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
#CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#-----------------------------------------------
import math
EPSILON = 0.000001
def transposed(mat):
result = []
for i in xrange(len(mat[0])):
result.append([x[i] for x in mat])
return result
def y_reversed(mat):
result = []
for i in range(len(mat)):
result.append(list(reversed(mat[i])))
return result
def sign(x):
if 0<x:
return 1
if 0 == x:
return 0
return -1
class Plotter(object):
class PlotData(object):
def __init__(self, x_size, y_size, min_x, max_x, min_y, max_y, x_mod, y_mod):
self.x_size = x_size
self.y_size = y_size
self.min_x = min_x
self.max_x = max_x
self.min_y = min_y
self.max_y = max_y
self.x_mod = x_mod
self.y_mod = y_mod
self.x_step = float(max_x - min_x)/float(self.x_size)
self.y_step = float(max_y - min_y)/float(self.y_size)
self.inv_x_step = 1/self.x_step
self.inv_y_step = 1/self.y_step
self.ratio = self.y_step / self.x_step
def __repr__(self):
s = "size: %s, bl: %s, tr: %s, step: %s" % ((self.x_size, self.y_size), (self.min_x, self.min_y), (self.max_x, self.max_y),
(self.x_step, self.y_step))
return s
def __init__(self, **kwargs):
self.x_size = kwargs.get("x_size", 80)
self.y_size = kwargs.get("y_size", 20)
self.will_draw_axes = kwargs.get("draw_axes", True)
self.new_line = kwargs.get("newline", "\n")
self.dot = kwargs.get("dot", "*")
self.plot_slope = kwargs.get("plot_slope", True)
self.x_margin = kwargs.get("x_margin", 0.05)
self.y_margin = kwargs.get("y_margin", 0.1)
self.will_plot_labels = kwargs.get("plot_labels", True)
@staticmethod
def get_symbol_by_slope(slope, default_symbol):
draw_symbol = default_symbol
if slope > math.tan(3*math.pi/8):
draw_symbol = "|"
elif slope > math.tan(math.pi/8) and slope < math.tan(3*math.pi/8):
draw_symbol = "/"
elif abs(slope) < math.tan(math.pi/8):
draw_symbol = "-"
elif slope < math.tan(-math.pi/8) and slope > math.tan(-3*math.pi/8):
draw_symbol = "\\"
elif slope < math.tan(-3*math.pi/8):
draw_symbol = "|"
return draw_symbol
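For reference, tan(pi/8) is about 0.414 and tan(3*pi/8) about 2.414, so segments with |slope| under 0.414 render as "-", between 0.414 and 2.414 as "/" (or "\" in the negative octants), and anything steeper as "|".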
def plot_labels(self, output_buffer, plot_data):
if plot_data.y_size < 2:
return
margin_factor = 1
do_plot_x_label = True
do_plot_y_label = True
x_str = "%+g"
if plot_data.x_size < 16:
do_plot_x_label = False
elif plot_data.x_size < 23:
x_str = "%+.2g"
y_str = "%+g"
if plot_data.x_size < 8:
do_plot_y_label = False
elif plot_data.x_size < 11:
y_str = "%+.2g"
act_min_x = (plot_data.min_x + plot_data.x_mod*margin_factor)
act_max_x = (plot_data.max_x - plot_data.x_mod*margin_factor)
act_min_y = (plot_data.min_y + plot_data.y_mod*margin_factor)
act_max_y = (plot_data.max_y - plot_data.y_mod*margin_factor)
if abs(act_min_x) < 1:
min_x_str = "%+.2g" % act_min_x
else:
min_x_str = x_str % act_min_x
if abs(act_max_x) < 1:
max_x_str = "%+.2g" % act_max_x
else:
max_x_str = x_str % act_max_x
if abs(act_min_y) < 1:
min_y_str = "%+.2g" % act_min_y
else:
min_y_str = y_str % act_min_y
if abs(act_max_y) < 1:
max_y_str = "%+.2g" % act_max_y
else:
max_y_str = y_str % act_max_y
min_x_coord = self.get_coord(act_min_x,plot_data.min_x,plot_data.x_step)
max_x_coord = self.get_coord(act_max_x,plot_data.min_x,plot_data.x_step)
min_y_coord = self.get_coord(act_min_y,plot_data.min_y,plot_data.y_step)
max_y_coord = self.get_coord(act_max_y,plot_data.min_y,plot_data.y_step)
#print plot_data
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
#if plot_data.min_x < 0 and plot_data.max_x > 0:
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
#else:
#pass
output_buffer[x_zero_coord][min_y_coord] = "+"
output_buffer[x_zero_coord][max_y_coord] = "+"
output_buffer[min_x_coord][y_zero_coord] = "+"
output_buffer[max_x_coord][y_zero_coord] = "+"
if do_plot_x_label:
for i,c in enumerate(min_x_str):
output_buffer[min_x_coord+i][y_zero_coord-1] = c
for i,c in enumerate(max_x_str):
output_buffer[max_x_coord+i-len(max_x_str)][y_zero_coord-1] = c
if do_plot_y_label:
for i,c in enumerate(max_y_str):
output_buffer[x_zero_coord+i][max_y_coord] = c
for i,c in enumerate(min_y_str):
output_buffer[x_zero_coord+i][min_y_coord] = c
def plot_data(self, xy_seq, output_buffer, plot_data):
if self.plot_slope:
xy_seq = list(xy_seq)
#sort according to the x coord
xy_seq.sort(key = lambda c: c[0])
prev_p = xy_seq[0]
e_xy_seq = enumerate(xy_seq)
e_xy_seq.next()
for i,(x,y) in e_xy_seq:
draw_symbol = self.dot
line_drawn = self.plot_line(prev_p, (x,y), output_buffer, plot_data)
prev_p = (x,y)
if not line_drawn:
if i > 0 and i < len(xy_seq)-1:
px,py = xy_seq[i-1]
nx,ny = xy_seq[i+1]
if abs(nx-px) > EPSILON:
slope = (1.0/plot_data.ratio)*(ny-py)/(nx-px)
draw_symbol = self.get_symbol_by_slope(slope, draw_symbol)
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord >= 0 and y_coord < len(output_buffer[0]):
if self.will_draw_axes:
if y_coord == self.get_coord(0, plot_data.min_y, plot_data.y_step) and draw_symbol == "-":
draw_symbol = "="
output_buffer[x_coord][y_coord] = draw_symbol
else:
for x,y in xy_seq:
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord > 0 and y_coord < len(output_buffer[0]):
output_buffer[x_coord][y_coord] = self.dot
def plot_line(self, start, end, output_buffer, plot_data):
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
clipped_line = clip_line(start, end, (plot_data.min_x, plot_data.min_y), (plot_data.max_x, plot_data.max_y))
if clipped_line is not None:
start,end = clipped_line
else:
return False
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
if start[0]-end[0] == 0:
draw_symbol = "|"
else:
slope = (1.0/plot_data.ratio)*(end[1]-start[1])/(end[0]-start[0])
draw_symbol = self.get_symbol_by_slope(slope, self.dot)
try:
delta = x1-x0, y1-y0
if abs(delta[0])>abs(delta[1]):
s = sign(delta[0])
slope = float(delta[1])/delta[0]
for i in range(0,abs(int(delta[0]))):
cur_draw_symbol = draw_symbol
x = i*s
cur_y = int(y0+slope*x)
if self.will_draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[x0+x][cur_y] = cur_draw_symbol
else:
s = sign(delta[1])
slope = float(delta[0])/delta[1]
for i in range(0,abs(int(delta[1]))):
y = i*s
cur_draw_symbol = draw_symbol
cur_y = y0+y
if self.will_draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[int(x0+slope*y)][cur_y] = cur_draw_symbol
except:
print start, end
print start_coord, end_coord
print plot_data
raise
return False
def plot_single(self, seq, min_x = None, max_x = None, min_y = None, max_y = None):
return self.plot_double(range(len(seq)),seq, min_x, max_x, min_y, max_y)
def plot_double(self, x_seq, y_seq, min_x = None, max_x = None, min_y = None, max_y = None):
if min_x is None:
min_x = min(x_seq)
if max_x is None:
max_x = max(x_seq)
if min_y is None:
min_y = min(y_seq)
if max_y is None:
max_y = max(y_seq)
if max_y == min_y:
max_y += 1
x_mod = (max_x-min_x)*self.x_margin
y_mod = (max_y-min_y)*self.y_margin
min_x-=x_mod
max_x+=x_mod
min_y-=y_mod
max_y+=y_mod
plot_data = self.PlotData(self.x_size, self.y_size, min_x, max_x, min_y, max_y, x_mod, y_mod)
output_buffer = [[" "]*self.y_size for i in range(self.x_size)]
if self.will_draw_axes:
self.draw_axes(output_buffer, plot_data)
self.plot_data(zip(x_seq, y_seq), output_buffer, plot_data)
if self.will_plot_labels:
self.plot_labels(output_buffer, plot_data)
trans_result = transposed(y_reversed(output_buffer))
result = self.new_line.join(["".join(row) for row in trans_result])
return result
def draw_axes(self, output_buffer, plot_data):
draw_x = False
draw_y = False
if plot_data.min_x <= 0 and plot_data.max_x > 0:
draw_y = True
zero_x = self.get_coord(0, plot_data.min_x, plot_data.x_step)
for y in xrange(plot_data.y_size):
output_buffer[zero_x][y] = "|"
if plot_data.min_y <= 0 and plot_data.max_y > 0:
draw_x = True
zero_y = self.get_coord(0, plot_data.min_y, plot_data.y_step)
for x in xrange(plot_data.x_size):
output_buffer[x][zero_y] = "-"
if draw_x and draw_y:
output_buffer[zero_x][zero_y] = "+"
@staticmethod
def get_coord(val, min, step):
result = int((val - min)/step)
return result
def clip_line(line_pt_1, line_pt_2, rect_bottom_left, rect_top_right):
ts = [0.0,1.0]
if line_pt_1[0] == line_pt_2[0]:
return ((line_pt_1[0], max(min(line_pt_1[1], line_pt_2[1]), rect_bottom_left[1])),
(line_pt_1[0], min(max(line_pt_1[1], line_pt_2[1]), rect_top_right[1])))
if line_pt_1[1] == line_pt_2[1]:
return ((max(min(line_pt_1[0], line_pt_2[0]), rect_bottom_left[0]), line_pt_1[1]),
(min(max(line_pt_1[0], line_pt_2[0]), rect_top_right[0]), line_pt_1[1]))
if ((rect_bottom_left[0] <= line_pt_1[0] and line_pt_1[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_1[1] and line_pt_1[1] < rect_top_right[1]) and
(rect_bottom_left[0] <= line_pt_2[0] and line_pt_2[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_2[1] and line_pt_2[1] < rect_top_right[1])):
return line_pt_1, line_pt_2
ts.append( float(rect_bottom_left[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_top_right[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_bottom_left[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.append( float(rect_top_right[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.sort()
if ts[2] < 0 or ts[2] >= 1 or ts[3] < 0 or ts[3] >= 1:
return None
result = [(pt_1 + t*(pt_2-pt_1)) for t in (ts[2],ts[3]) for (pt_1, pt_2) in zip(line_pt_1, line_pt_2)]
return (result[0],result[1]), (result[2], result[3])
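This is a parametric (Liang-Barsky style) clip: writing the segment as p(t) = p1 + t*(p2 - p1) with t in [0, 1], the four appended values are the parameters where the infinite line crosses each rectangle edge; after sorting the six candidates, ts[2] and ts[3] bracket the portion of the segment inside the rectangle.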
def plot(*args,**flags):
limit_flags_names = set(["min_x","min_y","max_x","max_y"])
limit_flags = dict([(n,flags[n]) for n in limit_flags_names & set(flags)])
settting_flags = dict([(n,flags[n]) for n in set(flags) - limit_flags_names])
if len(args) == 1:
p = Plotter(**settting_flags)
print p.plot_single(args[0],**limit_flags)
elif len(args) == 2:
p = Plotter(**settting_flags)
print p.plot_double(args[0],args[1],**limit_flags)
else:
raise NotImplementedError("can't draw multiple graphs yet")
__all__ = ["Plotter","plot"]
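For reference, the removed module's entry point could be driven like this under Python 2 (the print statements and xrange above tie the file to Python 2); a sketch mirroring the deleted test_interval_speed call:

import math
xs = [x / 10.0 for x in range(100)]
plot(xs, [math.sin(x) for x in xs], x_size=60, y_size=15, plot_slope=True)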