Compare commits

...

29 Commits

Author SHA1 Message Date
5292319802 server: consolidate time processing and checks 2013-03-30 21:16:40 -04:00
173121ca87 Switch URL to one that should definitely not resolve 2013-03-30 17:31:35 -04:00
26bab031bd Add StreamInserter.send() to trigger intermediate block send 2013-03-30 17:30:43 -04:00
b5fefffa09 Use a global cached server object for WSGI app
This is instead of caching it inside nilmdb.server.wsgi_application.
Might make things work a bit better in case the web server decides
to call wsgi_application multiple times.
2013-03-30 15:56:57 -04:00
dccb3e370a WSGI config needs to specify application group
This ensures that the same Python sub-instance handles the request,
even if it's coming in from two different virtual hosts.
2013-03-30 15:56:02 -04:00
95ca55aa7e Print out WSGI environment on DB init failure 2013-03-30 15:55:41 -04:00
e01813f29d Fix wsgi documentation 2013-03-25 13:52:32 -04:00
7f41e117a2 Fix tabs 2013-03-25 13:44:03 -04:00
dd5fc806e5 Restructure WSGI app to regenerate error on each call, if needed
This way, errors like "database already locked" can be fixed and the
page reloaded, without needing to restart Apache.
2013-03-24 21:52:11 -04:00
f8ca8d31e6 Remove Iteratorizer, as it's no longer needed 2013-03-24 21:31:03 -04:00
ed89d803f0 Remove aplotter code 2013-03-24 21:29:09 -04:00
3d24092cd2 Replace bare 'except:' with 'except: Exception'
Otherwise we might inadvertently catch SystemExit or KeyboardExit or
something we don't want to catch.
2013-03-24 21:28:01 -04:00
304bb43d85 Move lockfile out of data dir, to avoid stream tree conflicts 2013-03-24 21:23:45 -04:00
59a79a30a5 Remove lockfile when done.
This isn't necessary for correct behavior: if the database is killed,
the old flock() will go away when the file descriptor gets closed.
2013-03-24 21:20:47 -04:00
c0d450d39e Add locking mechanism to avoid multiple servers on one DB 2013-03-24 21:20:20 -04:00
6f14d609b2 Fix issue where bulkdata was accidentally closed 2013-03-24 21:16:18 -04:00
77ef87456f Improve WSGI application support, fix docs 2013-03-24 21:16:03 -04:00
32d6af935c Improve wsgi docs 2013-03-22 19:17:36 -04:00
6af3a6fc41 Add WSGI application support and documentation 2013-03-22 19:14:34 -04:00
f8a06fb3b7 Clarify default DB path in nilmdb_server.py help text 2013-03-22 15:09:37 -04:00
e790bb9e8a Fix test failure when tests are run as root 2013-03-21 14:33:02 -04:00
89be6f5931 Add option to include interval start/end markup on extract
When enabled, lines like "# interval-start 1234567890123456" and "#
interval-end 1234567890123456" will be added to the data output.  Note
that there may be an "interval-end" timestamp followed by an identical
"interval-start" timestamp, if the response at the nilmdb level was
split up into multiple chunks.

In general, assume contiguous data if previous_interval_end ==
new_interval_start.
2013-03-19 14:23:33 -04:00
4cdef3285d Destroy now requires that all data has been previously removed.
Added new flag "-R" to command line to perform an automatic removal.
This should be the last of the ways in which a single command could
block the nilmdb thread for a long time.
2013-03-18 19:39:03 -04:00
bcd82c4d59 Limit the number of rows removed per call to nilmdb.stream_remove
Server class will retry as needed, as with stream_extract and
stream_intervals.
2013-03-18 18:22:45 -04:00
caf63ab01f Fix stream_extract/stream_intervals restart around timestamp == 0. 2013-03-18 18:20:25 -04:00
2d72891162 Accept "min" and "max" as timestamps on command line 2013-03-18 18:19:24 -04:00
cda2ac3e77 Don't return a mutable interval from IntervalSet.intersection()
Instead, always take the subset, which creates a new interval.
Also adds a small optimization by moving the 'if orig' check outside the
loop.
2013-03-18 18:16:35 -04:00
57d3d60f6a Fix relative import problems 2013-03-18 16:27:27 -04:00
d6b5befe76 Don't use filenames as default arg completion 2013-03-16 17:27:58 -04:00
30 changed files with 415 additions and 715 deletions

View File

@@ -24,3 +24,5 @@ Usage:
nilmdb-server --help
nilmtool --help
See docs/wsgi.md for info on setting up a WSGI application in Apache.

32
docs/wsgi.md Normal file
View File

@@ -0,0 +1,32 @@
WSGI Application in Apache
--------------------------
Install `apache2` and `libapache2-mod-wsgi`
We'll set up the database server at URL `http://myhost.com/nilmdb`.
The database will be stored in `/home/nilm/db`, and the process will
run as user `nilm`, group `nilm`.
First, create a WSGI script `/home/nilm/nilmdb.wsgi` containing:
import nilmdb.server
application = nilmdb.server.wsgi_application("/home/nilm/db", "/nilmdb")
The first parameter is the local filesystem path, and the second
parameter is the path part of the URL.
Then, set up Apache with a configuration like:
<VirtualHost>
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
WSGIApplicationGroup nilmdb-appgroup
WSGIProcessGroup nilmdb-procgroup
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmdb>
Order deny,allow
Deny from all
Allow from 1.2.3.4
</Location>
</VirtualHost>

View File

@@ -17,4 +17,4 @@ _nilmtool_argcomplete() {
unset COMPREPLY
fi
}
complete -o nospace -o default -F _nilmtool_argcomplete nilmtool
complete -o nospace -F _nilmtool_argcomplete nilmtool

View File

@@ -97,7 +97,7 @@ class Client(object):
return self.http.post("stream/create", params)
def stream_destroy(self, path):
"""Delete stream and its contents"""
"""Delete stream. Fails if any data is still present."""
params = { "path": path }
return self.http.post("stream/destroy", params)
@@ -171,7 +171,8 @@ class Client(object):
params["end"] = timestamp_to_string(end)
return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, count = False):
def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
"""
Extract data from a stream. Returns a generator that yields
lines of ASCII-formatted data that matches the database
@@ -179,6 +180,9 @@ class Client(object):
Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged.
Specify markup = True to include comments in the returned data
that indicate interval starts and ends.
"""
params = {
"path": path,
@@ -189,6 +193,8 @@ class Client(object):
params["end"] = timestamp_to_string(end)
if count:
params["count"] = 1
if markup:
params["markup"] = 1
return self.http.get_gen("stream/extract", params)
def stream_count(self, path, start = None, end = None):
@@ -307,6 +313,11 @@ class StreamInserter(object):
part of a new interval and there may be a gap left in-between."""
self._send_block(final = True)
def send(self):
"""Send any data that we might have buffered up. Does not affect
any other treatment of timestamps or endpoints."""
self._send_block(final = False)
def _get_first_noncomment(self, block):
"""Return the (start, end) indices of the first full line in
block that isn't a comment, or raise IndexError if

View File

@@ -16,7 +16,7 @@ class HTTPClient(object):
reparsed = urlparse.urlparse(baseurl).geturl()
if '://' not in reparsed:
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed
self.baseurl = reparsed.rstrip('/') + '/'
# Build Requests session object, enable SSL verification
self.session = requests.Session()

View File

@@ -7,11 +7,14 @@ def setup(self, sub):
cmd = sub.add_parser("destroy", help="Delete a stream and all data",
formatter_class = def_form,
description="""
Destroy the stream at the specified path. All
data and metadata related to the stream is
permanently deleted.
Destroy the stream at the specified path.
The stream must be empty. All metadata
related to the stream is permanently deleted.
""")
cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream")
group = cmd.add_argument_group("Required arguments")
group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar",
@@ -20,6 +23,11 @@ def setup(self, sub):
def cmd_destroy(self):
"""Destroy stream"""
if self.args.remove:
try:
count = self.client.stream_remove(self.args.path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try:
self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e:

View File

@@ -29,6 +29,8 @@ def setup(self, sub):
group.add_argument("-a", "--annotate", action="store_true",
help="Include comments with some information "
"about the stream")
group.add_argument("-m", "--markup", action="store_true",
help="Include comments with interval starts and ends")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in annotated information")
group.add_argument("-c", "--count", action="store_true",
@@ -61,7 +63,8 @@ def cmd_extract(self):
for dataline in self.client.stream_extract(self.args.path,
self.args.start,
self.args.end,
self.args.count):
self.args.count,
self.args.markup):
if self.args.bare and not self.args.count:
# Strip timestamp (first element). Doesn't make sense
# if we are only returning a count.

View File

@@ -22,7 +22,7 @@ def main():
group.add_argument('-p', '--port', help = 'Listen on the given port',
type = int, default = 12380)
group.add_argument('-d', '--database', help = 'Database directory',
default = os.path.join(os.getcwd(), "db"))
default = "./db")
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-t', '--traceback',

View File

@@ -17,5 +17,5 @@ except (ImportError, TypeError): # pragma: no cover
pass
from nilmdb.server.nilmdb import NilmDB
from nilmdb.server.server import Server
from nilmdb.server.server import Server, wsgi_application
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

View File

@@ -14,6 +14,7 @@ import re
import sys
import tempfile
import nilmdb.utils.lock
from . import rocket
# Up to 256 open file descriptors at any given time.
@@ -26,6 +27,8 @@ class BulkData(object):
def __init__(self, basepath, **kwargs):
self.basepath = basepath
self.root = os.path.join(self.basepath, "data")
self.lock = self.root + ".lock"
self.lockfile = None
# Tuneables
if "file_size" in kwargs:
@@ -44,8 +47,22 @@ class BulkData(object):
if not os.path.isdir(self.root):
os.mkdir(self.root)
# Create the lock
self.lockfile = open(self.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(self.lockfile):
raise IOError('database at "' + self.basepath +
'" is already locked by another process')
def close(self):
self.getnode.cache_remove_all()
if self.lockfile:
nilmdb.utils.lock.exclusive_unlock(self.lockfile)
self.lockfile.close()
try:
os.unlink(self.lock)
except OSError: # pragma: no cover
pass
self.lockfile = None
def _encode_filename(self, path):
# Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
@@ -134,7 +151,7 @@ class BulkData(object):
# Open and cache it
self.getnode(unicodepath)
except:
except Exception:
exc_info = sys.exc_info()
try:
os.rmdir(ospath)
@@ -371,7 +388,7 @@ class Table(object):
# Try deleting subdir, too
try:
os.rmdir(os.path.join(self.root, subdir))
except:
except Exception:
pass
# Cache open files
@@ -504,7 +521,7 @@ class Table(object):
with open(cachefile, "rb") as f:
ranges = pickle.load(f)
cachefile_present = True
except:
except Exception:
ranges = []
cachefile_present = False

View File

@@ -286,23 +286,18 @@ cdef class IntervalSet:
(potentially) subsetted to make the one that is being
returned.
"""
if not isinstance(interval, Interval):
raise TypeError("bad type")
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
if i:
if i.start >= interval.start and i.end <= interval.end:
if orig:
yield (i, i)
else:
yield i
else:
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
if orig:
yield (subset, i)
else:
yield subset
if orig:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield (subset, i)
else:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield subset
cpdef intersects(self, Interval other):
"""Return True if this IntervalSet intersects another interval"""

View File

@@ -12,6 +12,7 @@ Manages both the SQL database and the table storage backend.
from __future__ import absolute_import
import nilmdb.utils
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
@@ -83,7 +84,18 @@ class NilmDB(object):
verbose = 0
def __init__(self, basepath, max_results=None,
bulkdata_args=None):
max_removals=None, bulkdata_args=None):
"""Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing:
'max_results' is the max rows to send in a single
stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once
in stream_move.
'bulkdata_args' is kwargs for the bulkdata module.
"""
if bulkdata_args is None:
bulkdata_args = {}
@@ -94,7 +106,9 @@ class NilmDB(object):
try:
os.makedirs(self.basepath)
except OSError as e:
if e.errno != errno.EEXIST:
if e.errno != errno.EEXIST: # pragma: no cover
# (no coverage, because it's hard to trigger this case
# if tests are run as root)
raise IOError("can't create tree " + self.basepath)
# Our data goes inside it
@@ -105,19 +119,20 @@ class NilmDB(object):
self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
try:
self._sql_schema_update()
finally: # pragma: no cover
except Exception: # pragma: no cover
self.data.close()
raise
# See big comment at top about the performance implications of this
self.con.execute("PRAGMA synchronous=NORMAL")
self.con.execute("PRAGMA journal_mode=WAL")
# Approximate largest number of elements that we want to send
# in a single reply (for stream_intervals, stream_extract)
if max_results:
self.max_results = max_results
else:
self.max_results = 16384
# in a single reply (for stream_intervals, stream_extract).
self.max_results = max_results or 16384
# Remove up to this many rows per call to stream_remove.
self.max_removals = max_removals or 1048576
def get_basepath(self):
return self.basepath
@@ -334,14 +349,14 @@ class NilmDB(object):
Returns (intervals, restart) tuple.
intervals is a list of [start,end] timestamps of all intervals
'intervals' is a list of [start,end] timestamps of all intervals
that exist for path, between start and end.
restart, if nonzero, means that there were too many results to
return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
the next block of data.
'restart', if not None, means that there were too many results
to return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, and
a new request with a start time of 'restart' will fetch the
next block of data.
"""
stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id)
@@ -363,7 +378,7 @@ class NilmDB(object):
break
result.append([i.start, i.end])
else:
restart = 0
restart = None
return (result, restart)
def stream_create(self, path, layout_name):
@@ -439,17 +454,22 @@ class NilmDB(object):
(newpath, stream_id))
def stream_destroy(self, path):
"""Fully remove a table and all of its data from the database.
No way to undo it! Metadata is removed."""
"""Fully remove a table from the database. Fails if there are
any intervals data present; remove them first. Metadata is
also removed."""
stream_id = self._stream_id(path)
# Delete the cached interval data (if it was cached)
# Verify that no intervals are present, and clear the cache
iset = self._get_intervals(stream_id)
if len(iset):
raise NilmDBError("all intervals must be removed before "
"destroying a stream")
self._get_intervals.cache_remove(self, stream_id)
# Delete the data
# Delete the bulkdata storage
self.data.destroy(path)
# Delete metadata, stream, intervals
# Delete metadata, stream, intervals (should be none)
with self.con as con:
con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
@@ -517,23 +537,28 @@ class NilmDB(object):
dbinterval.db_startpos,
dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, count = False):
def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
"""
Returns (data, restart) tuple.
data is ASCII-formatted data from the database, formatted
'data' is ASCII-formatted data from the database, formatted
according to the layout of the stream.
restart, if nonzero, means that there were too many results to
'restart', if not None, means that there were too many results to
return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
the next block of data.
count, if true, means to not return raw data, but just the count
'count', if true, means to not return raw data, but just the count
of rows that would have been returned. This is much faster
than actually fetching the data. It is not limited by
max_results.
'markup', if true, indicates that returned data should be
marked with a comment denoting when a particular interval
starts, and another comment when an interval ends.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
@@ -543,7 +568,7 @@ class NilmDB(object):
result = []
matched = 0
remaining = self.max_results
restart = 0
restart = None
for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and
@@ -562,14 +587,26 @@ class NilmDB(object):
row_end = row_max
restart = table[row_max]
# Add markup
if markup:
result.append("# interval-start " +
timestamp_to_string(interval.start) + "\n")
# Gather these results up
result.append(table.get_data(row_start, row_end))
# Count them
remaining -= row_end - row_start
if restart:
# Add markup, and exit if restart is set.
if restart is not None:
if markup:
result.append("# interval-end " +
timestamp_to_string(restart) + "\n")
break
if markup:
result.append("# interval-end " +
timestamp_to_string(interval.end) + "\n")
if count:
return matched
@@ -578,9 +615,17 @@ class NilmDB(object):
def stream_remove(self, path, start = None, end = None):
"""
Remove data from the specified time interval within a stream.
Removes all data in the interval [start, end), and intervals
are truncated or split appropriately. Returns the number of
data points removed.
Removes data in the interval [start, end), and intervals are
truncated or split appropriately.
Returns a (removed, restart) tuple.
'removed' is the number of data points that were removed.
'restart', if not None, means there were too many rows to
remove in a single request. This function should be called
again with a start time of 'restart' to complete the removal.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
@@ -588,6 +633,8 @@ class NilmDB(object):
(start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end)
removed = 0
remaining = self.max_removals
restart = None
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
@@ -598,6 +645,13 @@ class NilmDB(object):
row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint)
# Shorten it if we'll hit the maximum number of removals
row_max = row_start + remaining
if row_max < row_end:
row_end = row_max
dbint.end = table[row_max]
restart = dbint.end
# Adjust the DBInterval to match the newly found ends
dbint.db_start = dbint.start
dbint.db_end = dbint.end
@@ -613,4 +667,7 @@ class NilmDB(object):
# Count how many were removed
removed += row_end - row_start
return removed
if restart is not None:
break
return (removed, restart)

View File

@@ -11,9 +11,11 @@ from nilmdb.utils.time import string_to_timestamp
import cherrypy
import sys
import os
import socket
import simplejson as json
import decorator
import psutil
import traceback
class NilmApp(object):
def __init__(self, db):
@@ -172,6 +174,21 @@ class Root(NilmApp):
class Stream(NilmApp):
"""Stream-specific operations"""
# Helpers
def _get_times(self, start_param, end_param):
(start, end) = (None, None)
if start_param is not None:
start = string_to_timestamp(start_param)
if end_param is not None:
end = string_to_timestamp(end_param)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError(
"400 Bad Request",
sprintf("start must precede end (%s >= %s)",
start_param, end_param))
return (start, end)
# /stream/list
# /stream/list?layout=float32_8
# /stream/list?path=/newton/prep&extended=1
@@ -210,7 +227,7 @@ class Stream(NilmApp):
@exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path):
"""Delete a stream and its associated data."""
"""Delete a stream. Fails if any data is still present."""
return self.db.stream_destroy(path)
# /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
@@ -305,11 +322,7 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("404 Not Found", "No such stream")
# Check limits
start = string_to_timestamp(start)
end = string_to_timestamp(end)
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
(start, end) = self._get_times(start, end)
# Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems.
@@ -331,15 +344,15 @@ class Stream(NilmApp):
the interval [start, end). Returns the number of data points
removed.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
return self.db.stream_remove(path, start, end)
(start, end) = self._get_times(start, end)
total_removed = 0
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
total_removed += removed
if restart is None:
break
start = restart
return total_removed
# /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -362,15 +375,7 @@ class Stream(NilmApp):
Note that the response type is the non-standard
'application/x-json-stream' for lack of a better option.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
(start, end) = self._get_times(start, end)
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -386,7 +391,7 @@ class Stream(NilmApp):
diffpath)
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
yield response
if restart == 0:
if restart is None:
break
start = restart
return content(start, end)
@@ -395,25 +400,20 @@ class Stream(NilmApp):
@cherrypy.expose
@chunked_response
@response_type("text/plain")
def extract(self, path, start = None, end = None, count = False):
def extract(self, path, start = None, end = None,
count = False, markup = False):
"""
Extract data from backend database. Streams the resulting
entries as ASCII text lines separated by newlines. This may
make multiple requests to the nilmdb backend to avoid causing
it to block for too long.
Add count=True to return a count rather than actual data.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
If 'count' is True, returns a count rather than actual data.
# Check parameters
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp.
"""
(start, end) = self._get_times(start, end)
# Check path and get layout
streams = self.db.stream_list(path = path)
@@ -421,21 +421,23 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("404 Not Found", "No such stream")
@workaround_cp_bug_1200
def content(start, end, count):
def content(start, end):
# Note: disable chunked responses to see tracebacks from here.
if count:
matched = self.db.stream_extract(path, start, end, count)
matched = self.db.stream_extract(path, start, end,
count = True)
yield sprintf("%d\n", matched)
return
while True:
(data, restart) = self.db.stream_extract(path, start, end)
(data, restart) = self.db.stream_extract(
path, start, end, count = False, markup = markup)
yield data
if restart == 0:
if restart is None:
return
start = restart
return content(start, end, count)
return content(start, end)
class Exiter(object):
"""App that exits the server, for testing"""
@@ -453,7 +455,8 @@ class Server(object):
stoppable = False, # whether /exit URL exists
embedded = True, # hide diagnostics and output, etc
fast_shutdown = False, # don't wait for clients to disconn.
force_traceback = False # include traceback in all errors
force_traceback = False, # include traceback in all errors
basepath = '', # base URL path for cherrypy.tree
):
# Save server version, just for verification during tests
self.version = nilmdb.__version__
@@ -513,7 +516,7 @@ class Server(object):
if stoppable:
root.exit = Exiter()
cherrypy.tree.apps = {}
cherrypy.tree.mount(root, "/", config = { "/" : app_config })
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
# Shutdowns normally wait for clients to disconnect. To speed
# up tests, set fast_shutdown = True
@@ -523,6 +526,9 @@ class Server(object):
else:
cherrypy.server.shutdown_timeout = 5
# Set up the WSGI application pointer for external programs
self.wsgi_application = cherrypy.tree
def json_error_page(self, status, message, traceback, version):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
@@ -589,3 +595,55 @@ class Server(object):
def stop(self):
cherrypy.engine.exit()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to
# work, the web server must use only a single process and single
# Python interpreter. Multiple threads are OK.
_wsgi_server = None
def wsgi_application(dbpath, basepath): # pragma: no cover
"""Return a WSGI application object with a database at the
specified path.
'dbpath' is a filesystem location, e.g. /home/nilm/db
'basepath' is the URL path of the application base, which
is the same as the first argument to Apache's WSGIScriptAlias
directive.
"""
def application(environ, start_response):
global _wsgi_server
if _wsgi_server is None:
# Try to start the server
try:
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
_wsgi_server = nilmdb.server.Server(
db, embedded = True,
basepath = basepath.rstrip('/'))
except Exception:
# Build an error message on failure
import pprint
err = sprintf("Initializing database at path '%s' failed:\n\n",
dbpath)
err += traceback.format_exc()
try:
import pwd
import grp
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
"on host %s, pid %d\n",
os.getuid(), pwd.getpwuid(os.getuid())[0],
os.getgid(), grp.getgrgid(os.getgid())[0],
socket.gethostname(), os.getpid())
except ImportError:
pass
err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
if _wsgi_server is None:
# Serve up the error with our own mini WSGI app.
headers = [ ('Content-type', 'text/plain'),
('Content-length', str(len(err))) ]
start_response("500 Internal Server Error", headers)
return [err]
# Call the normal application
return _wsgi_server.wsgi_application(environ, start_response)
return application

View File

@@ -1,7 +1,7 @@
"""NilmDB utilities"""
from __future__ import absolute_import
from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import serializer_proxy
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du, human_size
@@ -12,3 +12,4 @@ import nilmdb.utils.fallocate
import nilmdb.utils.time
import nilmdb.utils.iterator
import nilmdb.utils.interval
import nilmdb.utils.lock

View File

@@ -1,100 +0,0 @@
import Queue
import threading
import sys
import contextlib
# This file provides a context manager that converts a function
# that takes a callback into a generator that returns an iterable.
# This is done by running the function in a new thread.
# Based partially on http://stackoverflow.com/questions/9968592/
class IteratorizerThread(threading.Thread):
def __init__(self, queue, function, curl_hack):
"""
function: function to execute, which takes the
callback (provided by this class) as an argument
"""
threading.Thread.__init__(self)
self.name = "Iteratorizer-" + function.__name__ + "-" + self.name
self.function = function
self.queue = queue
self.die = False
self.curl_hack = curl_hack
def callback(self, data):
try:
if self.die:
raise Exception() # trigger termination
self.queue.put((1, data))
except:
if self.curl_hack:
# We can't raise exceptions, because the pycurl
# extension module will unconditionally print the
# exception itself, and not pass it up to the caller.
# Instead, just return a value that tells curl to
# abort. (-1 would be best, in case we were given 0
# bytes, but the extension doesn't support that).
self.queue.put((2, sys.exc_info()))
return 0
raise
def run(self):
try:
result = self.function(self.callback)
except:
self.queue.put((2, sys.exc_info()))
else:
self.queue.put((0, result))
@contextlib.contextmanager
def Iteratorizer(function, curl_hack = False):
"""
Context manager that takes a function expecting a callback,
and provides an iterable that yields the values passed to that
callback instead.
function: function to execute, which takes a callback
(provided by this context manager) as an argument
with iteratorizer(func) as it:
for i in it:
print 'callback was passed:', i
print 'function returned:', it.retval
"""
queue = Queue.Queue(maxsize = 1)
thread = IteratorizerThread(queue, function, curl_hack)
thread.daemon = True
thread.start()
class iteratorizer_gen(object):
def __init__(self, queue):
self.queue = queue
self.retval = None
def __iter__(self):
return self
def next(self):
(typ, data) = self.queue.get()
if typ == 0:
# function has returned
self.retval = data
raise StopIteration
elif typ == 1:
# data is available
return data
else:
# callback raised an exception
raise data[0], data[1], data[2]
try:
yield iteratorizer_gen(queue)
finally:
# Ask the thread to die, if it's still running.
thread.die = True
while thread.isAlive():
try:
queue.get(True, 0.01)
except: # pragma: no cover
pass

33
nilmdb/utils/lock.py Normal file
View File

@@ -0,0 +1,33 @@
# File locking
import warnings
try:
import fcntl
import errno
def exclusive_lock(f):
"""Acquire an exclusive lock. Returns True on successful
lock, or False on error."""
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
return False
else: # pragma: no cover
raise
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except ImportError: # pragma: no cover
def exclusive_lock(f):
"""Dummy lock function -- does not lock!"""
warnings.warn("Pretending to lock " + str(f))
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
return

View File

@@ -15,7 +15,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
def wrap_class_method(wrapper):
try:
orig = getattr(cls, wrapper.__name__).im_func
except:
except Exception:
orig = lambda x: None
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))

View File

@@ -1,3 +1,5 @@
from __future__ import absolute_import
from nilmdb.utils import datetime_tz
import re
import time
@@ -58,6 +60,11 @@ def parse_time(toparse):
timestamp, the current local timezone is assumed (e.g. from the TZ
env var).
"""
if toparse == "min":
return min_timestamp
if toparse == "max":
return max_timestamp
# If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators.

View File

@@ -39,7 +39,7 @@ versioneer.parentdir_prefix = 'nilmdb-'
# Hack to workaround logging/multiprocessing issue:
# https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ
try: import multiprocessing
except: pass
except Exception: pass
# Use Cython if it's new enough, otherwise use preexisting C files.
cython_modules = [ 'nilmdb.server.interval',

28
tests/data/extract-8 Normal file
View File

@@ -0,0 +1,28 @@
# interval-start 1332496919900000
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
# interval-end 1332496919991668
# interval-start 1332496920000000
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
# interval-end 1332496920100000

View File

@@ -24,7 +24,7 @@ class JimOrderPlugin(nose.plugins.Plugin):
name, workingDir=loader.workingDir)
try:
order = os.path.join(addr.filename, "test.order")
except:
except Exception:
order = None
if order and os.path.exists(order):
files = []

View File

@@ -4,7 +4,6 @@ test_lrucache.py
test_mustclose.py
test_serializer.py
test_iteratorizer.py
test_timestamper.py
test_rbtree.py

View File

@@ -30,6 +30,11 @@ class TestBulkData(object):
else:
data = BulkData(db, file_size = size, files_per_dir = files)
# Try opening it again (should result in locking error)
with assert_raises(IOError) as e:
data2 = BulkData(db)
in_("already locked by another process", str(e.exception))
# create empty
with assert_raises(ValueError):
data.create("/foo", "uint16_8")

View File

@@ -311,11 +311,11 @@ class TestClient(object):
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next()
client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next()
client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
@@ -375,6 +375,7 @@ class TestClient(object):
# Delete streams that exist
for stream in client.stream_list():
client.stream_remove(stream[0])
client.stream_destroy(stream[0])
# Database is empty
@@ -459,6 +460,7 @@ class TestClient(object):
ctx.update_start(109)
ctx.insert("110 1\n")
ctx.insert("111 1\n")
ctx.send()
ctx.insert("112 1\n")
ctx.insert("113 1\n")
ctx.insert("114 1\n")
@@ -506,6 +508,10 @@ class TestClient(object):
[ 109, 118 ],
[ 200, 300 ] ])
# destroy stream (try without removing data first)
with assert_raises(ClientError):
client.stream_destroy("/context/test")
client.stream_remove("/context/test")
client.stream_destroy("/context/test")
client.close()
@@ -600,6 +606,7 @@ class TestClient(object):
])
# Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test")
client.close()
@@ -613,7 +620,7 @@ class TestClient(object):
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests)
except:
except Exception:
raise SkipTest("can't get connection info")
# First request makes a connection
@@ -635,8 +642,9 @@ class TestClient(object):
eq_(connections(), (1, 5))
# Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test")
eq_(connections(), (1, 6))
eq_(connections(), (1, 7))
def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point
@@ -661,5 +669,6 @@ class TestClient(object):
# Server will round this and give an error on finalize()
ctx.insert("299999999.99 1\n")
client.stream_remove("/rounding/test")
client.stream_destroy("/rounding/test")
client.close()

View File

@@ -21,12 +21,13 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb"
def server_start(max_results = None, bulkdata_args = {}):
def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
global test_server, test_db
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb,
max_results = max_results,
max_removals = max_removals,
bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
@@ -233,6 +234,8 @@ class TestCmdline(object):
eq_(parse_time("1333648800.0"), test)
eq_(parse_time("1333648800000000"), test)
eq_(parse_time("@1333648800000000"), test)
eq_(parse_time("min"), nilmdb.utils.time.min_timestamp)
eq_(parse_time("max"), nilmdb.utils.time.max_timestamp)
with assert_raises(ValueError):
parse_time("@hashtag12345")
@@ -593,6 +596,8 @@ class TestCmdline(object):
test(6, "10:00:30", "10:00:31", extra="-b")
test(7, "10:00:30", "10:00:30.999", extra="-a -T")
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -600,6 +605,11 @@ class TestCmdline(object):
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n")
# markup for 3 intervals, plus extra markup lines whenever we had
# a "restart" from the nilmdb.stream_extract function
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 43210)
def test_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results
server_stop()
@@ -699,11 +709,9 @@ class TestCmdline(object):
# Reinsert some data, to verify that no overlaps with deleted
# data are reported
os.environ['TZ'] = "UTC"
self.ok("insert --timestamp -f --rate 120 /newton/prep "
"tests/data/prep-20120323T1000")
self.ok("insert -t --filename --rate 120 /newton/prep "
"tests/data/prep-20120323T1002")
for minute in ["0", "2"]:
self.ok("insert --timestamp -f --rate 120 /newton/prep"
" tests/data/prep-20120323T100" + minute)
def test_11_destroy(self):
# Delete records
@@ -715,6 +723,9 @@ class TestCmdline(object):
self.fail("destroy /no/such/stream")
self.contain("No stream at path")
self.fail("destroy -R /no/such/stream")
self.contain("No stream at path")
self.fail("destroy asdfasdf")
self.contain("No stream at path")
@@ -728,8 +739,14 @@ class TestCmdline(object):
self.ok("list --detail")
lines_(self.captured, 7)
# Delete some
self.ok("destroy /newton/prep")
# Fail to destroy because intervals still present
self.fail("destroy /newton/prep")
self.contain("all intervals must be removed")
self.ok("list --detail")
lines_(self.captured, 7)
# Destroy for real
self.ok("destroy -R /newton/prep")
self.ok("list")
self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
@@ -740,7 +757,8 @@ class TestCmdline(object):
self.ok("destroy /newton/raw")
self.ok("create /newton/raw uint16_6")
self.ok("destroy /newton/raw")
# Specify --remove with no data
self.ok("destroy --remove /newton/raw")
self.ok("list")
self.match("")
@@ -815,7 +833,7 @@ class TestCmdline(object):
# Now recreate the data one more time and make sure there are
# fewer files.
self.ok("destroy /newton/prep")
self.ok("destroy --remove /newton/prep")
self.fail("destroy /newton/prep") # already destroyed
self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC"
@@ -826,14 +844,16 @@ class TestCmdline(object):
for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames)
lt_(nfiles, 50)
self.ok("destroy /newton/prep") # destroy again
self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Also limit max_removals, to cover more functionality.
server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
# Insert data. Just for fun, insert out of order
@@ -974,8 +994,8 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy /diff/1")
self.ok("destroy /diff/2")
self.ok("destroy -R /diff/1")
self.ok("destroy -R /diff/2")
def test_16_rename(self):
# Test renaming. Force file size smaller so we get more files
@@ -1039,7 +1059,7 @@ class TestCmdline(object):
self.fail("rename /foo/bar /xxx/yyy/zzz/www")
self.contain("path is subdir of existing node")
self.ok("rename /foo/bar /xxx/yyy/mmm")
self.ok("destroy /xxx/yyy/zzz")
self.ok("destroy -R /xxx/yyy/zzz")
check_path("xxx", "yyy", "mmm")
# Extract it at the final path
@@ -1047,7 +1067,7 @@ class TestCmdline(object):
"--end '2012-03-23 10:04:01'")
eq_(self.captured, extract_before)
self.ok("destroy /xxx/yyy/mmm")
self.ok("destroy -R /xxx/yyy/mmm")
# Make sure temporary rename dirs weren't left around
for (dirpath, dirnames, filenames) in os.walk(testdb):

View File

@@ -385,7 +385,6 @@ class TestIntervalSpeed:
def test_interval_speed(self):
import yappi
import time
import testutil.aplotter as aplotter
import random
import math
@@ -406,6 +405,5 @@ class TestIntervalSpeed:
speed/j,
speed / (j*math.log(j))) # should be constant
speeds[j] = speed
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
yappi.stop()
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)

View File

@@ -1,61 +0,0 @@
import nilmdb
from nilmdb.utils.printf import *
import nose
from nose.tools import *
from nose.tools import assert_raises
import threading
import time
from testutil.helpers import *
def func_with_callback(a, b, callback):
callback(a)
callback(b)
callback(a+b)
return "return value"
class TestIteratorizer(object):
def test(self):
# First try it with a normal callback
self.result = ""
def cb(x):
self.result += str(x)
func_with_callback(1, 2, cb)
eq_(self.result, "123")
# Now make it an iterator
result = ""
f = lambda x: func_with_callback(1, 2, x)
with nilmdb.utils.Iteratorizer(f) as it:
for i in it:
result += str(i)
eq_(result, "123")
eq_(it.retval, "return value")
# Make sure things work when an exception occurs
result = ""
with nilmdb.utils.Iteratorizer(
lambda x: func_with_callback(1, "a", x)) as it:
with assert_raises(TypeError) as e:
for i in it:
result += str(i)
eq_(result, "1a")
# Now try to trigger the case where we stop iterating
# mid-generator, and expect the iteratorizer to clean up after
# itself. This doesn't have a particular result in the test,
# but gains coverage.
def foo():
with nilmdb.utils.Iteratorizer(f) as it:
it.next()
foo()
eq_(it.retval, None)
# Do the same thing when the curl hack is applied
def foo():
with nilmdb.utils.Iteratorizer(f, curl_hack = True) as it:
it.next()
foo()
eq_(it.retval, None)

View File

@@ -28,9 +28,6 @@ class Test00Nilmdb(object): # named 00 so it runs first
def test_NilmDB(self):
recursive_unlink(testdb)
with assert_raises(IOError):
nilmdb.server.NilmDB("/nonexistant-db/foo")
db = nilmdb.server.NilmDB(testdb)
db.close()
db = nilmdb.server.NilmDB(testdb)

View File

@@ -18,7 +18,7 @@ class TestPrintf(object):
printf("hello, world: %d", 123)
fprintf(test2, "hello too: %d", 123)
test3 = sprintf("hello three: %d", 123)
except:
except Exception:
sys.stdout = old_stdout
raise
sys.stdout = old_stdout

View File

@@ -1,419 +0,0 @@
#-----------------------------------------------
#aplotter.py - ascii art function plotter
#Copyright (c) 2006, Imri Goldberg
#All rights reserved.
#
#Redistribution and use in source and binary forms,
#with or without modification, are permitted provided
#that the following conditions are met:
#
# * Redistributions of source code must retain the
# above copyright notice, this list of conditions
# and the following disclaimer.
# * Redistributions in binary form must reproduce the
# above copyright notice, this list of conditions
# and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of
# its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
#THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
#LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
#SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
#CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#-----------------------------------------------
import math
EPSILON = 0.000001
def transposed(mat):
result = []
for i in xrange(len(mat[0])):
result.append([x[i] for x in mat])
return result
def y_reversed(mat):
result = []
for i in range(len(mat)):
result.append(list(reversed(mat[i])))
return result
def sign(x):
if 0<x:
return 1
if 0 == x:
return 0
return -1
class Plotter(object):
class PlotData(object):
def __init__(self, x_size, y_size, min_x, max_x, min_y, max_y, x_mod, y_mod):
self.x_size = x_size
self.y_size = y_size
self.min_x = min_x
self.max_x = max_x
self.min_y = min_y
self.max_y = max_y
self.x_mod = x_mod
self.y_mod = y_mod
self.x_step = float(max_x - min_x)/float(self.x_size)
self.y_step = float(max_y - min_y)/float(self.y_size)
self.inv_x_step = 1/self.x_step
self.inv_y_step = 1/self.y_step
self.ratio = self.y_step / self.x_step
def __repr__(self):
s = "size: %s, bl: %s, tr: %s, step: %s" % ((self.x_size, self.y_size), (self.min_x, self.min_y), (self.max_x, self.max_y),
(self.x_step, self.y_step))
return s
def __init__(self, **kwargs):
self.x_size = kwargs.get("x_size", 80)
self.y_size = kwargs.get("y_size", 20)
self.will_draw_axes = kwargs.get("draw_axes", True)
self.new_line = kwargs.get("newline", "\n")
self.dot = kwargs.get("dot", "*")
self.plot_slope = kwargs.get("plot_slope", True)
self.x_margin = kwargs.get("x_margin", 0.05)
self.y_margin = kwargs.get("y_margin", 0.1)
self.will_plot_labels = kwargs.get("plot_labels", True)
@staticmethod
def get_symbol_by_slope(slope, default_symbol):
draw_symbol = default_symbol
if slope > math.tan(3*math.pi/8):
draw_symbol = "|"
elif slope > math.tan(math.pi/8) and slope < math.tan(3*math.pi/8):
draw_symbol = "/"
elif abs(slope) < math.tan(math.pi/8):
draw_symbol = "-"
elif slope < math.tan(-math.pi/8) and slope > math.tan(-3*math.pi/8):
draw_symbol = "\\"
elif slope < math.tan(-3*math.pi/8):
draw_symbol = "|"
return draw_symbol
def plot_labels(self, output_buffer, plot_data):
if plot_data.y_size < 2:
return
margin_factor = 1
do_plot_x_label = True
do_plot_y_label = True
x_str = "%+g"
if plot_data.x_size < 16:
do_plot_x_label = False
elif plot_data.x_size < 23:
x_str = "%+.2g"
y_str = "%+g"
if plot_data.x_size < 8:
do_plot_y_label = False
elif plot_data.x_size < 11:
y_str = "%+.2g"
act_min_x = (plot_data.min_x + plot_data.x_mod*margin_factor)
act_max_x = (plot_data.max_x - plot_data.x_mod*margin_factor)
act_min_y = (plot_data.min_y + plot_data.y_mod*margin_factor)
act_max_y = (plot_data.max_y - plot_data.y_mod*margin_factor)
if abs(act_min_x) < 1:
min_x_str = "%+.2g" % act_min_x
else:
min_x_str = x_str % act_min_x
if abs(act_max_x) < 1:
max_x_str = "%+.2g" % act_max_x
else:
max_x_str = x_str % act_max_x
if abs(act_min_y) < 1:
min_y_str = "%+.2g" % act_min_y
else:
min_y_str = y_str % act_min_y
if abs(act_max_y) < 1:
max_y_str = "%+.2g" % act_max_y
else:
max_y_str = y_str % act_max_y
min_x_coord = self.get_coord(act_min_x,plot_data.min_x,plot_data.x_step)
max_x_coord = self.get_coord(act_max_x,plot_data.min_x,plot_data.x_step)
min_y_coord = self.get_coord(act_min_y,plot_data.min_y,plot_data.y_step)
max_y_coord = self.get_coord(act_max_y,plot_data.min_y,plot_data.y_step)
#print plot_data
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
#if plot_data.min_x < 0 and plot_data.max_x > 0:
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
#else:
#pass
output_buffer[x_zero_coord][min_y_coord] = "+"
output_buffer[x_zero_coord][max_y_coord] = "+"
output_buffer[min_x_coord][y_zero_coord] = "+"
output_buffer[max_x_coord][y_zero_coord] = "+"
if do_plot_x_label:
for i,c in enumerate(min_x_str):
output_buffer[min_x_coord+i][y_zero_coord-1] = c
for i,c in enumerate(max_x_str):
output_buffer[max_x_coord+i-len(max_x_str)][y_zero_coord-1] = c
if do_plot_y_label:
for i,c in enumerate(max_y_str):
output_buffer[x_zero_coord+i][max_y_coord] = c
for i,c in enumerate(min_y_str):
output_buffer[x_zero_coord+i][min_y_coord] = c
def plot_data(self, xy_seq, output_buffer, plot_data):
if self.plot_slope:
xy_seq = list(xy_seq)
#sort according to the x coord
xy_seq.sort(key = lambda c: c[0])
prev_p = xy_seq[0]
e_xy_seq = enumerate(xy_seq)
e_xy_seq.next()
for i,(x,y) in e_xy_seq:
draw_symbol = self.dot
line_drawn = self.plot_line(prev_p, (x,y), output_buffer, plot_data)
prev_p = (x,y)
if not line_drawn:
if i > 0 and i < len(xy_seq)-1:
px,py = xy_seq[i-1]
nx,ny = xy_seq[i+1]
if abs(nx-px) > EPSILON:
slope = (1.0/plot_data.ratio)*(ny-py)/(nx-px)
draw_symbol = self.get_symbol_by_slope(slope, draw_symbol)
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord >= 0 and y_coord < len(output_buffer[0]):
if self.draw_axes:
if y_coord == self.get_coord(0, plot_data.min_y, plot_data.y_step) and draw_symbol == "-":
draw_symbol = "="
output_buffer[x_coord][y_coord] = draw_symbol
else:
for x,y in xy_seq:
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord > 0 and y_coord < len(output_buffer[0]):
output_buffer[x_coord][y_coord] = self.dot
def plot_line(self, start, end, output_buffer, plot_data):
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
clipped_line = clip_line(start, end, (plot_data.min_x, plot_data.min_y), (plot_data.max_x, plot_data.max_y))
if clipped_line != None:
start,end = clipped_line
else:
return False
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
if start[0]-end[0] == 0:
draw_symbol = "|"
else:
slope = (1.0/plot_data.ratio)*(end[1]-start[1])/(end[0]-start[0])
draw_symbol = self.get_symbol_by_slope(slope, self.dot)
try:
delta = x1-x0, y1-y0
if abs(delta[0])>abs(delta[1]):
s = sign(delta[0])
slope = float(delta[1])/delta[0]
for i in range(0,abs(int(delta[0]))):
cur_draw_symbol = draw_symbol
x = i*s
cur_y = int(y0+slope*x)
if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[x0+x][cur_y] = cur_draw_symbol
else:
s = sign(delta[1])
slope = float(delta[0])/delta[1]
for i in range(0,abs(int(delta[1]))):
y = i*s
cur_draw_symbol = draw_symbol
cur_y = y0+y
if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[int(x0+slope*y)][cur_y] = cur_draw_symbol
except:
print start, end
print start_coord, end_coord
print plot_data
raise
return False
def plot_single(self, seq, min_x = None, max_x = None, min_y = None, max_y = None):
return self.plot_double(range(len(seq)),seq, min_x, max_x, min_y, max_y)
def plot_double(self, x_seq, y_seq, min_x = None, max_x = None, min_y = None, max_y = None):
if min_x == None:
min_x = min(x_seq)
if max_x == None:
max_x = max(x_seq)
if min_y == None:
min_y = min(y_seq)
if max_y == None:
max_y = max(y_seq)
if max_y == min_y:
max_y += 1
x_mod = (max_x-min_x)*self.x_margin
y_mod = (max_y-min_y)*self.y_margin
min_x-=x_mod
max_x+=x_mod
min_y-=y_mod
max_y+=y_mod
plot_data = self.PlotData(self.x_size, self.y_size, min_x, max_x, min_y, max_y, x_mod, y_mod)
output_buffer = [[" "]*self.y_size for i in range(self.x_size)]
if self.will_draw_axes:
self.draw_axes(output_buffer, plot_data)
self.plot_data(zip(x_seq, y_seq), output_buffer, plot_data)
if self.will_plot_labels:
self.plot_labels(output_buffer, plot_data)
trans_result = transposed(y_reversed(output_buffer))
result = self.new_line.join(["".join(row) for row in trans_result])
return result
def draw_axes(self, output_buffer, plot_data):
draw_x = False
draw_y = False
if plot_data.min_x <= 0 and plot_data.max_x > 0:
draw_y = True
zero_x = self.get_coord(0, plot_data.min_x, plot_data.x_step)
for y in xrange(plot_data.y_size):
output_buffer[zero_x][y] = "|"
if plot_data.min_y <= 0 and plot_data.max_y > 0:
draw_x = True
zero_y = self.get_coord(0, plot_data.min_y, plot_data.y_step)
for x in xrange(plot_data.x_size):
output_buffer[x][zero_y] = "-"
if draw_x and draw_y:
output_buffer[zero_x][zero_y] = "+"
@staticmethod
def get_coord(val, min, step):
result = int((val - min)/step)
return result
def clip_line(line_pt_1, line_pt_2, rect_bottom_left, rect_top_right):
ts = [0.0,1.0]
if line_pt_1[0] == line_pt_2[0]:
return ((line_pt_1[0], max(min(line_pt_1[1], line_pt_2[1]), rect_bottom_left[1])),
(line_pt_1[0], min(max(line_pt_1[1], line_pt_2[1]), rect_top_right[1])))
if line_pt_1[1] == line_pt_2[1]:
return ((max(min(line_pt_1[0], line_pt_2[0]), rect_bottom_left[0]), line_pt_1[1]),
(min(max(line_pt_1[0], line_pt_2[0]), rect_top_right[0]), line_pt_1[1]))
if ((rect_bottom_left[0] <= line_pt_1[0] and line_pt_1[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_1[1] and line_pt_1[1] < rect_top_right[1]) and
(rect_bottom_left[0] <= line_pt_2[0] and line_pt_2[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_2[1] and line_pt_2[1] < rect_top_right[1])):
return line_pt_1, line_pt_2
ts.append( float(rect_bottom_left[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_top_right[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_bottom_left[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.append( float(rect_top_right[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.sort()
if ts[2] < 0 or ts[2] >= 1 or ts[3] < 0 or ts[2]>= 1:
return None
result = [(pt_1 + t*(pt_2-pt_1)) for t in (ts[2],ts[3]) for (pt_1, pt_2) in zip(line_pt_1, line_pt_2)]
return (result[0],result[1]), (result[2], result[3])
def plot(*args,**flags):
limit_flags_names = set(["min_x","min_y","max_x","max_y"])
limit_flags = dict([(n,flags[n]) for n in limit_flags_names & set(flags)])
settting_flags = dict([(n,flags[n]) for n in set(flags) - limit_flags_names])
if len(args) == 1:
p = Plotter(**settting_flags)
print p.plot_single(args[0],**limit_flags)
elif len(args) == 2:
p = Plotter(**settting_flags)
print p.plot_double(args[0],args[1],**limit_flags)
else:
raise NotImplementedError("can't draw multiple graphs yet")
__all__ = ["Plotter","plot"]