Compare commits

..

18 Commits

Author SHA1 Message Date
6cd28b67b1 Support iterator protocol in Serializer 2013-07-24 14:52:26 -04:00
d6d215d53d Improve boolean HTTP parameter handling 2013-07-15 14:38:28 -04:00
e02143ddb2 Remove duplicated test 2013-07-14 15:30:53 -04:00
e275384d03 Fix WSGI docs again 2013-07-11 16:36:32 -04:00
a6a67ec15c Update WSGI docs 2013-07-10 14:16:25 -04:00
fc43107307 Fill out test coverage 2013-07-09 19:06:26 -04:00
90633413bb Add nilmdb.utils.interval.human_string function 2013-07-09 19:01:53 -04:00
c7c3aff0fb Add nilmdb.utils.interval.optimize function 2013-07-09 17:50:21 -04:00
e2347c954e Split more CherrpyPy stuff into serverutil 2013-07-02 11:44:08 -04:00
222a5c6c53 Move server decorators and other utilities to a separate file
This will help with implementing nilmrun.
2013-07-02 11:32:19 -04:00
1ca2c143e5 Fix typo 2013-06-29 12:39:00 -04:00
b5df575c79 Fix tests 2013-05-09 22:27:10 -04:00
2768a5ad15 Show FQDN rather than hostname. 2013-05-09 13:33:05 -04:00
a105543c38 Show a more helpful message at the root nilmdb path 2013-05-09 13:30:10 -04:00
309f38d0ed Merge branch '32bit' 2013-05-08 17:20:31 -04:00
9a27b6ef6a Make rocket code suitable for 32-bit architectures 2013-05-08 16:35:32 -04:00
99532cf9e0 Fix coverage 2013-05-07 23:00:44 -04:00
dfdd0e5c74 Fix line parsing in http client 2013-05-07 22:56:00 -04:00
14 changed files with 439 additions and 240 deletions

View File

@@ -19,12 +19,12 @@ Then, set up Apache with a configuration like:
<VirtualHost>
WSGIScriptAlias /nilmdb /home/nilm/nilmdb.wsgi
WSGIApplicationGroup nilmdb-appgroup
WSGIProcessGroup nilmdb-procgroup
WSGIDaemonProcess nilmdb-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmdb>
WSGIProcessGroup nilmdb-procgroup
WSGIApplicationGroup nilmdb-appgroup
# Access control example:
Order deny,allow
Deny from all
Allow from 1.2.3.4

View File

@@ -123,14 +123,36 @@ class HTTPClient(object):
"""
(response, isjson) = self._do_req(method, url, query, body,
stream = True, headers = headers)
# Like the iter_lines function in Requests, but only splits on
# the specified line ending.
def lines(source, ending):
pending = None
for chunk in source:
if pending is not None:
chunk = pending + chunk
tmp = chunk.split(ending)
lines = tmp[:-1]
if chunk.endswith(ending):
pending = None
else:
pending = tmp[-1]
for line in lines:
yield line
if pending is not None: # pragma: no cover (missing newline)
yield pending
# Yield the chunks or lines as requested
if binary:
for chunk in response.iter_content(chunk_size = 65536):
yield chunk
elif isjson:
for line in response.iter_lines():
for line in lines(response.iter_content(chunk_size = 1),
ending = '\r\n'):
yield json.loads(line)
else:
for line in response.iter_lines():
for line in lines(response.iter_content(chunk_size = 65536),
ending = '\n'):
yield line
def get_gen(self, url, params = None, binary = False):

View File

@@ -5,6 +5,9 @@
#include <ctype.h>
#include <stdint.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
/* Values missing from stdint.h */
#define UINT8_MIN 0
#define UINT16_MIN 0
@@ -19,13 +22,6 @@
typedef int64_t timestamp_t;
/* This code probably needs to be double-checked for the case where
sizeof(long) != 8, so enforce that here with something that will
fail at build time. We assume that the python integer type can
hold an int64_t. */
const static char __long_ok[1 - 2*!(sizeof(int64_t) ==
sizeof(long int))] = { 0 };
/* Somewhat arbitrary, just so we can use fixed sizes for strings
etc. */
static const int MAX_LAYOUT_COUNT = 1024;
@@ -58,7 +54,7 @@ static PyObject *raise_str(int line, int col, int code, const char *string)
static PyObject *raise_int(int line, int col, int code, int64_t num)
{
PyObject *o;
o = Py_BuildValue("(iiil)", line, col, code, num);
o = Py_BuildValue("(iiiL)", line, col, code, (long long)num);
if (o != NULL) {
PyErr_SetObject(ParseError, o);
Py_DECREF(o);
@@ -249,11 +245,11 @@ static PyObject *Rocket_get_file_size(Rocket *self)
/****
* Append from string
*/
static inline long int strtol10(const char *nptr, char **endptr) {
return strtol(nptr, endptr, 10);
static inline long int strtoll10(const char *nptr, char **endptr) {
return strtoll(nptr, endptr, 10);
}
static inline long int strtoul10(const char *nptr, char **endptr) {
return strtoul(nptr, endptr, 10);
static inline long int strtoull10(const char *nptr, char **endptr) {
return strtoull(nptr, endptr, 10);
}
/* .append_string(count, data, offset, linenum, start, end, last_timestamp) */
@@ -264,6 +260,7 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
int offset;
const char *linestart;
int linenum;
long long ll1, ll2, ll3;
timestamp_t start;
timestamp_t end;
timestamp_t last_timestamp;
@@ -280,10 +277,13 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
but we need the null termination for strto*. If we had
strnto* that took a length, we could use t# and not require
a copy. */
if (!PyArg_ParseTuple(args, "isiilll:append_string", &count,
if (!PyArg_ParseTuple(args, "isiiLLL:append_string", &count,
&data, &offset, &linenum,
&start, &end, &last_timestamp))
&ll1, &ll2, &ll3))
return NULL;
start = ll1;
end = ll2;
last_timestamp = ll3;
/* Skip spaces, but don't skip over a newline. */
#define SKIP_BLANK(buf) do { \
@@ -372,14 +372,14 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
goto extra_data_on_line; \
break
CS(INT8, strtol10, t64.i, t8.i, t8.u, , 1);
CS(UINT8, strtoul10, t64.u, t8.u, t8.u, , 1);
CS(INT16, strtol10, t64.i, t16.i, t16.u, le16toh, 2);
CS(UINT16, strtoul10, t64.u, t16.u, t16.u, le16toh, 2);
CS(INT32, strtol10, t64.i, t32.i, t32.u, le32toh, 4);
CS(UINT32, strtoul10, t64.u, t32.u, t32.u, le32toh, 4);
CS(INT64, strtol10, t64.i, t64.i, t64.u, le64toh, 8);
CS(UINT64, strtoul10, t64.u, t64.u, t64.u, le64toh, 8);
CS(INT8, strtoll10, t64.i, t8.i, t8.u, , 1);
CS(UINT8, strtoull10, t64.u, t8.u, t8.u, , 1);
CS(INT16, strtoll10, t64.i, t16.i, t16.u, le16toh, 2);
CS(UINT16, strtoull10, t64.u, t16.u, t16.u, le16toh, 2);
CS(INT32, strtoll10, t64.i, t32.i, t32.u, le32toh, 4);
CS(UINT32, strtoull10, t64.u, t32.u, t32.u, le32toh, 4);
CS(INT64, strtoll10, t64.i, t64.i, t64.u, le64toh, 8);
CS(UINT64, strtoull10, t64.u, t64.u, t64.u, le64toh, 8);
CS(FLOAT32, strtod, t64.d, t32.f, t32.u, le32toh, 4);
CS(FLOAT64, strtod, t64.d, t64.d, t64.u, le64toh, 8);
#undef CS
@@ -397,7 +397,8 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
/* Build return value and return */
offset = buf - data;
PyObject *o;
o = Py_BuildValue("(iili)", written, offset, last_timestamp, linenum);
o = Py_BuildValue("(iiLi)", written, offset,
(long long)last_timestamp, linenum);
return o;
err:
PyErr_SetFromErrno(PyExc_OSError);
@@ -431,14 +432,18 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
int data_len;
int linenum;
int offset;
long long ll1, ll2, ll3;
timestamp_t start;
timestamp_t end;
timestamp_t last_timestamp;
if (!PyArg_ParseTuple(args, "it#iilll:append_binary",
if (!PyArg_ParseTuple(args, "it#iiLLL:append_binary",
&count, &data, &data_len, &offset,
&linenum, &start, &end, &last_timestamp))
&linenum, &ll1, &ll2, &ll3))
return NULL;
start = ll1;
end = ll2;
last_timestamp = ll3;
/* Advance to offset */
if (offset > data_len)
@@ -476,8 +481,8 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
/* Build return value and return */
PyObject *o;
o = Py_BuildValue("(iili)", rows, offset + rows * self->binary_size,
last_timestamp, linenum);
o = Py_BuildValue("(iiLi)", rows, offset + rows * self->binary_size,
(long long)last_timestamp, linenum);
return o;
}
@@ -534,7 +539,7 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
if (fread(&t64.u, 8, 1, self->file) != 1)
goto err;
t64.u = le64toh(t64.u);
ret = sprintf(&str[len], "%ld", t64.i);
ret = sprintf(&str[len], "%" PRId64, t64.i);
if (ret <= 0)
goto err;
len += ret;
@@ -556,14 +561,14 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
len += ret; \
} \
break
CASE(INT8, "%hhd", t8.i, t8.u, , 1);
CASE(UINT8, "%hhu", t8.u, t8.u, , 1);
CASE(INT16, "%hd", t16.i, t16.u, le16toh, 2);
CASE(UINT16, "%hu", t16.u, t16.u, le16toh, 2);
CASE(INT32, "%d", t32.i, t32.u, le32toh, 4);
CASE(UINT32, "%u", t32.u, t32.u, le32toh, 4);
CASE(INT64, "%ld", t64.i, t64.u, le64toh, 8);
CASE(UINT64, "%lu", t64.u, t64.u, le64toh, 8);
CASE(INT8, "%" PRId8, t8.i, t8.u, , 1);
CASE(UINT8, "%" PRIu8, t8.u, t8.u, , 1);
CASE(INT16, "%" PRId16, t16.i, t16.u, le16toh, 2);
CASE(UINT16, "%" PRIu16, t16.u, t16.u, le16toh, 2);
CASE(INT32, "%" PRId32, t32.i, t32.u, le32toh, 4);
CASE(UINT32, "%" PRIu32, t32.u, t32.u, le32toh, 4);
CASE(INT64, "%" PRId64, t64.i, t64.u, le64toh, 8);
CASE(UINT64, "%" PRIu64, t64.u, t64.u, le64toh, 8);
/* These next two are a bit debatable. floats
are 6-9 significant figures, so we print 7.
Doubles are 15-19, so we print 17. This is
@@ -653,7 +658,7 @@ static PyObject *Rocket_extract_timestamp(Rocket *self, PyObject *args)
/* Convert and return */
t64.u = le64toh(t64.u);
return Py_BuildValue("l", t64.i);
return Py_BuildValue("L", (long long)t64.i);
}
/****

View File

@@ -17,126 +17,26 @@ import decorator
import psutil
import traceback
from nilmdb.server.serverutil import (
chunked_response,
response_type,
workaround_cp_bug_1200,
exception_to_httperror,
CORS_allow,
json_to_request_params,
json_error_page,
cherrypy_start,
cherrypy_stop,
bool_param,
)
# Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
class NilmApp(object):
def __init__(self, db):
self.db = db
# Decorators
def chunked_response(func):
"""Decorator to enable chunked responses."""
# Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': True }
return func
def response_type(content_type):
"""Return a decorator-generating function that sets the
response type to the specified string."""
def wrapper(func, *args, **kwargs):
cherrypy.response.headers['Content-Type'] = content_type
return func(*args, **kwargs)
return decorator.decorator(wrapper)
@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
"""Decorator to work around CherryPy bug #1200 in a response
generator.
Even if chunked responses are disabled, LookupError or
UnicodeError exceptions may still be swallowed by CherryPy due to
bug #1200. This throws them as generic Exceptions instead so that
they make it through.
"""
exc_info = None
try:
for val in func(*args, **kwargs):
yield val
except (LookupError, UnicodeError):
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
def exception_to_httperror(*expected):
"""Return a decorator-generating function that catches expected
errors and throws a HTTPError describing it instead.
@exception_to_httperror(NilmDBError, ValueError)
def foo():
pass
"""
def wrapper(func, *args, **kwargs):
exc_info = None
try:
return func(*args, **kwargs)
except expected:
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
# We need to preserve the function's argspecs for CherryPy to
# handle argument errors correctly. Decorator.decorator takes
# care of that.
return decorator.decorator(wrapper)
# Custom CherryPy tools
def CORS_allow(methods):
"""This does several things:
Handles CORS preflight requests.
Adds Allow: header to all requests.
Raise 405 if request.method not in method.
It is similar to cherrypy.tools.allow, with the CORS stuff added.
"""
request = cherrypy.request.headers
response = cherrypy.response.headers
if not isinstance(methods, (tuple, list)): # pragma: no cover
methods = [ methods ]
methods = [ m.upper() for m in methods if m ]
if not methods: # pragma: no cover
methods = [ 'GET', 'HEAD' ]
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
methods.append('HEAD')
response['Allow'] = ', '.join(methods)
# Allow all origins
if 'Origin' in request:
response['Access-Control-Allow-Origin'] = request['Origin']
# If it's a CORS request, send response.
request_method = request.get("Access-Control-Request-Method", None)
request_headers = request.get("Access-Control-Request-Headers", None)
if (cherrypy.request.method == "OPTIONS" and
request_method and request_headers):
response['Access-Control-Allow-Headers'] = request_headers
response['Access-Control-Allow-Methods'] = ', '.join(methods)
# Try to stop further processing and return a 200 OK
cherrypy.response.status = "200 OK"
cherrypy.response.body = ""
cherrypy.request.handler = lambda: ""
return
# Reject methods that were not explicitly allowed
if cherrypy.request.method not in methods:
raise cherrypy.HTTPError(405)
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
cherrypy.lib.jsontools.json_processor(body)
if not isinstance(cherrypy.request.json, dict):
raise cherrypy.HTTPError(415)
cherrypy.request.params.update(cherrypy.request.json)
# CherryPy apps
class Root(NilmApp):
"""Root application for NILM database"""
@@ -147,7 +47,10 @@ class Root(NilmApp):
# /
@cherrypy.expose
def index(self):
raise cherrypy.NotFound()
cherrypy.response.headers['Content-Type'] = 'text/plain'
msg = sprintf("This is NilmDB version %s, running on host %s.\n",
nilmdb.__version__, socket.getfqdn())
return msg
# /favicon.ico
@cherrypy.expose
@@ -203,10 +106,10 @@ class Stream(NilmApp):
layout parameter, just list streams that match the given path
or layout.
If extent is not given, returns a list of lists containing
the path and layout: [ path, layout ]
If extended is missing or zero, returns a list of lists
containing the path and layout: [ path, layout ]
If extended is provided, returns a list of lists containing
If extended is true, returns a list of lists containing
extended info: [ path, layout, extent_min, extent_max,
total_rows, total_seconds ]. More data may be added.
"""
@@ -319,6 +222,8 @@ class Stream(NilmApp):
little-endian and matches the database types (including an
int64 timestamp).
"""
binary = bool_param(binary)
# Important that we always read the input before throwing any
# errors, to keep lengths happy for persistent connections.
# Note that CherryPy 3.2.2 has a bug where this fails for GET
@@ -443,6 +348,10 @@ class Stream(NilmApp):
little-endian and matches the database types (including an
int64 timestamp).
"""
binary = bool_param(binary)
markup = bool_param(markup)
count = bool_param(count)
(start, end) = self._get_times(start, end)
# Check path and get layout
@@ -570,70 +479,14 @@ class Server(object):
def json_error_page(self, status, message, traceback, version):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
"message" : message,
"traceback" : traceback }
# Don't send a traceback if the error was 400-499 (client's fault)
try:
code = int(status.split()[0])
if not self.force_traceback:
if code >= 400 and code <= 499:
errordata["traceback"] = ""
except Exception: # pragma: no cover
pass
# Override the response type, which was previously set to text/html
cherrypy.serving.response.headers['Content-Type'] = (
"application/json;charset=utf-8" )
# Undo the HTML escaping that cherrypy's get_error_page function applies
# (cherrypy issue 1135)
for k, v in errordata.iteritems():
v = v.replace("&lt;","<")
v = v.replace("&gt;",">")
v = v.replace("&amp;","&")
errordata[k] = v
return json.dumps(errordata, separators=(',',':'))
return json_error_page(status, message, traceback, version,
self.force_traceback)
def start(self, blocking = False, event = None):
if not self.embedded: # pragma: no cover
# Handle signals nicely
if hasattr(cherrypy.engine, "signal_handler"):
cherrypy.engine.signal_handler.subscribe()
if hasattr(cherrypy.engine, "console_control_handler"):
cherrypy.engine.console_control_handler.subscribe()
# Cherrypy stupidly calls os._exit(70) when it can't bind the
# port. At least try to print a reasonable error and continue
# in this case, rather than just dying silently (as we would
# otherwise do in embedded mode)
real_exit = os._exit
def fake_exit(code): # pragma: no cover
if code == os.EX_SOFTWARE:
fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
else:
real_exit(code)
os._exit = fake_exit
cherrypy.engine.start()
os._exit = real_exit
# Signal that the engine has started successfully
if event is not None:
event.set()
if blocking:
try:
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
interval = 0.1, channel = 'main')
except (KeyboardInterrupt, IOError): # pragma: no cover
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
cherrypy.engine.exit()
except SystemExit: # pragma: no cover
cherrypy.engine.log('SystemExit raised: shutting down bus')
cherrypy.engine.exit()
raise
cherrypy_start(blocking, event, self.embedded)
def stop(self):
cherrypy.engine.exit()
cherrypy_stop()
# Use a single global nilmdb.server.NilmDB and nilmdb.server.Server
# instance since the database can only be opened once. For this to

214
nilmdb/server/serverutil.py Normal file
View File

@@ -0,0 +1,214 @@
"""Miscellaneous decorators and other helpers for running a CherryPy
server"""
import cherrypy
import sys
import os
import decorator
import simplejson as json
# Helper to parse parameters into booleans
def bool_param(s):
"""Return a bool indicating whether parameter 's' was True or False,
supporting a few different types for 's'."""
try:
ss = s.lower()
if ss in [ "0", "false", "f", "no", "n" ]:
return False
if ss in [ "1", "true", "t", "yes", "y" ]:
return True
except Exception:
return bool(s)
raise cherrypy.HTTPError("400 Bad Request",
"can't parse parameter: " + ss)
# Decorators
def chunked_response(func):
"""Decorator to enable chunked responses."""
# Set this to False to get better tracebacks from some requests
# (/stream/extract, /stream/intervals).
func._cp_config = { 'response.stream': True }
return func
def response_type(content_type):
"""Return a decorator-generating function that sets the
response type to the specified string."""
def wrapper(func, *args, **kwargs):
cherrypy.response.headers['Content-Type'] = content_type
return func(*args, **kwargs)
return decorator.decorator(wrapper)
@decorator.decorator
def workaround_cp_bug_1200(func, *args, **kwargs): # pragma: no cover
"""Decorator to work around CherryPy bug #1200 in a response
generator.
Even if chunked responses are disabled, LookupError or
UnicodeError exceptions may still be swallowed by CherryPy due to
bug #1200. This throws them as generic Exceptions instead so that
they make it through.
"""
exc_info = None
try:
for val in func(*args, **kwargs):
yield val
except (LookupError, UnicodeError):
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = Exception(exc_info[0].__name__ + ": " + str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
def exception_to_httperror(*expected):
"""Return a decorator-generating function that catches expected
errors and throws a HTTPError describing it instead.
@exception_to_httperror(NilmDBError, ValueError)
def foo():
pass
"""
def wrapper(func, *args, **kwargs):
exc_info = None
try:
return func(*args, **kwargs)
except expected:
# Re-raise it, but maintain the original traceback
exc_info = sys.exc_info()
new_exc = cherrypy.HTTPError("400 Bad Request", str(exc_info[1]))
raise new_exc, None, exc_info[2]
finally:
del exc_info
# We need to preserve the function's argspecs for CherryPy to
# handle argument errors correctly. Decorator.decorator takes
# care of that.
return decorator.decorator(wrapper)
# Custom CherryPy tools
def CORS_allow(methods):
"""This does several things:
Handles CORS preflight requests.
Adds Allow: header to all requests.
Raise 405 if request.method not in method.
It is similar to cherrypy.tools.allow, with the CORS stuff added.
Add this to CherryPy with:
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
"""
request = cherrypy.request.headers
response = cherrypy.response.headers
if not isinstance(methods, (tuple, list)): # pragma: no cover
methods = [ methods ]
methods = [ m.upper() for m in methods if m ]
if not methods: # pragma: no cover
methods = [ 'GET', 'HEAD' ]
elif 'GET' in methods and 'HEAD' not in methods: # pragma: no cover
methods.append('HEAD')
response['Allow'] = ', '.join(methods)
# Allow all origins
if 'Origin' in request:
response['Access-Control-Allow-Origin'] = request['Origin']
# If it's a CORS request, send response.
request_method = request.get("Access-Control-Request-Method", None)
request_headers = request.get("Access-Control-Request-Headers", None)
if (cherrypy.request.method == "OPTIONS" and
request_method and request_headers):
response['Access-Control-Allow-Headers'] = request_headers
response['Access-Control-Allow-Methods'] = ', '.join(methods)
# Try to stop further processing and return a 200 OK
cherrypy.response.status = "200 OK"
cherrypy.response.body = ""
cherrypy.request.handler = lambda: ""
return
# Reject methods that were not explicitly allowed
if cherrypy.request.method not in methods:
raise cherrypy.HTTPError(405)
# Helper for json_in tool to process JSON data into normal request
# parameters.
def json_to_request_params(body):
cherrypy.lib.jsontools.json_processor(body)
if not isinstance(cherrypy.request.json, dict):
raise cherrypy.HTTPError(415)
cherrypy.request.params.update(cherrypy.request.json)
# Used as an "error_page.default" handler
def json_error_page(status, message, traceback, version,
force_traceback = False):
"""Return a custom error page in JSON so the client can parse it"""
errordata = { "status" : status,
"message" : message,
"traceback" : traceback }
# Don't send a traceback if the error was 400-499 (client's fault)
try:
code = int(status.split()[0])
if not force_traceback:
if code >= 400 and code <= 499:
errordata["traceback"] = ""
except Exception: # pragma: no cover
pass
# Override the response type, which was previously set to text/html
cherrypy.serving.response.headers['Content-Type'] = (
"application/json;charset=utf-8" )
# Undo the HTML escaping that cherrypy's get_error_page function applies
# (cherrypy issue 1135)
for k, v in errordata.iteritems():
v = v.replace("&lt;","<")
v = v.replace("&gt;",">")
v = v.replace("&amp;","&")
errordata[k] = v
return json.dumps(errordata, separators=(',',':'))
# Start/stop CherryPy standalone server
def cherrypy_start(blocking = False, event = False, embedded = False):
"""Start the CherryPy server, handling errors and signals
somewhat gracefully."""
if not embedded: # pragma: no cover
# Handle signals nicely
if hasattr(cherrypy.engine, "signal_handler"):
cherrypy.engine.signal_handler.subscribe()
if hasattr(cherrypy.engine, "console_control_handler"):
cherrypy.engine.console_control_handler.subscribe()
# Cherrypy stupidly calls os._exit(70) when it can't bind the
# port. At least try to print a reasonable error and continue
# in this case, rather than just dying silently (as we would
# otherwise do in embedded mode)
real_exit = os._exit
def fake_exit(code): # pragma: no cover
if code == os.EX_SOFTWARE:
fprintf(sys.stderr, "error: CherryPy called os._exit!\n")
else:
real_exit(code)
os._exit = fake_exit
cherrypy.engine.start()
os._exit = real_exit
# Signal that the engine has started successfully
if event is not None:
event.set()
if blocking:
try:
cherrypy.engine.wait(cherrypy.engine.states.EXITING,
interval = 0.1, channel = 'main')
except (KeyboardInterrupt, IOError): # pragma: no cover
cherrypy.engine.log('Keyboard Interrupt: shutting down bus')
cherrypy.engine.exit()
except SystemExit: # pragma: no cover
cherrypy.engine.log('SystemExit raised: shutting down bus')
cherrypy.engine.exit()
raise
# Stop CherryPy server
def cherrypy_stop():
cherrypy.engine.exit()

View File

@@ -1,5 +1,6 @@
"""Interval. Like nilmdb.server.interval, but re-implemented here
in plain Python so clients have easier access to it.
in plain Python so clients have easier access to it, and with a few
helper functions.
Intervals are half-open, ie. they include data points with timestamps
[start, end)
@@ -34,6 +35,10 @@ class Interval:
return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
" -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")
def human_string(self):
return ("[ " + nilmdb.utils.time.timestamp_to_human(self.start) +
" -> " + nilmdb.utils.time.timestamp_to_human(self.end) + " ]")
def __cmp__(self, other):
"""Compare two intervals. If non-equal, order by start then end"""
return cmp(self.start, other.start) or cmp(self.end, other.end)
@@ -104,3 +109,20 @@ def set_difference(a, b):
b_interval = None
if a_interval:
out_start = ts
def optimize(it):
"""
Given an iterable 'it' with intervals, optimize them by joining
together intervals that are adjacent in time, and return a generator
that yields the new intervals.
"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int

View File

@@ -91,6 +91,20 @@ def serializer_proxy(obj_or_type):
r = SerializerCallProxy(self.__call_queue, attr, self)
return r
# For an interable object, on __iter__(), save the object's
# iterator and return this proxy. On next(), call the object's
# iterator through this proxy.
def __iter__(self):
attr = getattr(self.__object, "__iter__")
self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
return self
def next(self):
return SerializerCallProxy(self.__call_queue,
self.__iter.next, self)()
def __getitem__(self, key):
return self.__getattr__("__getitem__")(key)
def __call__(self, *args, **kwargs):
"""Call this to instantiate the type, if a type was passed
to serializer_proxy. Otherwise, pass the call through."""

View File

@@ -1,7 +1,14 @@
import sys
if sys.version_info[0] >= 3: # pragma: no cover (future Python3 compat)
text_type = str
else:
text_type = unicode
def encode(u):
"""Try to encode something from Unicode to a string using the
default encoding. If it fails, try encoding as UTF-8."""
if not isinstance(u, unicode):
if not isinstance(u, text_type):
return u
try:
return u.encode()
@@ -11,7 +18,7 @@ def encode(u):
def decode(s):
"""Try to decode someting from string to Unicode using the
default encoding. If it fails, try decoding as UTF-8."""
if isinstance(s, unicode):
if isinstance(s, text_type):
return s
try:
return s.decode()

8
tests/data/timestamped Normal file
View File

@@ -0,0 +1,8 @@
-10000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-100000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-100000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
-1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03
1000000000 2.61246e+05 2.22735e+05 4.60340e+03 2.58221e+03 8.42804e+03 3.41890e+03 9.57898e+02 4.00585e+03

View File

@@ -354,10 +354,6 @@ class TestClient(object):
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator
with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]:
with assert_raises(ClientError) as e:
@@ -396,27 +392,38 @@ class TestClient(object):
headers())
# Extract
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124" })
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: text/plain;charset=utf-8" not in headers():
raise AssertionError("/stream/extract is not text/plain:\n" +
headers())
x = http.get("stream/extract",
{ "path": "/newton/prep",
"start": "123",
"end": "124",
"binary": "1" })
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "1" })
if "transfer-encoding: chunked" not in headers():
warnings.warn("Non-chunked HTTP response for /stream/extract")
if "content-type: application/octet-stream" not in headers():
raise AssertionError("/stream/extract is not binary:\n" +
headers())
# Make sure a binary of "0" is really off
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "0" })
if "content-type: application/octet-stream" in headers():
raise AssertionError("/stream/extract is not text:\n" +
headers())
# Invalid parameters
with assert_raises(ClientError) as e:
x = http.get("stream/extract", { "path": "/newton/prep",
"start": "123", "end": "124",
"binary": "asdfasfd" })
in_("can't parse parameter", str(e.exception))
client.close()
def test_client_08_unicode(self):

View File

@@ -475,6 +475,13 @@ class TestCmdline(object):
# bad start time
self.fail("insert -t -r 120 --start 'whatever' /newton/prep /dev/null")
# Test negative times
self.ok("insert --start @-10000000000 --end @1000000001 /newton/prep"
" tests/data/timestamped")
self.ok("extract -c /newton/prep --start min --end @1000000001")
self.match("8\n")
self.ok("remove /newton/prep --start min --end @1000000001")
def test_07_detail_extended(self):
# Just count the number of lines, it's probably fine
self.ok("list --detail")

View File

@@ -59,6 +59,14 @@ class TestInterval:
self.test_interval_intersect()
Interval = NilmdbInterval
# Other helpers in nilmdb.utils.interval
i = [ UtilsInterval(1,2), UtilsInterval(2,3), UtilsInterval(4,5) ]
eq_(list(nilmdb.utils.interval.optimize(i)),
[ UtilsInterval(1,3), UtilsInterval(4,5) ])
eq_(UtilsInterval(1234567890123456, 1234567890654321).human_string(),
"[ Fri, 13 Feb 2009 18:31:30.123456 -0500 -> " +
"Fri, 13 Feb 2009 18:31:30.654321 -0500 ]")
def test_interval(self):
# Test Interval class
os.environ['TZ'] = "America/New_York"

View File

@@ -157,11 +157,14 @@ class TestServer(object):
def test_server(self):
# Make sure we can't force an exit, and test other 404 errors
for url in [ "/exit", "/", "/favicon.ico" ]:
for url in [ "/exit", "/favicon.ico" ]:
with assert_raises(HTTPError) as e:
geturl(url)
eq_(e.exception.code, 404)
# Root page
in_("This is NilmDB", geturl("/"))
# Check version
eq_(distutils.version.LooseVersion(getjson("/version")),
distutils.version.LooseVersion(nilmdb.__version__))

View File

@@ -62,6 +62,28 @@ class Base(object):
eq_(self.foo.val, 20)
eq_(self.foo.init_thread, self.foo.test_thread)
class ListLike(object):
def __init__(self):
self.thread = threading.current_thread().name
self.foo = 0
def __iter__(self):
eq_(threading.current_thread().name, self.thread)
self.foo = 0
return self
def __getitem__(self, key):
eq_(threading.current_thread().name, self.thread)
return key
def next(self):
eq_(threading.current_thread().name, self.thread)
if self.foo < 5:
self.foo += 1
return self.foo
else:
raise StopIteration
class TestUnserialized(Base):
def setUp(self):
self.foo = Foo()
@@ -84,3 +106,10 @@ class TestSerializer(Base):
sp(sp(Foo("x"))).t()
sp(sp(Foo)("x")).t()
sp(sp(Foo))("x").t()
def test_iter(self):
sp = nilmdb.utils.serializer_proxy
i = sp(ListLike)()
print iter(i)
eq_(list(i), [1,2,3,4,5])
eq_(i[3], 3)