Compare commits

...

33 Commits

SHA1 Message Date
f0304b4c00 Merge branch 'binary' into HEAD 2013-04-07 18:08:10 -04:00
60594ca58e Numpy is required for tests now, due to nilmdb.client.numpyclient
Still allow installation without it, though.
2013-04-07 18:05:43 -04:00
c7f2df4abc Add nilmdb.client.numpyclient.NumpyClient with stream_extract_numpy
This is a subclass of nilmdb.client.client.Client that adds numpy-specific
routines, which should be a lot faster.
2013-04-07 17:43:52 -04:00
5b7409f802 Add binary extract to client, server, nilmdb, bulkdata, and rocket. 2013-04-07 16:06:52 -04:00
06038062a2 Fix error in time parsing 2013-04-06 19:12:17 -04:00
ae9fe89759 Parse timestamps with '@' before any other checks 2013-04-04 14:43:18 -04:00
04def60021 Include stream path in "no such stream" errors 2013-04-02 21:06:49 -04:00
9ce0f69dff Add "--delete" option to "nilmtool metadata" tool
This is the same as "--update" with an empty string as the value.
2013-04-02 16:07:28 -04:00
90c3be91c4 Natural sort for streams in client.stream_list 2013-04-02 14:37:32 -04:00
ebccfb3531 Fix stream renaming when the new path is a parent of the old 2013-04-01 19:25:17 -04:00
e006f1d02e Change default URL to http://localhost/nilmdb/ 2013-04-01 18:04:31 -04:00
5292319802 server: consolidate time processing and checks 2013-03-30 21:16:40 -04:00
173121ca87 Switch URL to one that should definitely not resolve 2013-03-30 17:31:35 -04:00
26bab031bd Add StreamInserter.send() to trigger intermediate block send 2013-03-30 17:30:43 -04:00
b5fefffa09 Use a global cached server object for WSGI app
This is instead of caching it inside nilmdb.server.wsgi_application.
Might make things work a bit better in case the web server decides
to call wsgi_application multiple times.
2013-03-30 15:56:57 -04:00
dccb3e370a WSGI config needs to specify application group
This ensures that the same Python sub-instance handles the request,
even if it's coming in from two different virtual hosts.
2013-03-30 15:56:02 -04:00
95ca55aa7e Print out WSGI environment on DB init failure 2013-03-30 15:55:41 -04:00
e01813f29d Fix wsgi documentation 2013-03-25 13:52:32 -04:00
7f41e117a2 Fix tabs 2013-03-25 13:44:03 -04:00
dd5fc806e5 Restructure WSGI app to regenerate error on each call, if needed
This way, errors like "database already locked" can be fixed and the
page reloaded, without needing to restart Apache.
2013-03-24 21:52:11 -04:00
f8ca8d31e6 Remove Iteratorizer, as it's no longer needed 2013-03-24 21:31:03 -04:00
ed89d803f0 Remove aplotter code 2013-03-24 21:29:09 -04:00
3d24092cd2 Replace bare 'except:' with 'except Exception:'
Otherwise we might inadvertently catch SystemExit or KeyboardInterrupt or
something we don't want to catch.
2013-03-24 21:28:01 -04:00
304bb43d85 Move lockfile out of data dir, to avoid stream tree conflicts 2013-03-24 21:23:45 -04:00
59a79a30a5 Remove lockfile when done.
This isn't necessary for correct behavior: if the database is killed,
the old flock() will go away when the file descriptor gets closed.
2013-03-24 21:20:47 -04:00
c0d450d39e Add locking mechanism to avoid multiple servers on one DB 2013-03-24 21:20:20 -04:00
6f14d609b2 Fix issue where bulkdata was accidentally closed 2013-03-24 21:16:18 -04:00
77ef87456f Improve WSGI application support, fix docs 2013-03-24 21:16:03 -04:00
32d6af935c Improve wsgi docs 2013-03-22 19:17:36 -04:00
6af3a6fc41 Add WSGI application support and documentation 2013-03-22 19:14:34 -04:00
f8a06fb3b7 Clarify default DB path in nilmdb_server.py help text 2013-03-22 15:09:37 -04:00
e790bb9e8a Fix test failure when tests are run as root 2013-03-21 14:33:02 -04:00
89be6f5931 Add option to include interval start/end markup on extract
When enabled, lines like "# interval-start 1234567890123456" and "#
interval-end 1234567890123456" will be added to the data output.  Note
that there may be an "interval-end" timestamp followed by an identical
"interval-start" timestamp, if the response at the nilmdb level was
split up into multiple chunks.

In general, assume contiguous data if previous_interval_end ==
new_interval_start.
2013-03-19 14:23:33 -04:00
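A minimal sketch of consuming these markup comments on the client side, merging intervals whose end and start timestamps match as described above (the URL and stream path are placeholders):

import nilmdb.client

client = nilmdb.client.Client(url = "http://localhost/nilmdb/")
intervals = []          # list of [start, end] pairs, in microseconds
for line in client.stream_extract("/newton/prep", markup = True):
    if line.startswith("# interval-start "):
        start = int(line.split()[2])
        # Contiguous with the previous interval: keep extending it
        if intervals and intervals[-1][1] == start:
            continue
        intervals.append([start, None])
    elif line.startswith("# interval-end "):
        intervals[-1][1] = int(line.split()[2])
    # (non-comment lines are the data rows themselves)
client.close()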
32 changed files with 632 additions and 678 deletions

View File

@@ -24,3 +24,5 @@ Usage:
nilmdb-server --help
nilmtool --help
See docs/wsgi.md for info on setting up a WSGI application in Apache.

docs/wsgi.md (new file, 32 lines)
View File

@@ -0,0 +1,32 @@
WSGI Application in Apache
--------------------------
Install `apache2` and `libapache2-mod-wsgi`
We'll set up the database server at URL `http://myhost.com/nilmdb`.
The database will be stored in `/home/nilm/db`, and the process will
run as user `nilm`, group `nilm`.
First, create a WSGI script `/home/nilm/nilmdb.wsgi` containing:
import nilmdb.server
application = nilmdb.server.wsgi_application("/home/nilm/db", "/nilmdb")
The first parameter is the local filesystem path, and the second
parameter is the path part of the URL.
Then, set up Apache with a configuration like:
<VirtualHost>
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
WSGIApplicationGroup nilmdb-appgroup
WSGIProcessGroup nilmdb-procgroup
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmdb>
Order deny,allow
Deny from all
Allow from 1.2.3.4
</Location>
</VirtualHost>
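As a quick sanity check once Apache reloads (this snippet is not part of the file above; the hostname follows the example URL):

import nilmdb.client
client = nilmdb.client.Client(url = "http://myhost.com/nilmdb/")
print client.version()
client.close()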

View File

@@ -6,6 +6,7 @@ import nilmdb.utils
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError
import re
import time
import simplejson as json
import contextlib
@@ -65,7 +66,12 @@ class Client(object):
params["layout"] = layout
if extended:
params["extended"] = 1
return self.http.get("stream/list", params)
def sort_streams_nicely(x):
"""Human-friendly sort (/stream/2 before /stream/10)"""
num = lambda t: int(t) if t.isdigit() else t
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
return sorted(x, key = key)
return sort_streams_nicely(self.http.get("stream/list", params))
def stream_get_metadata(self, path, keys = None):
params = { "path": path }
@@ -171,7 +177,8 @@ class Client(object):
params["end"] = timestamp_to_string(end)
return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, count = False):
def stream_extract(self, path, start = None, end = None,
count = False, markup = False, binary = False):
"""
Extract data from a stream. Returns a generator that yields
lines of ASCII-formatted data that matches the database
@@ -179,6 +186,14 @@ class Client(object):
Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged.
Specify markup = True to include comments in the returned data
that indicate interval starts and ends.
Specify binary = True to return chunks of raw binary data,
rather than lines of ASCII-formatted data. Raw binary data
is always little-endian and matches the database types
(including a uint64 timestamp).
"""
params = {
"path": path,
@@ -189,7 +204,11 @@ class Client(object):
params["end"] = timestamp_to_string(end)
if count:
params["count"] = 1
return self.http.get_gen("stream/extract", params)
if markup:
params["markup"] = 1
if binary:
params["binary"] = 1
return self.http.get_gen("stream/extract", params, binary = binary)
def stream_count(self, path, start = None, end = None):
"""
@@ -307,6 +326,11 @@ class StreamInserter(object):
part of a new interval and there may be a gap left in-between."""
self._send_block(final = True)
def send(self):
"""Send any data that we might have buffered up. Does not affect
any other treatment of timestamps or endpoints."""
self._send_block(final = False)
def _get_first_noncomment(self, block):
"""Return the (start, end) indices of the first full line in
block that isn't a comment, or raise IndexError if

View File

@@ -16,7 +16,7 @@ class HTTPClient(object):
reparsed = urlparse.urlparse(baseurl).geturl()
if '://' not in reparsed:
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed
self.baseurl = reparsed.rstrip('/') + '/'
# Build Requests session object, enable SSL verification
self.session = requests.Session()
@@ -110,7 +110,8 @@ class HTTPClient(object):
return self._req("PUT", url, params, data)
# Generator versions that return data one line at a time.
def _req_gen(self, method, url, query = None, body = None, headers = None):
def _req_gen(self, method, url, query = None, body = None,
headers = None, binary = False):
"""
Make a request and return a generator that gives back strings
or JSON decoded lines of the body data, or raise an error if
@@ -118,16 +119,19 @@ class HTTPClient(object):
"""
(response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers)
if isjson:
if binary:
for chunk in response.iter_content(chunk_size = 65536):
yield chunk
elif isjson:
for line in response.iter_lines():
yield json.loads(line)
else:
for line in response.iter_lines():
yield line
def get_gen(self, url, params = None):
def get_gen(self, url, params = None, binary = False):
"""Simple GET (parameters in URL) returning a generator"""
return self._req_gen("GET", url, params)
return self._req_gen("GET", url, params, binary = binary)
# Not much use for a POST or PUT generator, since they don't
# return much data.

View File

@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
"""Provide a NumpyClient class that is based on normal Client, but has
additional methods for extracting and inserting data via Numpy arrays."""
import nilmdb.utils
import nilmdb.client.client
import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError
import contextlib
from nilmdb.utils.time import timestamp_to_string, string_to_timestamp
import numpy
import cStringIO
def layout_to_dtype(layout):
ltype = layout.split('_')[0]
lcount = int(layout.split('_')[1])
if ltype.startswith('int'):
atype = '<i' + str(int(ltype[3:]) / 8)
elif ltype.startswith('uint'):
atype = '<u' + str(int(ltype[4:]) / 8)
elif ltype.startswith('float'):
atype = '<f' + str(int(ltype[5:]) / 8)
else:
raise ValueError("bad layout")
return numpy.dtype([('timestamp', '<i8'), ('data', atype, lcount)])
class NumpyClient(nilmdb.client.client.Client):
"""Subclass of nilmdb.client.Client that adds additional methods for
extracting and inserting data via Numpy arrays."""
def stream_extract_numpy(self, path, start = None, end = None,
layout = None, maxrows = 100000,
structured = False):
"""
Extract data from a stream. Returns a generator that yields
Numpy arrays of up to 'maxrows' of data each.
If 'layout' is None, it is read using stream_info.
If 'structured' is False, all data is converted to float64
and returned in a flat 2D array. Otherwise, data is returned
as a structured dtype in a 1D array.
"""
if layout is None:
streams = self.stream_list(path)
if len(streams) != 1:
raise ClientError("can't get layout for path: " + path)
layout = streams[0][1]
dtype = layout_to_dtype(layout)
def to_numpy(data):
a = numpy.fromstring(data, dtype)
if structured:
return a
return numpy.c_[a['timestamp'], a['data']]
chunks = []
total_len = 0
maxsize = dtype.itemsize * maxrows
for data in self.stream_extract(path, start, end, binary = True):
# Add this block of binary data
chunks.append(data)
total_len += len(data)
# See if we have enough to make the requested Numpy array
while total_len >= maxsize:
assembled = "".join(chunks)
total_len -= maxsize
chunks = [ assembled[maxsize:] ]
block = assembled[:maxsize]
yield to_numpy(block)
if total_len:
yield to_numpy("".join(chunks))
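A brief usage sketch for the new NumpyClient (the server URL and stream path are assumptions; the stream is expected to already exist):

import nilmdb.client.numpyclient

client = nilmdb.client.numpyclient.NumpyClient(url = "http://localhost/nilmdb/")
# With structured = False (the default), each chunk is a 2D float64 array:
# column 0 holds the timestamp, the remaining columns hold the data.
for chunk in client.stream_extract_numpy("/newton/prep", maxrows = 10000):
    print chunk.shape
client.close()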

View File

@@ -81,7 +81,7 @@ class Cmdline(object):
def __init__(self, argv = None):
self.argv = argv or sys.argv[1:]
self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380")
self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
self.subcmd = {}
self.complete = Complete()

View File

@@ -29,6 +29,8 @@ def setup(self, sub):
group.add_argument("-a", "--annotate", action="store_true",
help="Include comments with some information "
"about the stream")
group.add_argument("-m", "--markup", action="store_true",
help="Include comments with interval starts and ends")
group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in annotated information")
group.add_argument("-c", "--count", action="store_true",
@@ -61,7 +63,8 @@ def cmd_extract(self):
for dataline in self.client.stream_extract(self.args.path,
self.args.start,
self.args.end,
self.args.count):
self.args.count,
self.args.markup):
if self.args.bare and not self.args.count:
# Strip timestamp (first element). Doesn't make sense
# if we are only returning a count.

View File

@@ -9,7 +9,8 @@ def setup(self, sub):
a stream.
""",
usage="%(prog)s path [-g [key ...] | "
"-s key=value [...] | -u key=value [...]]")
"-s key=value [...] | -u key=value [...]] | "
"-d [key ...]")
cmd.set_defaults(handler = cmd_metadata)
group = cmd.add_argument_group("Required arguments")
@@ -30,6 +31,9 @@ def setup(self, sub):
help="Update metadata using provided "
"key=value pairs",
).completer = self.complete.meta_keyval
exc.add_argument("-d", "--delete", nargs="*", metavar="key",
help="Delete metadata for specified keys (default all)",
).completer = self.complete.meta_key
return cmd
def cmd_metadata(self):
@@ -56,6 +60,16 @@ def cmd_metadata(self):
handler(self.args.path, data)
except nilmdb.client.ClientError as e:
self.die("error setting/updating metadata: %s", str(e))
elif self.args.delete is not None:
# Delete (by setting values to empty strings)
keys = self.args.delete or None
try:
data = self.client.stream_get_metadata(self.args.path, keys)
for key in data:
data[key] = ""
self.client.stream_update_metadata(self.args.path, data)
except nilmdb.client.ClientError as e:
self.die("error deleting metadata: %s", str(e))
else:
# Get (or unspecified)
keys = self.args.get or None
@@ -64,7 +78,7 @@ def cmd_metadata(self):
except nilmdb.client.ClientError as e:
self.die("error getting metadata: %s", str(e))
for key, value in sorted(data.items()):
# Omit nonexistant keys
# Print nonexistant keys as having empty value
if value is None:
value = ""
printf("%s=%s\n", key, value)

View File

@@ -22,7 +22,7 @@ def main():
group.add_argument('-p', '--port', help = 'Listen on the given port',
type = int, default = 12380)
group.add_argument('-d', '--database', help = 'Database directory',
default = os.path.join(os.getcwd(), "db"))
default = "./db")
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-t', '--traceback',

View File

@@ -17,5 +17,5 @@ except (ImportError, TypeError): # pragma: no cover
pass
from nilmdb.server.nilmdb import NilmDB
from nilmdb.server.server import Server
from nilmdb.server.server import Server, wsgi_application
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

View File

@@ -14,6 +14,7 @@ import re
import sys
import tempfile
import nilmdb.utils.lock
from . import rocket
# Up to 256 open file descriptors at any given time.
@@ -26,6 +27,8 @@ class BulkData(object):
def __init__(self, basepath, **kwargs):
self.basepath = basepath
self.root = os.path.join(self.basepath, "data")
self.lock = self.root + ".lock"
self.lockfile = None
# Tuneables
if "file_size" in kwargs:
@@ -44,8 +47,22 @@ class BulkData(object):
if not os.path.isdir(self.root):
os.mkdir(self.root)
# Create the lock
self.lockfile = open(self.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(self.lockfile):
raise IOError('database at "' + self.basepath +
'" is already locked by another process')
def close(self):
self.getnode.cache_remove_all()
if self.lockfile:
nilmdb.utils.lock.exclusive_unlock(self.lockfile)
self.lockfile.close()
try:
os.unlink(self.lock)
except OSError: # pragma: no cover
pass
self.lockfile = None
def _encode_filename(self, path):
# Encode all paths to UTF-8, regardless of sys.getfilesystemencoding(),
@@ -62,7 +79,12 @@ class BulkData(object):
if Table.exists(ospath):
raise ValueError("stream already exists at this path")
if os.path.isdir(ospath):
raise ValueError("subdirs of this path already exist")
# Look for any files in subdirectories. Fully empty subdirectories
# are OK; they might be there during a rename
for (root, dirs, files) in os.walk(ospath):
if len(files):
raise ValueError(
"non-empty subdirs of this path already exist")
def _create_parents(self, unicodepath):
"""Verify the path name, and create parent directories if they
@@ -134,7 +156,7 @@ class BulkData(object):
# Open and cache it
self.getnode(unicodepath)
except:
except Exception:
exc_info = sys.exc_info()
try:
os.rmdir(ospath)
@@ -171,7 +193,6 @@ class BulkData(object):
# Basic checks
if oldospath == newospath:
raise ValueError("old and new paths are the same")
self._create_check_ospath(newospath)
# Move the table to a temporary location
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
@@ -179,6 +200,9 @@ class BulkData(object):
os.rename(oldospath, tmppath)
try:
# Check destination path
self._create_check_ospath(newospath)
# Create parent dirs for new location
self._create_parents(newunicodepath)
@@ -371,7 +395,7 @@ class Table(object):
# Try deleting subdir, too
try:
os.rmdir(os.path.join(self.root, subdir))
except:
except Exception:
pass
# Cache open files
@@ -455,7 +479,7 @@ class Table(object):
# Success, so update self.nrows accordingly
self.nrows = tot_rows
def get_data(self, start, stop):
def get_data(self, start, stop, binary = False):
"""Extract data corresponding to Python range [n:m],
and returns a formatted string"""
if (start is None or
@@ -473,10 +497,13 @@ class Table(object):
if count > remaining:
count = remaining
f = self.file_open(subdir, filename)
if binary:
ret.append(f.extract_binary(offset, count))
else:
ret.append(f.extract_string(offset, count))
remaining -= count
row += count
return "".join(ret)
return b"".join(ret)
def __getitem__(self, row):
"""Extract timestamps from a row, with table[n] notation."""
@@ -504,7 +531,7 @@ class Table(object):
with open(cachefile, "rb") as f:
ranges = pickle.load(f)
cachefile_present = True
except:
except Exception:
ranges = []
cachefile_present = False

View File

@@ -12,6 +12,7 @@ Manages both the SQL database and the table storage backend.
from __future__ import absolute_import
import nilmdb.utils
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
@@ -105,7 +106,9 @@ class NilmDB(object):
try:
os.makedirs(self.basepath)
except OSError as e:
if e.errno != errno.EEXIST:
if e.errno != errno.EEXIST: # pragma: no cover
# (no coverage, because it's hard to trigger this case
# if tests are run as root)
raise IOError("can't create tree " + self.basepath)
# Our data goes inside it
@@ -116,8 +119,9 @@ class NilmDB(object):
self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
try:
self._sql_schema_update()
finally: # pragma: no cover
except Exception: # pragma: no cover
self.data.close()
raise
# See big comment at top about the performance implications of this
self.con.execute("PRAGMA synchronous=NORMAL")
@@ -533,7 +537,8 @@ class NilmDB(object):
dbinterval.db_startpos,
dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, count = False):
def stream_extract(self, path, start = None, end = None,
count = False, markup = False, binary = False):
"""
Returns (data, restart) tuple.
@@ -546,10 +551,17 @@ class NilmDB(object):
and a new request with a start time of 'restart' will fetch
the next block of data.
count, if true, means to not return raw data, but just the count
'count', if true, means to not return raw data, but just the count
of rows that would have been returned. This is much faster
than actually fetching the data. It is not limited by
max_results.
'markup', if true, indicates that returned data should be
marked with a comment denoting when a particular interval
starts, and another comment when an interval ends.
'binary', if true, means to return raw binary rather than
ASCII-formatted data.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
@@ -560,6 +572,8 @@ class NilmDB(object):
matched = 0
remaining = self.max_results
restart = None
if binary and (markup or count):
raise NilmDBError("binary mode can't be used with markup or count")
for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and
@@ -578,14 +592,26 @@ class NilmDB(object):
row_end = row_max
restart = table[row_max]
# Add markup
if markup:
result.append("# interval-start " +
timestamp_to_string(interval.start) + "\n")
# Gather these results up
result.append(table.get_data(row_start, row_end))
result.append(table.get_data(row_start, row_end, binary))
# Count them
remaining -= row_end - row_start
# Add markup, and exit if restart is set.
if restart is not None:
if markup:
result.append("# interval-end " +
timestamp_to_string(restart) + "\n")
break
if markup:
result.append("# interval-end " +
timestamp_to_string(interval.end) + "\n")
if count:
return matched

View File

@@ -527,6 +527,46 @@ err:
return NULL;
}
/****
* Extract to binary string containing raw little-endian binary data
*/
static PyObject *Rocket_extract_binary(Rocket *self, PyObject *args)
{
long count;
long offset;
if (!PyArg_ParseTuple(args, "ll", &offset, &count))
return NULL;
if (!self->file) {
PyErr_SetString(PyExc_Exception, "no file");
return NULL;
}
/* Seek to target location */
if (fseek(self->file, offset, SEEK_SET) < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
uint8_t *str;
int len = count * self->binary_size;
str = malloc(len);
if (str == NULL) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
/* Data in the file is already in the desired little-endian
binary format, so just read it directly. */
if (fread(str, self->binary_size, count, self->file) != count) {
free(str);
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
PyObject *pystr = PyBytes_FromStringAndSize((char *)str, len);
free(str);
return pystr;
}
/****
* Extract timestamp
@@ -600,6 +640,12 @@ static PyMethodDef Rocket_methods[] = {
"Extract count rows of data from the file at offset offset.\n"
"Return an ascii formatted string according to the layout" },
{ "extract_binary",
(PyCFunction)Rocket_extract_binary, METH_VARARGS,
"extract_binary(self, offset, count)\n\n"
"Extract count rows of data from the file at offset offset.\n"
"Return a raw binary string of data matching the data layout." },
{ "extract_timestamp",
(PyCFunction)Rocket_extract_timestamp, METH_VARARGS,
"extract_timestamp(self, offset)\n\n"

View File

@@ -11,9 +11,11 @@ from nilmdb.utils.time import string_to_timestamp
import cherrypy
import sys
import os
import socket
import simplejson as json
import decorator
import psutil
import traceback
class NilmApp(object):
def __init__(self, db):
@@ -172,6 +174,21 @@ class Root(NilmApp):
class Stream(NilmApp):
"""Stream-specific operations"""
# Helpers
def _get_times(self, start_param, end_param):
(start, end) = (None, None)
if start_param is not None:
start = string_to_timestamp(start_param)
if end_param is not None:
end = string_to_timestamp(end_param)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError(
"400 Bad Request",
sprintf("start must precede end (%s >= %s)",
start_param, end_param))
return (start, end)
# /stream/list
# /stream/list?layout=float32_8
# /stream/list?path=/newton/prep&extended=1
@@ -300,16 +317,11 @@ class Stream(NilmApp):
body = cherrypy.request.body.read()
# Check path and get layout
streams = self.db.stream_list(path = path)
if len(streams) != 1:
raise cherrypy.HTTPError("404 Not Found", "No such stream")
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
# Check limits
start = string_to_timestamp(start)
end = string_to_timestamp(end)
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
(start, end) = self._get_times(start, end)
# Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems.
@@ -331,14 +343,7 @@ class Stream(NilmApp):
the interval [start, end). Returns the number of data points
removed.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
(start, end) = self._get_times(start, end)
total_removed = 0
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
@@ -369,15 +374,7 @@ class Stream(NilmApp):
Note that the response type is the non-standard
'application/x-json-stream' for lack of a better option.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
(start, end) = self._get_times(start, end)
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -402,47 +399,56 @@ class Stream(NilmApp):
@cherrypy.expose
@chunked_response
@response_type("text/plain")
def extract(self, path, start = None, end = None, count = False):
def extract(self, path, start = None, end = None,
count = False, markup = False, binary = False):
"""
Extract data from backend database. Streams the resulting
entries as ASCII text lines separated by newlines. This may
make multiple requests to the nilmdb backend to avoid causing
it to block for too long.
Add count=True to return a count rather than actual data.
"""
if start is not None:
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
If 'count' is True, returns a count rather than actual data.
# Check parameters
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp.
If 'binary' is True, return raw binary data, rather than lines
of ASCII-formatted data. Raw binary data is always
little-endian and matches the database types (including a
uint64 timestamp).
"""
(start, end) = self._get_times(start, end)
# Check path and get layout
streams = self.db.stream_list(path = path)
if len(streams) != 1:
raise cherrypy.HTTPError("404 Not Found", "No such stream")
if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path)
if binary:
cherrypy.response.headers['Content-Type'] = (
"application/octet-stream")
if markup or count:
raise cherrypy.HTTPError("400", "can't mix binary and "
"markup or count modes")
@workaround_cp_bug_1200
def content(start, end, count):
def content(start, end):
# Note: disable chunked responses to see tracebacks from here.
if count:
matched = self.db.stream_extract(path, start, end, count)
matched = self.db.stream_extract(path, start, end,
count = True)
yield sprintf("%d\n", matched)
return
while True:
(data, restart) = self.db.stream_extract(path, start, end)
(data, restart) = self.db.stream_extract(
path, start, end, count = False,
markup = markup, binary = binary)
yield data
if restart is None:
return
start = restart
return content(start, end, count)
return content(start, end)
class Exiter(object):
"""App that exits the server, for testing"""
@@ -460,7 +466,8 @@ class Server(object):
stoppable = False, # whether /exit URL exists
embedded = True, # hide diagnostics and output, etc
fast_shutdown = False, # don't wait for clients to disconn.
force_traceback = False # include traceback in all errors
force_traceback = False, # include traceback in all errors
basepath = '', # base URL path for cherrypy.tree
):
# Save server version, just for verification during tests
self.version = nilmdb.__version__
@@ -520,7 +527,7 @@ class Server(object):
if stoppable:
root.exit = Exiter()
cherrypy.tree.apps = {}
cherrypy.tree.mount(root, "/", config = { "/" : app_config })
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
# Shutdowns normally wait for clients to disconnect. To speed
# up tests, set fast_shutdown = True
@@ -530,6 +537,9 @@ class Server(object):
else:
cherrypy.server.shutdown_timeout = 5
# Set up the WSGI application pointer for external programs
self.wsgi_application = cherrypy.tree
def json_error_page(self, status, message, traceback, version):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
@@ -596,3 +606,55 @@ class Server(object):
def stop(self):
cherrypy.engine.exit()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to
# work, the web server must use only a single process and single
# Python interpreter. Multiple threads are OK.
_wsgi_server = None
def wsgi_application(dbpath, basepath): # pragma: no cover
"""Return a WSGI application object with a database at the
specified path.
'dbpath' is a filesystem location, e.g. /home/nilm/db
'basepath' is the URL path of the application base, which
is the same as the first argument to Apache's WSGIScriptAlias
directive.
"""
def application(environ, start_response):
global _wsgi_server
if _wsgi_server is None:
# Try to start the server
try:
db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(dbpath)
_wsgi_server = nilmdb.server.Server(
db, embedded = True,
basepath = basepath.rstrip('/'))
except Exception:
# Build an error message on failure
import pprint
err = sprintf("Initializing database at path '%s' failed:\n\n",
dbpath)
err += traceback.format_exc()
try:
import pwd
import grp
err += sprintf("\nRunning as: uid=%d (%s), gid=%d (%s) "
"on host %s, pid %d\n",
os.getuid(), pwd.getpwuid(os.getuid())[0],
os.getgid(), grp.getgrgid(os.getgid())[0],
socket.gethostname(), os.getpid())
except ImportError:
pass
err += sprintf("\nEnvironment:\n%s\n", pprint.pformat(environ))
if _wsgi_server is None:
# Serve up the error with our own mini WSGI app.
headers = [ ('Content-type', 'text/plain'),
('Content-length', str(len(err))) ]
start_response("500 Internal Server Error", headers)
return [err]
# Call the normal application
return _wsgi_server.wsgi_application(environ, start_response)
return application
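Since the returned object is a plain WSGI callable, it can also be served outside Apache for a quick test; a sketch using the standard library (paths reuse the docs/wsgi.md example):

from wsgiref.simple_server import make_server
import nilmdb.server

application = nilmdb.server.wsgi_application("/home/nilm/db", "/nilmdb")
# Requests are answered under the /nilmdb base path, e.g. /nilmdb/version
make_server("localhost", 8080, application).serve_forever()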

View File

@@ -1,7 +1,7 @@
"""NilmDB utilities"""
from __future__ import absolute_import
from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import serializer_proxy
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du, human_size
@@ -12,3 +12,4 @@ import nilmdb.utils.fallocate
import nilmdb.utils.time
import nilmdb.utils.iterator
import nilmdb.utils.interval
import nilmdb.utils.lock

View File

@@ -1,100 +0,0 @@
import Queue
import threading
import sys
import contextlib
# This file provides a context manager that converts a function
# that takes a callback into a generator that returns an iterable.
# This is done by running the function in a new thread.
# Based partially on http://stackoverflow.com/questions/9968592/
class IteratorizerThread(threading.Thread):
def __init__(self, queue, function, curl_hack):
"""
function: function to execute, which takes the
callback (provided by this class) as an argument
"""
threading.Thread.__init__(self)
self.name = "Iteratorizer-" + function.__name__ + "-" + self.name
self.function = function
self.queue = queue
self.die = False
self.curl_hack = curl_hack
def callback(self, data):
try:
if self.die:
raise Exception() # trigger termination
self.queue.put((1, data))
except:
if self.curl_hack:
# We can't raise exceptions, because the pycurl
# extension module will unconditionally print the
# exception itself, and not pass it up to the caller.
# Instead, just return a value that tells curl to
# abort. (-1 would be best, in case we were given 0
# bytes, but the extension doesn't support that).
self.queue.put((2, sys.exc_info()))
return 0
raise
def run(self):
try:
result = self.function(self.callback)
except:
self.queue.put((2, sys.exc_info()))
else:
self.queue.put((0, result))
@contextlib.contextmanager
def Iteratorizer(function, curl_hack = False):
"""
Context manager that takes a function expecting a callback,
and provides an iterable that yields the values passed to that
callback instead.
function: function to execute, which takes a callback
(provided by this context manager) as an argument
with iteratorizer(func) as it:
for i in it:
print 'callback was passed:', i
print 'function returned:', it.retval
"""
queue = Queue.Queue(maxsize = 1)
thread = IteratorizerThread(queue, function, curl_hack)
thread.daemon = True
thread.start()
class iteratorizer_gen(object):
def __init__(self, queue):
self.queue = queue
self.retval = None
def __iter__(self):
return self
def next(self):
(typ, data) = self.queue.get()
if typ == 0:
# function has returned
self.retval = data
raise StopIteration
elif typ == 1:
# data is available
return data
else:
# callback raised an exception
raise data[0], data[1], data[2]
try:
yield iteratorizer_gen(queue)
finally:
# Ask the thread to die, if it's still running.
thread.die = True
while thread.isAlive():
try:
queue.get(True, 0.01)
except: # pragma: no cover
pass

nilmdb/utils/lock.py (new file, 33 lines)
View File

@@ -0,0 +1,33 @@
# File locking
import warnings
try:
import fcntl
import errno
def exclusive_lock(f):
"""Acquire an exclusive lock. Returns True on successful
lock, or False on error."""
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
return False
else: # pragma: no cover
raise
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except ImportError: # pragma: no cover
def exclusive_lock(f):
"""Dummy lock function -- does not lock!"""
warnings.warn("Pretending to lock " + str(f))
return True
def exclusive_unlock(f):
"""Release an exclusive lock."""
return
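A minimal sketch of how these helpers are used (the lock file path is an example; BulkData now does the same with a ".lock" file next to its data directory):

import nilmdb.utils.lock

lockfile = open("/tmp/example.lock", "w")
if not nilmdb.utils.lock.exclusive_lock(lockfile):
    raise IOError("database is already locked by another process")
try:
    pass  # ... do work while holding the lock ...
finally:
    nilmdb.utils.lock.exclusive_unlock(lockfile)
    lockfile.close()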

View File

@@ -15,7 +15,7 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
def wrap_class_method(wrapper):
try:
orig = getattr(cls, wrapper.__name__).im_func
except:
except Exception:
orig = lambda x: None
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))

View File

@@ -65,6 +65,14 @@ def parse_time(toparse):
if toparse == "max":
return max_timestamp
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError, IndexError):
pass
# If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators.
@@ -78,14 +86,6 @@ def parse_time(toparse):
except (ValueError, OverflowError):
pass
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError):
pass
# If it's parseable as a float, treat it as a Unix or NILM
# timestamp based on its range.
try:
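To illustrate the reordering above: the '@' form (integer microseconds since epoch) is now recognized before the digit-count and date heuristics run, e.g. (value is an example):

from nilmdb.utils.time import parse_time

parse_time("@1332496919900000")   # -> 1332496919900000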

View File

@@ -39,7 +39,7 @@ versioneer.parentdir_prefix = 'nilmdb-'
# Hack to workaround logging/multiprocessing issue:
# https://groups.google.com/d/msg/nose-users/fnJ-kAUbYHQ/_UsLN786ygcJ
try: import multiprocessing
except: pass
except Exception: pass
# Use Cython if it's new enough, otherwise use preexisting C files.
cython_modules = [ 'nilmdb.server.interval',
@@ -107,6 +107,7 @@ setup(name='nilmdb',
author_email = 'jim@jtan.com',
tests_require = [ 'nose',
'coverage',
'numpy',
],
setup_requires = [ 'distribute',
],

tests/data/extract-8 (new file, 28 lines)
View File

@@ -0,0 +1,28 @@
# interval-start 1332496919900000
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
# interval-end 1332496919991668
# interval-start 1332496920000000
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
# interval-end 1332496920100000

View File

@@ -24,7 +24,7 @@ class JimOrderPlugin(nose.plugins.Plugin):
name, workingDir=loader.workingDir)
try:
order = os.path.join(addr.filename, "test.order")
except:
except Exception:
order = None
if order and os.path.exists(order):
files = []

View File

@@ -4,7 +4,6 @@ test_lrucache.py
test_mustclose.py
test_serializer.py
test_iteratorizer.py
test_timestamper.py
test_rbtree.py
@@ -13,6 +12,7 @@ test_interval.py
test_bulkdata.py
test_nilmdb.py
test_client.py
test_numpyclient.py
test_cmdline.py
test_*.py

View File

@@ -30,6 +30,11 @@ class TestBulkData(object):
else:
data = BulkData(db, file_size = size, files_per_dir = files)
# Try opening it again (should result in locking error)
with assert_raises(IOError) as e:
data2 = BulkData(db)
in_("already locked by another process", str(e.exception))
# create empty
with assert_raises(ValueError):
data.create("/foo", "uint16_8")

View File

@@ -23,6 +23,7 @@ import warnings
import resource
import time
import re
import struct
from testutil.helpers import *
@@ -293,6 +294,23 @@ class TestClient(object):
# Test count
eq_(client.stream_count("/newton/prep"), 14400)
# Test binary output
with assert_raises(ClientError) as e:
list(client.stream_extract("/newton/prep",
markup = True, binary = True))
with assert_raises(ClientError) as e:
list(client.stream_extract("/newton/prep",
count = True, binary = True))
data = "".join(client.stream_extract("/newton/prep", binary = True))
# Quick check using struct
unpacker = struct.Struct("<qffffffff")
out = []
for i in range(14400):
out.append(unpacker.unpack_from(data, i * unpacker.size))
eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
2525.169921875, 8350.83984375, 3724.699951171875,
1355.3399658203125, 2039.0))
client.close()
def test_client_06_generators(self):
@@ -311,11 +329,11 @@ class TestClient(object):
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next()
client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next()
client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
@@ -460,6 +478,7 @@ class TestClient(object):
ctx.update_start(109)
ctx.insert("110 1\n")
ctx.insert("111 1\n")
ctx.send()
ctx.insert("112 1\n")
ctx.insert("113 1\n")
ctx.insert("114 1\n")
@@ -619,7 +638,7 @@ class TestClient(object):
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests)
except:
except Exception:
raise SkipTest("can't get connection info")
# First request makes a connection

View File

@@ -369,6 +369,8 @@ class TestCmdline(object):
self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --set foo=bar")
self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --delete")
self.contain("No stream at path")
self.ok("metadata /newton/prep")
self.match("description=The Data\nv_scale=1.234\n")
@@ -394,6 +396,19 @@ class TestCmdline(object):
self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath")
self.ok("metadata /newton/prep --delete")
self.ok("metadata /newton/prep --get")
self.match("")
self.ok("metadata /newton/prep --set "
"'description=The Data' "
"v_scale=1.234")
self.ok("metadata /newton/prep --delete v_scale")
self.ok("metadata /newton/prep --get")
self.match("description=The Data\n")
self.ok("metadata /newton/prep --set description=")
self.ok("metadata /newton/prep --get")
self.match("")
def test_06_insert(self):
self.ok("insert --help")
@@ -596,6 +611,8 @@ class TestCmdline(object):
test(6, "10:00:30", "10:00:31", extra="-b")
test(7, "10:00:30", "10:00:30.999", extra="-a -T")
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -603,6 +620,11 @@ class TestCmdline(object):
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n")
# markup for 3 intervals, plus extra markup lines whenever we had
# a "restart" from the nilmdb.stream_extract function
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 43210)
def test_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results
server_stop()
@@ -1031,10 +1053,12 @@ class TestCmdline(object):
self.contain("old and new paths are the same")
check_path("newton", "prep")
self.fail("rename /newton/prep /newton")
self.contain("subdirs of this path already exist")
self.contain("path must contain at least one folder")
self.fail("rename /newton/prep /newton/prep/")
self.contain("invalid path")
self.ok("rename /newton/prep /newton/foo")
self.ok("rename /newton/prep /newton/foo/1")
check_path("newton", "foo", "1")
self.ok("rename /newton/foo/1 /newton/foo")
check_path("newton", "foo")
self.ok("rename /newton/foo /totally/different/thing")
check_path("totally", "different", "thing")

View File

@@ -385,7 +385,6 @@ class TestIntervalSpeed:
def test_interval_speed(self):
import yappi
import time
import testutil.aplotter as aplotter
import random
import math
@@ -406,6 +405,5 @@ class TestIntervalSpeed:
speed/j,
speed / (j*math.log(j))) # should be constant
speeds[j] = speed
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
yappi.stop()
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)

View File

@@ -1,61 +0,0 @@
import nilmdb
from nilmdb.utils.printf import *
import nose
from nose.tools import *
from nose.tools import assert_raises
import threading
import time
from testutil.helpers import *
def func_with_callback(a, b, callback):
callback(a)
callback(b)
callback(a+b)
return "return value"
class TestIteratorizer(object):
def test(self):
# First try it with a normal callback
self.result = ""
def cb(x):
self.result += str(x)
func_with_callback(1, 2, cb)
eq_(self.result, "123")
# Now make it an iterator
result = ""
f = lambda x: func_with_callback(1, 2, x)
with nilmdb.utils.Iteratorizer(f) as it:
for i in it:
result += str(i)
eq_(result, "123")
eq_(it.retval, "return value")
# Make sure things work when an exception occurs
result = ""
with nilmdb.utils.Iteratorizer(
lambda x: func_with_callback(1, "a", x)) as it:
with assert_raises(TypeError) as e:
for i in it:
result += str(i)
eq_(result, "1a")
# Now try to trigger the case where we stop iterating
# mid-generator, and expect the iteratorizer to clean up after
# itself. This doesn't have a particular result in the test,
# but gains coverage.
def foo():
with nilmdb.utils.Iteratorizer(f) as it:
it.next()
foo()
eq_(it.retval, None)
# Do the same thing when the curl hack is applied
def foo():
with nilmdb.utils.Iteratorizer(f, curl_hack = True) as it:
it.next()
foo()
eq_(it.retval, None)

View File

@@ -28,9 +28,6 @@ class Test00Nilmdb(object): # named 00 so it runs first
def test_NilmDB(self):
recursive_unlink(testdb)
with assert_raises(IOError):
nilmdb.server.NilmDB("/nonexistant-db/foo")
db = nilmdb.server.NilmDB(testdb)
db.close()
db = nilmdb.server.NilmDB(testdb)
@@ -93,13 +90,16 @@ class Test00Nilmdb(object): # named 00 so it runs first
eq_(db.stream_get_metadata("/newton/prep"), meta1)
eq_(db.stream_get_metadata("/newton/raw"), meta1)
# fill in some test coverage for start >= end
# fill in some misc. test coverage
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 0, 0)
with assert_raises(nilmdb.server.NilmDBError):
db.stream_remove("/newton/prep", 1, 0)
db.stream_remove("/newton/prep", 0, 1)
with assert_raises(nilmdb.server.NilmDBError):
db.stream_extract("/newton/prep", count = True, binary = True)
db.close()
class TestBlockingServer(object):

tests/test_numpyclient.py (new file, 108 lines)
View File

@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
import nilmdb.server
import nilmdb.client
import nilmdb.client.numpyclient
from nilmdb.utils.printf import *
from nilmdb.utils import timestamper
from nilmdb.client import ClientError, ServerError
from nilmdb.utils import datetime_tz
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
from testutil.helpers import *
import numpy as np
testdb = "tests/numpyclient-testdb"
testurl = "http://localhost:32180/"
def setup_module():
global test_server, test_db
# Clear out DB
recursive_unlink(testdb)
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
fast_shutdown = True,
force_traceback = True)
test_server.start(blocking = False)
def teardown_module():
global test_server, test_db
# Close web app
test_server.stop()
test_db.close()
class TestNumpyClient(object):
def test_numpyclient_01_basic(self):
# Test basic connection
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
version = client.version()
eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(test_server.version))
# Verify subclassing
assert(isinstance(client, nilmdb.client.Client))
# Layouts
for layout in "int8_t", "something_8", "integer_1":
with assert_raises(ValueError):
for x in client.stream_extract_numpy("/foo", layout=layout):
pass
for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
with assert_raises(ClientError) as e:
for x in client.stream_extract_numpy("/foo", layout=layout):
pass
in_("No such stream", str(e.exception))
with assert_raises(ClientError) as e:
for x in client.stream_extract_numpy("/foo"):
pass
in_("can't get layout for path", str(e.exception))
client.close()
def test_numpyclient_02_extract(self):
client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
# Insert some data as text
client.stream_create("/newton/prep", "float32_8")
testfile = "tests/data/prep-20120323T1000"
start = nilmdb.utils.time.parse_time("20120323T1000")
rate = 120
data = timestamper.TimestamperRate(testfile, start, rate)
result = client.stream_insert("/newton/prep", data,
start, start + 119999777)
# Extract Numpy arrays
array = None
pieces = 0
for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
pieces += 1
if array is not None:
array = np.vstack((array, chunk))
else:
array = chunk
eq_(array.shape, (14400, 9))
eq_(pieces, 15)
# Try structured
s = list(client.stream_extract_numpy("/newton/prep", structured = True))
assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))
# Compare. Will be close but not exact because the conversion
# to and from ASCII was lossy.
data = timestamper.TimestamperRate(testfile, start, rate)
actual = np.fromstring(" ".join(data), sep=' ').reshape(14400, 9)
assert(np.allclose(array, actual))
client.close()

View File

@@ -18,7 +18,7 @@ class TestPrintf(object):
printf("hello, world: %d", 123)
fprintf(test2, "hello too: %d", 123)
test3 = sprintf("hello three: %d", 123)
except:
except Exception:
sys.stdout = old_stdout
raise
sys.stdout = old_stdout

View File

@@ -1,419 +0,0 @@
#-----------------------------------------------
#aplotter.py - ascii art function plotter
#Copyright (c) 2006, Imri Goldberg
#All rights reserved.
#
#Redistribution and use in source and binary forms,
#with or without modification, are permitted provided
#that the following conditions are met:
#
# * Redistributions of source code must retain the
# above copyright notice, this list of conditions
# and the following disclaimer.
# * Redistributions in binary form must reproduce the
# above copyright notice, this list of conditions
# and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of
# its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
#THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
#LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
#SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
#CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#-----------------------------------------------
import math
EPSILON = 0.000001
def transposed(mat):
result = []
for i in xrange(len(mat[0])):
result.append([x[i] for x in mat])
return result
def y_reversed(mat):
result = []
for i in range(len(mat)):
result.append(list(reversed(mat[i])))
return result
def sign(x):
if 0<x:
return 1
if 0 == x:
return 0
return -1
class Plotter(object):
class PlotData(object):
def __init__(self, x_size, y_size, min_x, max_x, min_y, max_y, x_mod, y_mod):
self.x_size = x_size
self.y_size = y_size
self.min_x = min_x
self.max_x = max_x
self.min_y = min_y
self.max_y = max_y
self.x_mod = x_mod
self.y_mod = y_mod
self.x_step = float(max_x - min_x)/float(self.x_size)
self.y_step = float(max_y - min_y)/float(self.y_size)
self.inv_x_step = 1/self.x_step
self.inv_y_step = 1/self.y_step
self.ratio = self.y_step / self.x_step
def __repr__(self):
s = "size: %s, bl: %s, tr: %s, step: %s" % ((self.x_size, self.y_size), (self.min_x, self.min_y), (self.max_x, self.max_y),
(self.x_step, self.y_step))
return s
def __init__(self, **kwargs):
self.x_size = kwargs.get("x_size", 80)
self.y_size = kwargs.get("y_size", 20)
self.will_draw_axes = kwargs.get("draw_axes", True)
self.new_line = kwargs.get("newline", "\n")
self.dot = kwargs.get("dot", "*")
self.plot_slope = kwargs.get("plot_slope", True)
self.x_margin = kwargs.get("x_margin", 0.05)
self.y_margin = kwargs.get("y_margin", 0.1)
self.will_plot_labels = kwargs.get("plot_labels", True)
@staticmethod
def get_symbol_by_slope(slope, default_symbol):
draw_symbol = default_symbol
if slope > math.tan(3*math.pi/8):
draw_symbol = "|"
elif slope > math.tan(math.pi/8) and slope < math.tan(3*math.pi/8):
draw_symbol = "/"
elif abs(slope) < math.tan(math.pi/8):
draw_symbol = "-"
elif slope < math.tan(-math.pi/8) and slope > math.tan(-3*math.pi/8):
draw_symbol = "\\"
elif slope < math.tan(-3*math.pi/8):
draw_symbol = "|"
return draw_symbol
def plot_labels(self, output_buffer, plot_data):
if plot_data.y_size < 2:
return
margin_factor = 1
do_plot_x_label = True
do_plot_y_label = True
x_str = "%+g"
if plot_data.x_size < 16:
do_plot_x_label = False
elif plot_data.x_size < 23:
x_str = "%+.2g"
y_str = "%+g"
if plot_data.x_size < 8:
do_plot_y_label = False
elif plot_data.x_size < 11:
y_str = "%+.2g"
act_min_x = (plot_data.min_x + plot_data.x_mod*margin_factor)
act_max_x = (plot_data.max_x - plot_data.x_mod*margin_factor)
act_min_y = (plot_data.min_y + plot_data.y_mod*margin_factor)
act_max_y = (plot_data.max_y - plot_data.y_mod*margin_factor)
if abs(act_min_x) < 1:
min_x_str = "%+.2g" % act_min_x
else:
min_x_str = x_str % act_min_x
if abs(act_max_x) < 1:
max_x_str = "%+.2g" % act_max_x
else:
max_x_str = x_str % act_max_x
if abs(act_min_y) < 1:
min_y_str = "%+.2g" % act_min_y
else:
min_y_str = y_str % act_min_y
if abs(act_max_y) < 1:
max_y_str = "%+.2g" % act_max_y
else:
max_y_str = y_str % act_max_y
min_x_coord = self.get_coord(act_min_x,plot_data.min_x,plot_data.x_step)
max_x_coord = self.get_coord(act_max_x,plot_data.min_x,plot_data.x_step)
min_y_coord = self.get_coord(act_min_y,plot_data.min_y,plot_data.y_step)
max_y_coord = self.get_coord(act_max_y,plot_data.min_y,plot_data.y_step)
#print plot_data
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
#if plot_data.min_x < 0 and plot_data.max_x > 0:
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
#else:
#pass
output_buffer[x_zero_coord][min_y_coord] = "+"
output_buffer[x_zero_coord][max_y_coord] = "+"
output_buffer[min_x_coord][y_zero_coord] = "+"
output_buffer[max_x_coord][y_zero_coord] = "+"
if do_plot_x_label:
for i,c in enumerate(min_x_str):
output_buffer[min_x_coord+i][y_zero_coord-1] = c
for i,c in enumerate(max_x_str):
output_buffer[max_x_coord+i-len(max_x_str)][y_zero_coord-1] = c
if do_plot_y_label:
for i,c in enumerate(max_y_str):
output_buffer[x_zero_coord+i][max_y_coord] = c
for i,c in enumerate(min_y_str):
output_buffer[x_zero_coord+i][min_y_coord] = c
def plot_data(self, xy_seq, output_buffer, plot_data):
if self.plot_slope:
xy_seq = list(xy_seq)
#sort according to the x coord
xy_seq.sort(key = lambda c: c[0])
prev_p = xy_seq[0]
e_xy_seq = enumerate(xy_seq)
e_xy_seq.next()
for i,(x,y) in e_xy_seq:
draw_symbol = self.dot
line_drawn = self.plot_line(prev_p, (x,y), output_buffer, plot_data)
prev_p = (x,y)
if not line_drawn:
if i > 0 and i < len(xy_seq)-1:
px,py = xy_seq[i-1]
nx,ny = xy_seq[i+1]
if abs(nx-px) > EPSILON:
slope = (1.0/plot_data.ratio)*(ny-py)/(nx-px)
draw_symbol = self.get_symbol_by_slope(slope, draw_symbol)
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord >= 0 and y_coord < len(output_buffer[0]):
if self.draw_axes:
if y_coord == self.get_coord(0, plot_data.min_y, plot_data.y_step) and draw_symbol == "-":
draw_symbol = "="
output_buffer[x_coord][y_coord] = draw_symbol
else:
for x,y in xy_seq:
if x < plot_data.min_x or x >= plot_data.max_x or y < plot_data.min_y or y >= plot_data.max_y:
continue
x_coord = self.get_coord(x, plot_data.min_x, plot_data.x_step)
y_coord = self.get_coord(y, plot_data.min_y, plot_data.y_step)
if x_coord >= 0 and x_coord < len(output_buffer) and y_coord > 0 and y_coord < len(output_buffer[0]):
output_buffer[x_coord][y_coord] = self.dot
def plot_line(self, start, end, output_buffer, plot_data):
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
clipped_line = clip_line(start, end, (plot_data.min_x, plot_data.min_y), (plot_data.max_x, plot_data.max_y))
if clipped_line != None:
start,end = clipped_line
else:
return False
start_coord = self.get_coord(start[0], plot_data.min_x, plot_data.x_step), self.get_coord(start[1], plot_data.min_y, plot_data.y_step)
end_coord = self.get_coord(end[0], plot_data.min_x, plot_data.x_step), self.get_coord(end[1], plot_data.min_y, plot_data.y_step)
x0,y0 = start_coord
x1,y1 = end_coord
if (x0,y0) == (x1,y1):
return True
x_zero_coord = self.get_coord(0, plot_data.min_x, plot_data.x_step)
y_zero_coord = self.get_coord(0, plot_data.min_y, plot_data.y_step)
if start[0]-end[0] == 0:
draw_symbol = "|"
else:
slope = (1.0/plot_data.ratio)*(end[1]-start[1])/(end[0]-start[0])
draw_symbol = self.get_symbol_by_slope(slope, self.dot)
try:
delta = x1-x0, y1-y0
if abs(delta[0])>abs(delta[1]):
s = sign(delta[0])
slope = float(delta[1])/delta[0]
for i in range(0,abs(int(delta[0]))):
cur_draw_symbol = draw_symbol
x = i*s
cur_y = int(y0+slope*x)
if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[x0+x][cur_y] = cur_draw_symbol
else:
s = sign(delta[1])
slope = float(delta[0])/delta[1]
for i in range(0,abs(int(delta[1]))):
y = i*s
cur_draw_symbol = draw_symbol
cur_y = y0+y
if self.draw_axes and cur_y == y_zero_coord and draw_symbol == "-":
cur_draw_symbol = "="
output_buffer[int(x0+slope*y)][cur_y] = cur_draw_symbol
except:
print start, end
print start_coord, end_coord
print plot_data
raise
return False
def plot_single(self, seq, min_x = None, max_x = None, min_y = None, max_y = None):
return self.plot_double(range(len(seq)),seq, min_x, max_x, min_y, max_y)
def plot_double(self, x_seq, y_seq, min_x = None, max_x = None, min_y = None, max_y = None):
if min_x == None:
min_x = min(x_seq)
if max_x == None:
max_x = max(x_seq)
if min_y == None:
min_y = min(y_seq)
if max_y == None:
max_y = max(y_seq)
if max_y == min_y:
max_y += 1
x_mod = (max_x-min_x)*self.x_margin
y_mod = (max_y-min_y)*self.y_margin
min_x-=x_mod
max_x+=x_mod
min_y-=y_mod
max_y+=y_mod
plot_data = self.PlotData(self.x_size, self.y_size, min_x, max_x, min_y, max_y, x_mod, y_mod)
output_buffer = [[" "]*self.y_size for i in range(self.x_size)]
if self.will_draw_axes:
self.draw_axes(output_buffer, plot_data)
self.plot_data(zip(x_seq, y_seq), output_buffer, plot_data)
if self.will_plot_labels:
self.plot_labels(output_buffer, plot_data)
trans_result = transposed(y_reversed(output_buffer))
result = self.new_line.join(["".join(row) for row in trans_result])
return result
def draw_axes(self, output_buffer, plot_data):
draw_x = False
draw_y = False
if plot_data.min_x <= 0 and plot_data.max_x > 0:
draw_y = True
zero_x = self.get_coord(0, plot_data.min_x, plot_data.x_step)
for y in xrange(plot_data.y_size):
output_buffer[zero_x][y] = "|"
if plot_data.min_y <= 0 and plot_data.max_y > 0:
draw_x = True
zero_y = self.get_coord(0, plot_data.min_y, plot_data.y_step)
for x in xrange(plot_data.x_size):
output_buffer[x][zero_y] = "-"
if draw_x and draw_y:
output_buffer[zero_x][zero_y] = "+"
@staticmethod
def get_coord(val, min, step):
result = int((val - min)/step)
return result
def clip_line(line_pt_1, line_pt_2, rect_bottom_left, rect_top_right):
ts = [0.0,1.0]
if line_pt_1[0] == line_pt_2[0]:
return ((line_pt_1[0], max(min(line_pt_1[1], line_pt_2[1]), rect_bottom_left[1])),
(line_pt_1[0], min(max(line_pt_1[1], line_pt_2[1]), rect_top_right[1])))
if line_pt_1[1] == line_pt_2[1]:
return ((max(min(line_pt_1[0], line_pt_2[0]), rect_bottom_left[0]), line_pt_1[1]),
(min(max(line_pt_1[0], line_pt_2[0]), rect_top_right[0]), line_pt_1[1]))
if ((rect_bottom_left[0] <= line_pt_1[0] and line_pt_1[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_1[1] and line_pt_1[1] < rect_top_right[1]) and
(rect_bottom_left[0] <= line_pt_2[0] and line_pt_2[0] < rect_top_right[0]) and
(rect_bottom_left[1] <= line_pt_2[1] and line_pt_2[1] < rect_top_right[1])):
return line_pt_1, line_pt_2
ts.append( float(rect_bottom_left[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_top_right[0]-line_pt_1[0])/(line_pt_2[0]-line_pt_1[0]) )
ts.append( float(rect_bottom_left[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.append( float(rect_top_right[1]-line_pt_1[1])/(line_pt_2[1]-line_pt_1[1]) )
ts.sort()
if ts[2] < 0 or ts[2] >= 1 or ts[3] < 0 or ts[2]>= 1:
return None
result = [(pt_1 + t*(pt_2-pt_1)) for t in (ts[2],ts[3]) for (pt_1, pt_2) in zip(line_pt_1, line_pt_2)]
return (result[0],result[1]), (result[2], result[3])
def plot(*args,**flags):
limit_flags_names = set(["min_x","min_y","max_x","max_y"])
limit_flags = dict([(n,flags[n]) for n in limit_flags_names & set(flags)])
settting_flags = dict([(n,flags[n]) for n in set(flags) - limit_flags_names])
if len(args) == 1:
p = Plotter(**settting_flags)
print p.plot_single(args[0],**limit_flags)
elif len(args) == 2:
p = Plotter(**settting_flags)
print p.plot_double(args[0],args[1],**limit_flags)
else:
raise NotImplementedError("can't draw multiple graphs yet")
__all__ = ["Plotter","plot"]