Compare commits
39 Commits
nilmdb-1.1
...
nilmdb-1.2
Author | SHA1 | Date | |
---|---|---|---|
0c1a1d2388 | |||
e3f335dfe5 | |||
7a191c0ebb | |||
55bf11e393 | |||
e90dcd10f3 | |||
7d44f4eaa0 | |||
f541432d44 | |||
aa4e32f78a | |||
2bc1416c00 | |||
68bbbf757d | |||
3df96fdfdd | |||
740ab76eaf | |||
ce13a47fea | |||
50a4a60786 | |||
14afa02db6 | |||
cc990d6ce4 | |||
0f5162e0c0 | |||
b26cd52f8c | |||
236d925a1d | |||
a4a4bc61ba | |||
3d82888580 | |||
749b878904 | |||
f396e3934c | |||
dd7594b5fa | |||
4ac1beee6d | |||
8c0ce736d8 | |||
8858c9426f | |||
9123ccb583 | |||
5b0441de6b | |||
317c53ab6f | |||
7db4411462 | |||
422317850e | |||
965537d8cb | |||
0dcdec5949 | |||
7fce305a1d | |||
dfbbe23512 | |||
7761a91242 | |||
9b06e46bf1 | |||
171e6f1871 |
8
Makefile
8
Makefile
@@ -21,7 +21,13 @@ lint:
|
||||
pylint --rcfile=.pylintrc nilmdb
|
||||
|
||||
test:
|
||||
ifeq ($(INSIDE_EMACS), t)
|
||||
# Use the slightly more flexible script
|
||||
python tests/runtests.py
|
||||
else
|
||||
# Let setup.py check dependencies, build stuff, and run the test
|
||||
python setup.py nosetests
|
||||
endif
|
||||
|
||||
clean::
|
||||
find . -name '*pyc' | xargs rm -f
|
||||
@@ -33,4 +39,4 @@ clean::
|
||||
gitclean::
|
||||
git clean -dXf
|
||||
|
||||
.PHONY: all build dist sdist install docs lint test clean
|
||||
.PHONY: all version build dist sdist install docs lint test clean
|
||||
|
@@ -7,11 +7,15 @@ Prerequisites:
|
||||
sudo apt-get install python2.7 python2.7-dev python-setuptools cython
|
||||
|
||||
# Base NilmDB dependencies
|
||||
sudo apt-get install python-cherrypy3 python-decorator python-simplejson python-pycurl python-dateutil python-tz python-psutil
|
||||
sudo apt-get install python-cherrypy3 python-decorator python-simplejson
|
||||
sudo apt-get install python-requests python-dateutil python-tz python-psutil
|
||||
|
||||
# Tools for running tests
|
||||
sudo apt-get install python-nose python-coverage
|
||||
|
||||
Test:
|
||||
python setup.py nosetests
|
||||
|
||||
Install:
|
||||
|
||||
python setup.py install
|
||||
|
@@ -73,7 +73,7 @@ class Client(object):
|
||||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
}
|
||||
return self.http.get("stream/set_metadata", params)
|
||||
return self.http.post("stream/set_metadata", params)
|
||||
|
||||
def stream_update_metadata(self, path, data):
|
||||
"""Update stream metadata from a dictionary"""
|
||||
@@ -81,18 +81,18 @@ class Client(object):
|
||||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
}
|
||||
return self.http.get("stream/update_metadata", params)
|
||||
return self.http.post("stream/update_metadata", params)
|
||||
|
||||
def stream_create(self, path, layout):
|
||||
"""Create a new stream"""
|
||||
params = { "path": path,
|
||||
"layout" : layout }
|
||||
return self.http.get("stream/create", params)
|
||||
return self.http.post("stream/create", params)
|
||||
|
||||
def stream_destroy(self, path):
|
||||
"""Delete stream and its contents"""
|
||||
params = { "path": path }
|
||||
return self.http.get("stream/destroy", params)
|
||||
return self.http.post("stream/destroy", params)
|
||||
|
||||
def stream_remove(self, path, start = None, end = None):
|
||||
"""Remove data from the specified time range"""
|
||||
@@ -103,7 +103,7 @@ class Client(object):
|
||||
params["start"] = float_to_string(start)
|
||||
if end is not None:
|
||||
params["end"] = float_to_string(end)
|
||||
return self.http.get("stream/remove", params)
|
||||
return self.http.post("stream/remove", params)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def stream_insert_context(self, path, start = None, end = None):
|
||||
@@ -156,7 +156,7 @@ class Client(object):
|
||||
params["start"] = float_to_string(start)
|
||||
if end is not None:
|
||||
params["end"] = float_to_string(end)
|
||||
return self.http.get_gen("stream/intervals", params, retjson = True)
|
||||
return self.http.get_gen("stream/intervals", params)
|
||||
|
||||
def stream_extract(self, path, start = None, end = None, count = False):
|
||||
"""
|
||||
@@ -176,8 +176,7 @@ class Client(object):
|
||||
params["end"] = float_to_string(end)
|
||||
if count:
|
||||
params["count"] = 1
|
||||
|
||||
return self.http.get_gen("stream/extract", params, retjson = False)
|
||||
return self.http.get_gen("stream/extract", params)
|
||||
|
||||
def stream_count(self, path, start = None, end = None):
|
||||
"""
|
||||
|
@@ -6,8 +6,7 @@ from nilmdb.client.errors import ClientError, ServerError, Error
|
||||
|
||||
import simplejson as json
|
||||
import urlparse
|
||||
import pycurl
|
||||
import cStringIO
|
||||
import requests
|
||||
|
||||
class HTTPClient(object):
|
||||
"""Class to manage and perform HTTP requests from the client"""
|
||||
@@ -19,40 +18,19 @@ class HTTPClient(object):
|
||||
if '://' not in reparsed:
|
||||
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
|
||||
self.baseurl = reparsed
|
||||
self.curl = pycurl.Curl()
|
||||
self.curl.setopt(pycurl.SSL_VERIFYHOST, 2)
|
||||
self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
|
||||
self.curl.setopt(pycurl.MAXREDIRS, 5)
|
||||
self._setup_url()
|
||||
|
||||
def _setup_url(self, url = "", params = ""):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
if params:
|
||||
url = urlparse.urljoin(
|
||||
url, "?" + nilmdb.utils.urllib.urlencode(params))
|
||||
self.curl.setopt(pycurl.URL, url)
|
||||
self.url = url
|
||||
# Build Requests session object, enable SSL verification
|
||||
self.session = requests.Session()
|
||||
self.session.verify = True
|
||||
|
||||
def _check_busy_and_set_upload(self, upload):
|
||||
"""Sets the pycurl.UPLOAD option, but also raises a more
|
||||
friendly exception if the client is already serving a request."""
|
||||
try:
|
||||
self.curl.setopt(pycurl.UPLOAD, upload)
|
||||
except pycurl.error as e:
|
||||
if "is currently running" in str(e):
|
||||
raise Exception("Client is already performing a request, and "
|
||||
"nesting calls is not supported.")
|
||||
else: # pragma: no cover (shouldn't happen)
|
||||
raise
|
||||
# Saved response, so that tests can verify a few things.
|
||||
self._last_response = {}
|
||||
|
||||
def _check_error(self, body = None):
|
||||
code = self.curl.getinfo(pycurl.RESPONSE_CODE)
|
||||
if code == 200:
|
||||
return
|
||||
def _handle_error(self, url, code, body):
|
||||
# Default variables for exception. We use the entire body as
|
||||
# the default message, in case we can't extract it from a JSON
|
||||
# response.
|
||||
args = { "url" : self.url,
|
||||
args = { "url" : url,
|
||||
"status" : str(code),
|
||||
"message" : body,
|
||||
"traceback" : None }
|
||||
@@ -76,133 +54,67 @@ class HTTPClient(object):
|
||||
else:
|
||||
raise Error(**args)
|
||||
|
||||
def _req_generator(self, url, params):
|
||||
"""
|
||||
Like self._req(), but runs the perform in a separate thread.
|
||||
It returns a generator that spits out arbitrary-sized chunks
|
||||
of the resulting data, instead of using the WRITEFUNCTION
|
||||
callback.
|
||||
"""
|
||||
self._setup_url(url, params)
|
||||
self._status = None
|
||||
error_body = ""
|
||||
self._headers = ""
|
||||
def header_callback(data):
|
||||
if self._status is None:
|
||||
self._status = int(data.split(" ")[1])
|
||||
self._headers += data
|
||||
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
|
||||
def perform(callback):
|
||||
self.curl.setopt(pycurl.WRITEFUNCTION, callback)
|
||||
self.curl.perform()
|
||||
try:
|
||||
with nilmdb.utils.Iteratorizer(perform, curl_hack = True) as it:
|
||||
for i in it:
|
||||
if self._status == 200:
|
||||
# If we had a 200 response, yield the data to caller.
|
||||
yield i
|
||||
else:
|
||||
# Otherwise, collect it into an error string.
|
||||
error_body += i
|
||||
except pycurl.error as e:
|
||||
raise ServerError(status = "502 Error",
|
||||
url = self.url,
|
||||
message = e[1])
|
||||
# Raise an exception if there was an error
|
||||
self._check_error(error_body)
|
||||
|
||||
def _req(self, url, params):
|
||||
"""
|
||||
GET or POST that returns raw data. Returns the body
|
||||
data as a string, or raises an error if it contained an error.
|
||||
"""
|
||||
self._setup_url(url, params)
|
||||
body = cStringIO.StringIO()
|
||||
self.curl.setopt(pycurl.WRITEFUNCTION, body.write)
|
||||
self._headers = ""
|
||||
def header_callback(data):
|
||||
self._headers += data
|
||||
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
|
||||
try:
|
||||
self.curl.perform()
|
||||
except pycurl.error as e:
|
||||
raise ServerError(status = "502 Error",
|
||||
url = self.url,
|
||||
message = e[1])
|
||||
body_str = body.getvalue()
|
||||
# Raise an exception if there was an error
|
||||
self._check_error(body_str)
|
||||
return body_str
|
||||
|
||||
def close(self):
|
||||
self.curl.close()
|
||||
self.session.close()
|
||||
|
||||
def _iterate_lines(self, it):
|
||||
def _do_req(self, method, url, query_data, body_data, stream):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
try:
|
||||
response = self.session.request(method, url,
|
||||
params = query_data,
|
||||
data = body_data)
|
||||
except requests.RequestException as e:
|
||||
raise ServerError(status = "502 Error", url = url,
|
||||
message = str(e.message))
|
||||
if response.status_code != 200:
|
||||
self._handle_error(url, response.status_code, response.content)
|
||||
self._last_response = response
|
||||
if response.headers["content-type"] in ("application/json",
|
||||
"application/x-json-stream"):
|
||||
return (response, True)
|
||||
else:
|
||||
return (response, False)
|
||||
|
||||
# Normal versions that return data directly
|
||||
def _req(self, method, url, query = None, body = None):
|
||||
"""
|
||||
Given an iterator that returns arbitrarily-sized chunks
|
||||
of data, return '\n'-delimited lines of text
|
||||
Make a request and return the body data as a string or parsed
|
||||
JSON object, or raise an error if it contained an error.
|
||||
"""
|
||||
partial = ""
|
||||
for chunk in it:
|
||||
partial += chunk
|
||||
lines = partial.split("\n")
|
||||
for line in lines[0:-1]:
|
||||
yield line
|
||||
partial = lines[-1]
|
||||
if partial != "":
|
||||
yield partial
|
||||
(response, isjson) = self._do_req(method, url, query, body, False)
|
||||
if isjson:
|
||||
return json.loads(response.content)
|
||||
return response.content
|
||||
|
||||
# Non-generator versions
|
||||
def _doreq(self, url, params, retjson):
|
||||
def get(self, url, params = None):
|
||||
"""Simple GET (parameters in URL)"""
|
||||
return self._req("GET", url, params, None)
|
||||
|
||||
def post(self, url, params = None):
|
||||
"""Simple POST (parameters in body)"""
|
||||
return self._req("POST", url, None, params)
|
||||
|
||||
def put(self, url, data, params = None):
|
||||
"""Simple PUT (parameters in URL, data in body)"""
|
||||
return self._req("PUT", url, params, data)
|
||||
|
||||
# Generator versions that return data one line at a time.
|
||||
def _req_gen(self, method, url, query = None, body = None):
|
||||
"""
|
||||
Perform a request, and return the body.
|
||||
|
||||
url: URL to request (relative to baseurl)
|
||||
params: dictionary of query parameters
|
||||
retjson: expect JSON and return python objects instead of string
|
||||
Make a request and return a generator that gives back strings
|
||||
or JSON decoded lines of the body data, or raise an error if
|
||||
it contained an eror.
|
||||
"""
|
||||
out = self._req(url, params)
|
||||
if retjson:
|
||||
return json.loads(out)
|
||||
return out
|
||||
|
||||
def get(self, url, params = None, retjson = True):
|
||||
"""Simple GET"""
|
||||
self._check_busy_and_set_upload(0)
|
||||
return self._doreq(url, params, retjson)
|
||||
|
||||
def put(self, url, postdata, params = None, retjson = True):
|
||||
"""Simple PUT"""
|
||||
self._check_busy_and_set_upload(1)
|
||||
self._setup_url(url, params)
|
||||
data = cStringIO.StringIO(postdata)
|
||||
self.curl.setopt(pycurl.READFUNCTION, data.read)
|
||||
return self._doreq(url, params, retjson)
|
||||
|
||||
# Generator versions
|
||||
def _doreq_gen(self, url, params, retjson):
|
||||
"""
|
||||
Perform a request, and return lines of the body in a generator.
|
||||
|
||||
url: URL to request (relative to baseurl)
|
||||
params: dictionary of query parameters
|
||||
retjson: expect JSON and yield python objects instead of strings
|
||||
"""
|
||||
for line in self._iterate_lines(self._req_generator(url, params)):
|
||||
if retjson:
|
||||
(response, isjson) = self._do_req(method, url, query, body, True)
|
||||
for line in response.iter_lines():
|
||||
if isjson:
|
||||
yield json.loads(line)
|
||||
else:
|
||||
yield line
|
||||
|
||||
def get_gen(self, url, params = None, retjson = True):
|
||||
"""Simple GET, returning a generator"""
|
||||
self._check_busy_and_set_upload(0)
|
||||
return self._doreq_gen(url, params, retjson)
|
||||
def get_gen(self, url, params = None):
|
||||
"""Simple GET (parameters in URL) returning a generator"""
|
||||
return self._req_gen("GET", url, params)
|
||||
|
||||
def put_gen(self, url, postdata, params = None, retjson = True):
|
||||
"""Simple PUT, returning a generator"""
|
||||
self._check_busy_and_set_upload(1)
|
||||
self._setup_url(url, params)
|
||||
data = cStringIO.StringIO(postdata)
|
||||
self.curl.setopt(pycurl.READFUNCTION, data.read)
|
||||
return self._doreq_gen(url, params, retjson)
|
||||
# Not much use for a POST or PUT generator, since they don't
|
||||
# return much data.
|
||||
|
@@ -3,9 +3,9 @@
|
||||
import nilmdb
|
||||
from nilmdb.utils.printf import *
|
||||
from nilmdb.utils import datetime_tz
|
||||
import nilmdb.utils.time
|
||||
|
||||
import sys
|
||||
import re
|
||||
import argparse
|
||||
from argparse import ArgumentDefaultsHelpFormatter as def_form
|
||||
|
||||
@@ -33,63 +33,11 @@ class Cmdline(object):
|
||||
def arg_time(self, toparse):
|
||||
"""Parse a time string argument"""
|
||||
try:
|
||||
return self.parse_time(toparse).totimestamp()
|
||||
return nilmdb.utils.time.parse_time(toparse).totimestamp()
|
||||
except ValueError as e:
|
||||
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
|
||||
str(e), toparse))
|
||||
|
||||
def parse_time(self, toparse):
|
||||
"""
|
||||
Parse a free-form time string and return a datetime_tz object.
|
||||
If the string doesn't contain a timestamp, the current local
|
||||
timezone is assumed (e.g. from the TZ env var).
|
||||
"""
|
||||
# If string isn't "now" and doesn't contain at least 4 digits,
|
||||
# consider it invalid. smartparse might otherwise accept
|
||||
# empty strings and strings with just separators.
|
||||
if toparse != "now" and len(re.findall(r"\d", toparse)) < 4:
|
||||
raise ValueError("not enough digits for a timestamp")
|
||||
|
||||
# Try to just parse the time as given
|
||||
try:
|
||||
return datetime_tz.datetime_tz.smartparse(toparse)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Try to extract a substring in a condensed format that we expect
|
||||
# to see in a filename or header comment
|
||||
res = re.search(r"(^|[^\d])(" # non-numeric or SOL
|
||||
r"(199\d|2\d\d\d)" # year
|
||||
r"[-/]?" # separator
|
||||
r"(0[1-9]|1[012])" # month
|
||||
r"[-/]?" # separator
|
||||
r"([012]\d|3[01])" # day
|
||||
r"[-T ]?" # separator
|
||||
r"([01]\d|2[0-3])" # hour
|
||||
r"[:]?" # separator
|
||||
r"([0-5]\d)" # minute
|
||||
r"[:]?" # separator
|
||||
r"([0-5]\d)?" # second
|
||||
r"([-+]\d\d\d\d)?" # timezone
|
||||
r")", toparse)
|
||||
if res is not None:
|
||||
try:
|
||||
return datetime_tz.datetime_tz.smartparse(res.group(2))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Could also try to successively parse substrings, but let's
|
||||
# just give up for now.
|
||||
raise ValueError("unable to parse timestamp")
|
||||
|
||||
def time_string(self, timestamp):
|
||||
"""
|
||||
Convert a Unix timestamp to a string for printing, using the
|
||||
local timezone for display (e.g. from the TZ env var).
|
||||
"""
|
||||
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp)
|
||||
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
|
||||
|
||||
def parser_setup(self):
|
||||
self.parser = JimArgumentParser(add_help = False,
|
||||
formatter_class = def_form)
|
||||
|
@@ -45,7 +45,7 @@ def cmd_extract(self):
|
||||
if self.args.timestamp_raw:
|
||||
time_string = repr
|
||||
else:
|
||||
time_string = self.time_string
|
||||
time_string = nilmdb.utils.time.format_time
|
||||
|
||||
if self.args.annotate:
|
||||
printf("# path: %s\n", self.args.path)
|
||||
|
@@ -2,6 +2,7 @@ from nilmdb.utils.printf import *
|
||||
import nilmdb
|
||||
import nilmdb.client
|
||||
import nilmdb.utils.timestamper as timestamper
|
||||
import nilmdb.utils.time
|
||||
|
||||
import sys
|
||||
|
||||
@@ -73,7 +74,7 @@ def cmd_insert(self):
|
||||
start = self.args.start
|
||||
else:
|
||||
try:
|
||||
start = self.parse_time(filename)
|
||||
start = nilmdb.utils.time.parse_time(filename)
|
||||
except ValueError:
|
||||
self.die("error extracting time from filename '%s'",
|
||||
filename)
|
||||
|
@@ -1,4 +1,5 @@
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmdb.utils.time
|
||||
|
||||
import fnmatch
|
||||
import argparse
|
||||
@@ -57,7 +58,7 @@ def cmd_list(self):
|
||||
if self.args.timestamp_raw:
|
||||
time_string = repr
|
||||
else:
|
||||
time_string = self.time_string
|
||||
time_string = nilmdb.utils.time.format_time
|
||||
|
||||
for (path, layout) in streams:
|
||||
if not (fnmatch.fnmatch(path, self.args.path) and
|
||||
|
@@ -33,8 +33,9 @@ def main():
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Create database object
|
||||
db = nilmdb.server.NilmDB(args.database)
|
||||
# Create database object. Needs to be serialized before passing
|
||||
# to the Server.
|
||||
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database)
|
||||
|
||||
# Configure the server
|
||||
if args.quiet:
|
||||
|
@@ -5,15 +5,15 @@ from __future__ import absolute_import
|
||||
# Try to set up pyximport to automatically rebuild Cython modules. If
|
||||
# this doesn't work, it's OK, as long as the modules were built externally.
|
||||
# (e.g. python setup.py build_ext --inplace)
|
||||
try:
|
||||
try: # pragma: no cover
|
||||
import Cython
|
||||
import distutils.version
|
||||
if (distutils.version.LooseVersion(Cython.__version__) <
|
||||
distutils.version.LooseVersion("0.16")): # pragma: no cover
|
||||
distutils.version.LooseVersion("0.17")): # pragma: no cover
|
||||
raise ImportError("Cython version too old")
|
||||
import pyximport
|
||||
pyximport.install(inplace = True, build_in_temp = False)
|
||||
except ImportError: # pragma: no cover
|
||||
except (ImportError, TypeError): # pragma: no cover
|
||||
pass
|
||||
|
||||
import nilmdb.server.layout
|
||||
|
@@ -154,7 +154,7 @@ class Parser(object):
|
||||
layout, into an internal data structure suitable for a
|
||||
pytables 'table.append(parser.data)'.
|
||||
"""
|
||||
cdef double last_ts = 0, ts
|
||||
cdef double last_ts = -1e12, ts
|
||||
cdef int n = 0, i
|
||||
cdef char *line
|
||||
|
||||
|
@@ -97,12 +97,7 @@ class NilmDB(object):
|
||||
|
||||
# SQLite database too
|
||||
sqlfilename = os.path.join(self.basepath, "data.sql")
|
||||
# We use check_same_thread = False, assuming that the rest
|
||||
# of the code (e.g. Server) will be smart and not access this
|
||||
# database from multiple threads simultaneously. Otherwise
|
||||
# false positives will occur when the database is only opened
|
||||
# in one thread, and only accessed in another.
|
||||
self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
|
||||
self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
|
||||
self._sql_schema_update()
|
||||
|
||||
# See big comment at top about the performance implications of this
|
||||
|
@@ -15,12 +15,6 @@ import decorator
|
||||
import traceback
|
||||
import psutil
|
||||
|
||||
try:
|
||||
cherrypy.tools.json_out
|
||||
except: # pragma: no cover
|
||||
sys.stderr.write("Cherrypy 3.2+ required\n")
|
||||
sys.exit(1)
|
||||
|
||||
class NilmApp(object):
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
@@ -77,6 +71,17 @@ def exception_to_httperror(*expected):
|
||||
# care of that.
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
# Custom Cherrypy tools
|
||||
def allow_methods(methods):
|
||||
method = cherrypy.request.method.upper()
|
||||
if method not in methods:
|
||||
if method in cherrypy.request.methods_with_bodies:
|
||||
cherrypy.request.body.read()
|
||||
allowed = ', '.join(methods)
|
||||
cherrypy.response.headers['Allow'] = allowed
|
||||
raise cherrypy.HTTPError(405, method + " not allowed; use " + allowed)
|
||||
cherrypy.tools.allow_methods = cherrypy.Tool('before_handler', allow_methods)
|
||||
|
||||
# CherryPy apps
|
||||
class Root(NilmApp):
|
||||
"""Root application for NILM database"""
|
||||
@@ -129,6 +134,7 @@ class Stream(NilmApp):
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def create(self, path, layout):
|
||||
"""Create a new stream in the database. Provide path
|
||||
and one of the nilmdb.layout.layouts keys.
|
||||
@@ -139,6 +145,7 @@ class Stream(NilmApp):
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def destroy(self, path):
|
||||
"""Delete a stream and its associated data."""
|
||||
return self.db.stream_destroy(path)
|
||||
@@ -171,6 +178,7 @@ class Stream(NilmApp):
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def set_metadata(self, path, data):
|
||||
"""Set metadata for the named stream, replacing any
|
||||
existing metadata. Data should be a json-encoded
|
||||
@@ -182,6 +190,7 @@ class Stream(NilmApp):
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def update_metadata(self, path, data):
|
||||
"""Update metadata for the named stream. Data
|
||||
should be a json-encoded dictionary"""
|
||||
@@ -191,7 +200,7 @@ class Stream(NilmApp):
|
||||
# /stream/insert?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
#@cherrypy.tools.disable_prb()
|
||||
@cherrypy.tools.allow_methods(methods = ["PUT"])
|
||||
def insert(self, path, start, end):
|
||||
"""
|
||||
Insert new data into the database. Provide textual data
|
||||
@@ -199,12 +208,9 @@ class Stream(NilmApp):
|
||||
"""
|
||||
# Important that we always read the input before throwing any
|
||||
# errors, to keep lengths happy for persistent connections.
|
||||
# However, CherryPy 3.2.2 has a bug where this fails for GET
|
||||
# requests, so catch that. (issue #1134)
|
||||
try:
|
||||
body = cherrypy.request.body.read()
|
||||
except TypeError:
|
||||
raise cherrypy.HTTPError("400 Bad Request", "No request body")
|
||||
# Note that CherryPy 3.2.2 has a bug where this fails for GET
|
||||
# requests, if we ever want to handle those (issue #1134)
|
||||
body = cherrypy.request.body.read()
|
||||
|
||||
# Check path and get layout
|
||||
streams = self.db.stream_list(path = path)
|
||||
@@ -250,6 +256,7 @@ class Stream(NilmApp):
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def remove(self, path, start = None, end = None):
|
||||
"""
|
||||
Remove data from the backend database. Removes all data in
|
||||
@@ -270,17 +277,16 @@ class Stream(NilmApp):
|
||||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@chunked_response
|
||||
@response_type("text/plain")
|
||||
@response_type("application/x-json-stream")
|
||||
def intervals(self, path, start = None, end = None):
|
||||
"""
|
||||
Get intervals from backend database. Streams the resulting
|
||||
intervals as JSON strings separated by newlines. This may
|
||||
intervals as JSON strings separated by CR LF pairs. This may
|
||||
make multiple requests to the nilmdb backend to avoid causing
|
||||
it to block for too long.
|
||||
|
||||
Note that the response type is set to 'text/plain' even
|
||||
though we're sending back JSON; this is because we're not
|
||||
really returning a single JSON object.
|
||||
Note that the response type is the non-standard
|
||||
'application/x-json-stream' for lack of a better option.
|
||||
"""
|
||||
if start is not None:
|
||||
start = float(start)
|
||||
@@ -300,8 +306,8 @@ class Stream(NilmApp):
|
||||
def content(start, end):
|
||||
# Note: disable chunked responses to see tracebacks from here.
|
||||
while True:
|
||||
(intervals, restart) = self.db.stream_intervals(path, start, end)
|
||||
response = ''.join([ json.dumps(i) + "\n" for i in intervals ])
|
||||
(ints, restart) = self.db.stream_intervals(path, start, end)
|
||||
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
|
||||
yield response
|
||||
if restart == 0:
|
||||
break
|
||||
@@ -381,10 +387,13 @@ class Server(object):
|
||||
# Save server version, just for verification during tests
|
||||
self.version = nilmdb.__version__
|
||||
|
||||
# Need to wrap DB object in a serializer because we'll call
|
||||
# into it from separate threads.
|
||||
self.embedded = embedded
|
||||
self.db = nilmdb.utils.Serializer(db)
|
||||
self.db = db
|
||||
if not getattr(db, "_thread_safe", None):
|
||||
raise KeyError("Database object " + str(db) + " doesn't claim "
|
||||
"to be thread safe. You should pass "
|
||||
"nilmdb.utils.serializer_proxy(NilmDB)(args) "
|
||||
"rather than NilmDB(args).")
|
||||
|
||||
# Build up global server configuration
|
||||
cherrypy.config.update({
|
||||
@@ -408,6 +417,11 @@ class Server(object):
|
||||
app_config.update({ 'response.headers.Access-Control-Allow-Origin':
|
||||
'*' })
|
||||
|
||||
# Only allow GET and HEAD by default. Individual handlers
|
||||
# can override.
|
||||
app_config.update({ 'tools.allow_methods.on': True,
|
||||
'tools.allow_methods.methods': ['GET', 'HEAD'] })
|
||||
|
||||
# Send tracebacks in error responses. They're hidden by the
|
||||
# error_page function for client errors (code 400-499).
|
||||
app_config.update({ 'request.show_tracebacks' : True })
|
||||
|
@@ -2,9 +2,9 @@
|
||||
|
||||
from nilmdb.utils.timer import Timer
|
||||
from nilmdb.utils.iteratorizer import Iteratorizer
|
||||
from nilmdb.utils.serializer import Serializer
|
||||
from nilmdb.utils.serializer import serializer_proxy
|
||||
from nilmdb.utils.lrucache import lru_cache
|
||||
from nilmdb.utils.diskusage import du, human_size
|
||||
from nilmdb.utils.mustclose import must_close
|
||||
from nilmdb.utils.urllib import urlencode
|
||||
from nilmdb.utils import atomic
|
||||
import nilmdb.utils.threadsafety
|
||||
|
@@ -16,6 +16,7 @@ class IteratorizerThread(threading.Thread):
|
||||
callback (provided by this class) as an argument
|
||||
"""
|
||||
threading.Thread.__init__(self)
|
||||
self.name = "Iteratorizer-" + function.__name__ + "-" + self.name
|
||||
self.function = function
|
||||
self.queue = queue
|
||||
self.die = False
|
||||
|
@@ -12,15 +12,12 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
|
||||
already been called."""
|
||||
def class_decorator(cls):
|
||||
|
||||
# Helper to replace a class method with a wrapper function,
|
||||
# while maintaining argument specs etc.
|
||||
def wrap_class_method(wrapper_func):
|
||||
method = wrapper_func.__name__
|
||||
if method in cls.__dict__:
|
||||
orig = getattr(cls, method).im_func
|
||||
else:
|
||||
orig = lambda self: None
|
||||
setattr(cls, method, decorator.decorator(wrapper_func, orig))
|
||||
def wrap_class_method(wrapper):
|
||||
try:
|
||||
orig = getattr(cls, wrapper.__name__).im_func
|
||||
except:
|
||||
orig = lambda x: None
|
||||
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))
|
||||
|
||||
@wrap_class_method
|
||||
def __init__(orig, self, *args, **kwargs):
|
||||
|
@@ -1,6 +1,10 @@
|
||||
import Queue
|
||||
import threading
|
||||
import sys
|
||||
import decorator
|
||||
import inspect
|
||||
import types
|
||||
import functools
|
||||
|
||||
# This file provides a class that will wrap an object and serialize
|
||||
# all calls to its methods. All calls to that object will be queued
|
||||
@@ -12,8 +16,9 @@ import sys
|
||||
class SerializerThread(threading.Thread):
|
||||
"""Thread that retrieves call information from the queue, makes the
|
||||
call, and returns the results."""
|
||||
def __init__(self, call_queue):
|
||||
def __init__(self, classname, call_queue):
|
||||
threading.Thread.__init__(self)
|
||||
self.name = "Serializer-" + classname + "-" + self.name
|
||||
self.call_queue = call_queue
|
||||
|
||||
def run(self):
|
||||
@@ -22,51 +27,83 @@ class SerializerThread(threading.Thread):
|
||||
# Terminate if result_queue is None
|
||||
if result_queue is None:
|
||||
return
|
||||
exception = None
|
||||
result = None
|
||||
try:
|
||||
result = func(*args, **kwargs) # wrapped
|
||||
except:
|
||||
result_queue.put((sys.exc_info(), None))
|
||||
exception = sys.exc_info()
|
||||
# Ensure we delete these before returning a result, so
|
||||
# we don't unncessarily hold onto a reference while
|
||||
# we're waiting for the next call.
|
||||
del func, args, kwargs
|
||||
result_queue.put((exception, result))
|
||||
del exception, result
|
||||
|
||||
def serializer_proxy(obj_or_type):
|
||||
"""Wrap the given object or type in a SerializerObjectProxy.
|
||||
|
||||
Returns a SerializerObjectProxy object that proxies all method
|
||||
calls to the object, as well as attribute retrievals.
|
||||
|
||||
The proxied requests, including instantiation, are performed in a
|
||||
single thread and serialized between caller threads.
|
||||
"""
|
||||
class SerializerCallProxy(object):
|
||||
def __init__(self, call_queue, func, objectproxy):
|
||||
self.call_queue = call_queue
|
||||
self.func = func
|
||||
# Need to hold a reference to object proxy so it doesn't
|
||||
# go away (and kill the thread) until after get called.
|
||||
self.objectproxy = objectproxy
|
||||
def __call__(self, *args, **kwargs):
|
||||
result_queue = Queue.Queue()
|
||||
self.call_queue.put((result_queue, self.func, args, kwargs))
|
||||
( exc_info, result ) = result_queue.get()
|
||||
if exc_info is None:
|
||||
return result
|
||||
else:
|
||||
result_queue.put((None, result))
|
||||
raise exc_info[0], exc_info[1], exc_info[2]
|
||||
|
||||
class WrapCall(object):
|
||||
"""Wrap a callable using the given queues"""
|
||||
class SerializerObjectProxy(object):
|
||||
def __init__(self, obj_or_type, *args, **kwargs):
|
||||
self.__object = obj_or_type
|
||||
try:
|
||||
if type(obj_or_type) in (types.TypeType, types.ClassType):
|
||||
classname = obj_or_type.__name__
|
||||
else:
|
||||
classname = obj_or_type.__class__.__name__
|
||||
except AttributeError: # pragma: no cover
|
||||
classname = "???"
|
||||
self.__call_queue = Queue.Queue()
|
||||
self.__thread = SerializerThread(classname, self.__call_queue)
|
||||
self.__thread.daemon = True
|
||||
self.__thread.start()
|
||||
self._thread_safe = True
|
||||
|
||||
def __init__(self, call_queue, result_queue, func):
|
||||
self.call_queue = call_queue
|
||||
self.result_queue = result_queue
|
||||
self.func = func
|
||||
def __getattr__(self, key):
|
||||
if key.startswith("_SerializerObjectProxy__"): # pragma: no cover
|
||||
raise AttributeError
|
||||
attr = getattr(self.__object, key)
|
||||
if not callable(attr):
|
||||
getter = SerializerCallProxy(self.__call_queue, getattr, self)
|
||||
return getter(self.__object, key)
|
||||
r = SerializerCallProxy(self.__call_queue, attr, self)
|
||||
return r
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.call_queue.put((self.result_queue, self.func, args, kwargs))
|
||||
( exc_info, result ) = self.result_queue.get()
|
||||
if exc_info is None:
|
||||
return result
|
||||
else:
|
||||
raise exc_info[0], exc_info[1], exc_info[2]
|
||||
def __call__(self, *args, **kwargs):
|
||||
"""Call this to instantiate the type, if a type was passed
|
||||
to serializer_proxy. Otherwise, pass the call through."""
|
||||
ret = SerializerCallProxy(self.__call_queue,
|
||||
self.__object, self)(*args, **kwargs)
|
||||
if type(self.__object) in (types.TypeType, types.ClassType):
|
||||
# Instantiation
|
||||
self.__object = ret
|
||||
return self
|
||||
return ret
|
||||
|
||||
class WrapObject(object):
|
||||
"""Wrap all calls to methods in a target object with WrapCall"""
|
||||
def __del__(self):
|
||||
self.__call_queue.put((None, None, None, None))
|
||||
self.__thread.join()
|
||||
|
||||
def __init__(self, target):
|
||||
self.__wrap_target = target
|
||||
self.__wrap_call_queue = Queue.Queue()
|
||||
self.__wrap_serializer = SerializerThread(self.__wrap_call_queue)
|
||||
self.__wrap_serializer.daemon = True
|
||||
self.__wrap_serializer.start()
|
||||
|
||||
def __getattr__(self, key):
|
||||
"""Wrap methods of self.__wrap_target in a WrapCall instance"""
|
||||
func = getattr(self.__wrap_target, key)
|
||||
if not callable(func):
|
||||
raise TypeError("Can't serialize attribute %r (type: %s)"
|
||||
% (key, type(func)))
|
||||
result_queue = Queue.Queue()
|
||||
return WrapCall(self.__wrap_call_queue, result_queue, func)
|
||||
|
||||
def __del__(self):
|
||||
self.__wrap_call_queue.put((None, None, None, None))
|
||||
self.__wrap_serializer.join()
|
||||
|
||||
# Just an alias
|
||||
Serializer = WrapObject
|
||||
return SerializerObjectProxy(obj_or_type)
|
||||
|
109
nilmdb/utils/threadsafety.py
Normal file
109
nilmdb/utils/threadsafety.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from nilmdb.utils.printf import *
|
||||
import threading
|
||||
import warnings
|
||||
import types
|
||||
|
||||
def verify_proxy(obj_or_type, exception = False, check_thread = True,
|
||||
check_concurrent = True):
|
||||
"""Wrap the given object or type in a VerifyObjectProxy.
|
||||
|
||||
Returns a VerifyObjectProxy that proxies all method calls to the
|
||||
given object, as well as attribute retrievals.
|
||||
|
||||
When calling methods, the following checks are performed. If
|
||||
exception is True, an exception is raised. Otherwise, a warning
|
||||
is printed.
|
||||
|
||||
check_thread = True # Warn/fail if two different threads call methods.
|
||||
check_concurrent = True # Warn/fail if two functions are concurrently
|
||||
# run through this proxy
|
||||
"""
|
||||
class Namespace(object):
|
||||
pass
|
||||
class VerifyCallProxy(object):
|
||||
def __init__(self, func, parent_namespace):
|
||||
self.func = func
|
||||
self.parent_namespace = parent_namespace
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
p = self.parent_namespace
|
||||
this = threading.current_thread()
|
||||
try:
|
||||
callee = self.func.__name__
|
||||
except AttributeError:
|
||||
callee = "???"
|
||||
|
||||
if p.thread is None:
|
||||
p.thread = this
|
||||
p.thread_callee = callee
|
||||
|
||||
if check_thread and p.thread != this:
|
||||
err = sprintf("unsafe threading: %s called %s.%s,"
|
||||
" but %s called %s.%s",
|
||||
p.thread.name, p.classname, p.thread_callee,
|
||||
this.name, p.classname, callee)
|
||||
if exception:
|
||||
raise AssertionError(err)
|
||||
else: # pragma: no cover
|
||||
warnings.warn(err)
|
||||
|
||||
need_concur_unlock = False
|
||||
if check_concurrent:
|
||||
if p.concur_lock.acquire(False) == False:
|
||||
err = sprintf("unsafe concurrency: %s called %s.%s "
|
||||
"while %s is still in %s.%s",
|
||||
this.name, p.classname, callee,
|
||||
p.concur_tname, p.classname, p.concur_callee)
|
||||
if exception:
|
||||
raise AssertionError(err)
|
||||
else: # pragma: no cover
|
||||
warnings.warn(err)
|
||||
else:
|
||||
p.concur_tname = this.name
|
||||
p.concur_callee = callee
|
||||
need_concur_unlock = True
|
||||
|
||||
try:
|
||||
ret = self.func(*args, **kwargs)
|
||||
finally:
|
||||
if need_concur_unlock:
|
||||
p.concur_lock.release()
|
||||
return ret
|
||||
|
||||
class VerifyObjectProxy(object):
|
||||
def __init__(self, obj_or_type, *args, **kwargs):
|
||||
p = Namespace()
|
||||
self.__ns = p
|
||||
p.thread = None
|
||||
p.thread_callee = None
|
||||
p.concur_lock = threading.Lock()
|
||||
p.concur_tname = None
|
||||
p.concur_callee = None
|
||||
self.__obj = obj_or_type
|
||||
try:
|
||||
if type(obj_or_type) in (types.TypeType, types.ClassType):
|
||||
p.classname = self.__obj.__name__
|
||||
else:
|
||||
p.classname = self.__obj.__class__.__name__
|
||||
except AttributeError: # pragma: no cover
|
||||
p.classname = "???"
|
||||
|
||||
def __getattr__(self, key):
|
||||
if key.startswith("_VerifyObjectProxy__"): # pragma: no cover
|
||||
raise AttributeError
|
||||
attr = getattr(self.__obj, key)
|
||||
if not callable(attr):
|
||||
return VerifyCallProxy(getattr, self.__ns)(self.__obj, key)
|
||||
return VerifyCallProxy(attr, self.__ns)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
"""Call this to instantiate the type, if a type was passed
|
||||
to verify_proxy. Otherwise, pass the call through."""
|
||||
ret = VerifyCallProxy(self.__obj, self.__ns)(*args, **kwargs)
|
||||
if type(self.__obj) in (types.TypeType, types.ClassType):
|
||||
# Instantiation
|
||||
self.__obj = ret
|
||||
return self
|
||||
return ret
|
||||
|
||||
return VerifyObjectProxy(obj_or_type)
|
54
nilmdb/utils/time.py
Normal file
54
nilmdb/utils/time.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from nilmdb.utils import datetime_tz
|
||||
import re
|
||||
|
||||
def parse_time(toparse):
|
||||
"""
|
||||
Parse a free-form time string and return a datetime_tz object.
|
||||
If the string doesn't contain a timestamp, the current local
|
||||
timezone is assumed (e.g. from the TZ env var).
|
||||
"""
|
||||
# If string isn't "now" and doesn't contain at least 4 digits,
|
||||
# consider it invalid. smartparse might otherwise accept
|
||||
# empty strings and strings with just separators.
|
||||
if toparse != "now" and len(re.findall(r"\d", toparse)) < 4:
|
||||
raise ValueError("not enough digits for a timestamp")
|
||||
|
||||
# Try to just parse the time as given
|
||||
try:
|
||||
return datetime_tz.datetime_tz.smartparse(toparse)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Try to extract a substring in a condensed format that we expect
|
||||
# to see in a filename or header comment
|
||||
res = re.search(r"(^|[^\d])(" # non-numeric or SOL
|
||||
r"(199\d|2\d\d\d)" # year
|
||||
r"[-/]?" # separator
|
||||
r"(0[1-9]|1[012])" # month
|
||||
r"[-/]?" # separator
|
||||
r"([012]\d|3[01])" # day
|
||||
r"[-T ]?" # separator
|
||||
r"([01]\d|2[0-3])" # hour
|
||||
r"[:]?" # separator
|
||||
r"([0-5]\d)" # minute
|
||||
r"[:]?" # separator
|
||||
r"([0-5]\d)?" # second
|
||||
r"([-+]\d\d\d\d)?" # timezone
|
||||
r")", toparse)
|
||||
if res is not None:
|
||||
try:
|
||||
return datetime_tz.datetime_tz.smartparse(res.group(2))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Could also try to successively parse substrings, but let's
|
||||
# just give up for now.
|
||||
raise ValueError("unable to parse timestamp")
|
||||
|
||||
def format_time(timestamp):
|
||||
"""
|
||||
Convert a Unix timestamp to a string for printing, using the
|
||||
local timezone for display (e.g. from the TZ env var).
|
||||
"""
|
||||
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp)
|
||||
return dt.strftime("%a, %d %b %Y %H:%M:%S.%f %z")
|
@@ -6,6 +6,7 @@
|
||||
# foo.flush()
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
import contextlib
|
||||
import time
|
||||
|
||||
|
@@ -1,37 +0,0 @@
|
||||
from __future__ import absolute_import
|
||||
from urllib import quote_plus, _is_unicode
|
||||
|
||||
# urllib.urlencode insists on encoding Unicode as ASCII. This is based
|
||||
# on that function, except we always encode it as UTF-8 instead.
|
||||
|
||||
def urlencode(query):
|
||||
"""Encode a dictionary into a URL query string.
|
||||
|
||||
If any values in the query arg are sequences, each sequence
|
||||
element is converted to a separate parameter.
|
||||
"""
|
||||
|
||||
query = query.items()
|
||||
|
||||
l = []
|
||||
for k, v in query:
|
||||
k = quote_plus(str(k))
|
||||
if isinstance(v, str):
|
||||
v = quote_plus(v)
|
||||
l.append(k + '=' + v)
|
||||
elif _is_unicode(v):
|
||||
v = quote_plus(v.encode("utf-8","strict"))
|
||||
l.append(k + '=' + v)
|
||||
else:
|
||||
try:
|
||||
# is this a sufficient test for sequence-ness?
|
||||
len(v)
|
||||
except TypeError:
|
||||
# not a sequence
|
||||
v = quote_plus(str(v))
|
||||
l.append(k + '=' + v)
|
||||
else:
|
||||
# loop over the sequence
|
||||
for elt in v:
|
||||
l.append(k + '=' + quote_plus(str(elt)))
|
||||
return '&'.join(l)
|
@@ -20,6 +20,7 @@ cover-erase=1
|
||||
stop=1
|
||||
verbosity=2
|
||||
tests=tests
|
||||
#tests=tests/test_threadsafety.py
|
||||
#tests=tests/test_bulkdata.py
|
||||
#tests=tests/test_mustclose.py
|
||||
#tests=tests/test_lrucache.py
|
||||
|
1
setup.py
1
setup.py
@@ -115,6 +115,7 @@ setup(name='nilmdb',
|
||||
'python-dateutil',
|
||||
'pytz',
|
||||
'psutil >= 0.3.0',
|
||||
'requests >= 1.1.0, < 2.0.0',
|
||||
],
|
||||
packages = [ 'nilmdb',
|
||||
'nilmdb.utils',
|
||||
|
@@ -1,4 +1,5 @@
|
||||
test_printf.py
|
||||
test_threadsafety.py
|
||||
test_lrucache.py
|
||||
test_mustclose.py
|
||||
|
||||
|
@@ -6,6 +6,7 @@ from nilmdb.utils import timestamper
|
||||
from nilmdb.client import ClientError, ServerError
|
||||
from nilmdb.utils import datetime_tz
|
||||
|
||||
from nose.plugins.skip import SkipTest
|
||||
from nose.tools import *
|
||||
from nose.tools import assert_raises
|
||||
import itertools
|
||||
@@ -31,7 +32,7 @@ def setup_module():
|
||||
recursive_unlink(testdb)
|
||||
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.NilmDB(testdb, sync = False)
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(testdb, sync = False)
|
||||
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
@@ -97,6 +98,15 @@ class TestClient(object):
|
||||
with assert_raises(ClientError):
|
||||
client.stream_create("/newton/prep", "NoSuchLayout")
|
||||
|
||||
# Bad method types
|
||||
with assert_raises(ClientError):
|
||||
client.http.put("/stream/list","")
|
||||
# Try a bunch of times to make sure the request body is getting consumed
|
||||
for x in range(10):
|
||||
with assert_raises(ClientError):
|
||||
client.http.post("/stream/list")
|
||||
client = nilmdb.Client(url = testurl)
|
||||
|
||||
# Create three streams
|
||||
client.stream_create("/newton/prep", "PrepData")
|
||||
client.stream_create("/newton/raw", "RawData")
|
||||
@@ -266,12 +276,6 @@ class TestClient(object):
|
||||
with assert_raises(ClientError) as e:
|
||||
client.stream_remove("/newton/prep", 123, 120)
|
||||
|
||||
# Test the exception we get if we nest requests
|
||||
with assert_raises(Exception) as e:
|
||||
for data in client.stream_extract("/newton/prep"):
|
||||
x = client.stream_intervals("/newton/prep")
|
||||
in_("nesting calls is not supported", str(e.exception))
|
||||
|
||||
# Test count
|
||||
eq_(client.stream_count("/newton/prep"), 14400)
|
||||
|
||||
@@ -301,24 +305,6 @@ class TestClient(object):
|
||||
with assert_raises(ServerError) as e:
|
||||
client.http.get_gen("http://nosuchurl/").next()
|
||||
|
||||
# Check non-json version of string output
|
||||
eq_(json.loads(client.http.get("/stream/list",retjson=False)),
|
||||
client.http.get("/stream/list",retjson=True))
|
||||
|
||||
# Check non-json version of generator output
|
||||
for (a, b) in itertools.izip(
|
||||
client.http.get_gen("/stream/list",retjson=False),
|
||||
client.http.get_gen("/stream/list",retjson=True)):
|
||||
eq_(json.loads(a), b)
|
||||
|
||||
# Check PUT with generator out
|
||||
with assert_raises(ClientError) as e:
|
||||
client.http.put_gen("stream/insert", "",
|
||||
{ "path": "/newton/prep",
|
||||
"start": 0, "end": 0 }).next()
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
in_("start must precede end", str(e.exception))
|
||||
|
||||
# Check 404 for missing streams
|
||||
for function in [ client.stream_intervals, client.stream_extract ]:
|
||||
with assert_raises(ClientError) as e:
|
||||
@@ -337,35 +323,41 @@ class TestClient(object):
|
||||
client = nilmdb.Client(url = testurl)
|
||||
http = client.http
|
||||
|
||||
# Use a warning rather than returning a test failure, so that we can
|
||||
# still disable chunked responses for debugging.
|
||||
# Use a warning rather than returning a test failure for the
|
||||
# transfer-encoding, so that we can still disable chunked
|
||||
# responses for debugging.
|
||||
|
||||
def headers():
|
||||
h = ""
|
||||
for (k, v) in http._last_response.headers.items():
|
||||
h += k + ": " + v + "\n"
|
||||
return h.lower()
|
||||
|
||||
# Intervals
|
||||
x = http.get("stream/intervals", { "path": "/newton/prep" },
|
||||
retjson=False)
|
||||
lines_(x, 1)
|
||||
if "Transfer-Encoding: chunked" not in http._headers:
|
||||
x = http.get("stream/intervals", { "path": "/newton/prep" })
|
||||
if "transfer-encoding: chunked" not in headers():
|
||||
warnings.warn("Non-chunked HTTP response for /stream/intervals")
|
||||
if "Content-Type: text/plain;charset=utf-8" not in http._headers:
|
||||
raise AssertionError("/stream/intervals is not text/plain:\n" +
|
||||
http._headers)
|
||||
if "content-type: application/x-json-stream" not in headers():
|
||||
raise AssertionError("/stream/intervals content type "
|
||||
"is not application/x-json-stream:\n" +
|
||||
headers())
|
||||
|
||||
# Extract
|
||||
x = http.get("stream/extract",
|
||||
{ "path": "/newton/prep",
|
||||
"start": "123",
|
||||
"end": "124" }, retjson=False)
|
||||
if "Transfer-Encoding: chunked" not in http._headers:
|
||||
"end": "124" })
|
||||
if "transfer-encoding: chunked" not in headers():
|
||||
warnings.warn("Non-chunked HTTP response for /stream/extract")
|
||||
if "Content-Type: text/plain;charset=utf-8" not in http._headers:
|
||||
if "content-type: text/plain;charset=utf-8" not in headers():
|
||||
raise AssertionError("/stream/extract is not text/plain:\n" +
|
||||
http._headers)
|
||||
headers())
|
||||
|
||||
# Make sure Access-Control-Allow-Origin gets set
|
||||
if "Access-Control-Allow-Origin: " not in http._headers:
|
||||
if "access-control-allow-origin: " not in headers():
|
||||
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
||||
"header in /stream/extract response:\n" +
|
||||
http._headers)
|
||||
headers())
|
||||
|
||||
client.close()
|
||||
|
||||
@@ -576,3 +568,38 @@ class TestClient(object):
|
||||
# Clean up
|
||||
client.stream_destroy("/empty/test")
|
||||
client.close()
|
||||
|
||||
def test_client_12_persistent(self):
|
||||
# Check that connections are persistent when they should be.
|
||||
# This is pretty hard to test; we have to poke deep into
|
||||
# the Requests library.
|
||||
with nilmdb.Client(url = testurl) as c:
|
||||
def connections():
|
||||
try:
|
||||
poolmanager = c.http._last_response.connection.poolmanager
|
||||
pool = poolmanager.pools[('http','localhost',12380)]
|
||||
return (pool.num_connections, pool.num_requests)
|
||||
except:
|
||||
raise SkipTest("can't get connection info")
|
||||
|
||||
# First request makes a connection
|
||||
c.stream_create("/persist/test", "uint16_1")
|
||||
eq_(connections(), (1, 1))
|
||||
|
||||
# Non-generator
|
||||
c.stream_list("/persist/test")
|
||||
eq_(connections(), (1, 2))
|
||||
c.stream_list("/persist/test")
|
||||
eq_(connections(), (1, 3))
|
||||
|
||||
# Generators
|
||||
for x in c.stream_intervals("/persist/test"):
|
||||
pass
|
||||
eq_(connections(), (1, 4))
|
||||
for x in c.stream_intervals("/persist/test"):
|
||||
pass
|
||||
eq_(connections(), (1, 5))
|
||||
|
||||
# Clean up
|
||||
c.stream_destroy("/persist/test")
|
||||
eq_(connections(), (1, 6))
|
||||
|
@@ -27,9 +27,10 @@ testdb = "tests/cmdline-testdb"
|
||||
def server_start(max_results = None, bulkdata_args = {}):
|
||||
global test_server, test_db
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.NilmDB(testdb, sync = False,
|
||||
max_results = max_results,
|
||||
bulkdata_args = bulkdata_args)
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(
|
||||
testdb, sync = False,
|
||||
max_results = max_results,
|
||||
bulkdata_args = bulkdata_args)
|
||||
test_server = nilmdb.Server(test_db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
@@ -162,16 +163,16 @@ class TestCmdline(object):
|
||||
|
||||
# try some URL constructions
|
||||
self.fail("--url http://nosuchurl/ info")
|
||||
self.contain("Couldn't resolve host 'nosuchurl'")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.fail("--url nosuchurl info")
|
||||
self.contain("Couldn't resolve host 'nosuchurl'")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.fail("-u nosuchurl/foo info")
|
||||
self.contain("Couldn't resolve host 'nosuchurl'")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.fail("-u localhost:0 info")
|
||||
self.contain("couldn't connect to host")
|
||||
self.fail("-u localhost:1 info")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.ok("-u localhost:12380 info")
|
||||
self.ok("info")
|
||||
@@ -191,7 +192,23 @@ class TestCmdline(object):
|
||||
self.fail("extract --start 2000-01-01 --start 2001-01-02")
|
||||
self.contain("duplicated argument")
|
||||
|
||||
def test_02_info(self):
|
||||
def test_02_parsetime(self):
|
||||
os.environ['TZ'] = "America/New_York"
|
||||
test = datetime_tz.datetime_tz.now()
|
||||
parse_time = nilmdb.utils.time.parse_time
|
||||
eq_(parse_time(str(test)), test)
|
||||
test = datetime_tz.datetime_tz.smartparse("20120405 1400-0400")
|
||||
eq_(parse_time("hi there 20120405 1400-0400 testing! 123"), test)
|
||||
eq_(parse_time("20120405 1800 UTC"), test)
|
||||
eq_(parse_time("20120405 1400-0400 UTC"), test)
|
||||
for badtime in [ "20120405 1400-9999", "hello", "-", "", "4:00" ]:
|
||||
with assert_raises(ValueError):
|
||||
x = parse_time(badtime)
|
||||
x = parse_time("now")
|
||||
eq_(parse_time("snapshot-20120405-140000.raw.gz"), test)
|
||||
eq_(parse_time("prep-20120405T1400"), test)
|
||||
|
||||
def test_03_info(self):
|
||||
self.ok("info")
|
||||
self.contain("Server URL: http://localhost:12380/")
|
||||
self.contain("Client version: " + nilmdb.__version__)
|
||||
@@ -200,7 +217,7 @@ class TestCmdline(object):
|
||||
self.contain("Server database size")
|
||||
self.contain("Server database free space")
|
||||
|
||||
def test_03_createlist(self):
|
||||
def test_04_createlist(self):
|
||||
# Basic stream tests, like those in test_client.
|
||||
|
||||
# No streams
|
||||
@@ -276,7 +293,7 @@ class TestCmdline(object):
|
||||
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
|
||||
self.contain("start must precede end")
|
||||
|
||||
def test_04_metadata(self):
|
||||
def test_05_metadata(self):
|
||||
# Set / get metadata
|
||||
self.fail("metadata")
|
||||
self.fail("metadata --get")
|
||||
@@ -333,22 +350,6 @@ class TestCmdline(object):
|
||||
self.fail("metadata /newton/nosuchpath")
|
||||
self.contain("No stream at path /newton/nosuchpath")
|
||||
|
||||
def test_05_parsetime(self):
|
||||
os.environ['TZ'] = "America/New_York"
|
||||
cmd = nilmdb.cmdline.Cmdline(None)
|
||||
test = datetime_tz.datetime_tz.now()
|
||||
eq_(cmd.parse_time(str(test)), test)
|
||||
test = datetime_tz.datetime_tz.smartparse("20120405 1400-0400")
|
||||
eq_(cmd.parse_time("hi there 20120405 1400-0400 testing! 123"), test)
|
||||
eq_(cmd.parse_time("20120405 1800 UTC"), test)
|
||||
eq_(cmd.parse_time("20120405 1400-0400 UTC"), test)
|
||||
for badtime in [ "20120405 1400-9999", "hello", "-", "", "4:00" ]:
|
||||
with assert_raises(ValueError):
|
||||
x = cmd.parse_time(badtime)
|
||||
x = cmd.parse_time("now")
|
||||
eq_(cmd.parse_time("snapshot-20120405-140000.raw.gz"), test)
|
||||
eq_(cmd.parse_time("prep-20120405T1400"), test)
|
||||
|
||||
def test_06_insert(self):
|
||||
self.ok("insert --help")
|
||||
|
||||
|
@@ -16,6 +16,8 @@ import Queue
|
||||
import cStringIO
|
||||
import time
|
||||
|
||||
from nilmdb.utils import serializer_proxy
|
||||
|
||||
testdb = "tests/testdb"
|
||||
|
||||
#@atexit.register
|
||||
@@ -104,12 +106,17 @@ class Test00Nilmdb(object): # named 00 so it runs first
|
||||
|
||||
class TestBlockingServer(object):
|
||||
def setUp(self):
|
||||
self.db = nilmdb.NilmDB(testdb, sync=False)
|
||||
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
|
||||
|
||||
def tearDown(self):
|
||||
self.db.close()
|
||||
|
||||
def test_blocking_server(self):
|
||||
# Server should fail if the database doesn't have a "_thread_safe"
|
||||
# property.
|
||||
with assert_raises(KeyError):
|
||||
nilmdb.Server(object())
|
||||
|
||||
# Start web app on a custom port
|
||||
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = True)
|
||||
@@ -140,7 +147,7 @@ class TestServer(object):
|
||||
|
||||
def setUp(self):
|
||||
# Start web app on a custom port
|
||||
self.db = nilmdb.NilmDB(testdb, sync=False)
|
||||
self.db = serializer_proxy(nilmdb.NilmDB)(testdb, sync=False)
|
||||
self.server = nilmdb.Server(self.db, host = "127.0.0.1",
|
||||
port = 12380, stoppable = False)
|
||||
self.server.start(blocking = False)
|
||||
@@ -201,12 +208,3 @@ class TestServer(object):
|
||||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=foo")
|
||||
eq_(data, {'foo': None})
|
||||
|
||||
|
||||
def test_insert(self):
|
||||
# GET instead of POST (no body)
|
||||
# (actual POST test is done by client code)
|
||||
with assert_raises(HTTPError) as e:
|
||||
getjson("/stream/insert?path=/newton/prep&start=0&end=0")
|
||||
eq_(e.exception.code, 400)
|
||||
|
||||
|
@@ -9,16 +9,28 @@ import time
|
||||
|
||||
from testutil.helpers import *
|
||||
|
||||
#raise nose.exc.SkipTest("Skip these")
|
||||
|
||||
class Foo(object):
|
||||
val = 0
|
||||
|
||||
def __init__(self, asdf = "asdf"):
|
||||
self.init_thread = threading.current_thread().name
|
||||
|
||||
@classmethod
|
||||
def foo(self):
|
||||
pass
|
||||
|
||||
def fail(self):
|
||||
raise Exception("you asked me to do this")
|
||||
|
||||
def test(self, debug = False):
|
||||
self.tester(debug)
|
||||
|
||||
def t(self):
|
||||
pass
|
||||
|
||||
def tester(self, debug = False):
|
||||
# purposely not thread-safe
|
||||
self.test_thread = threading.current_thread().name
|
||||
oldval = self.val
|
||||
newval = oldval + 1
|
||||
time.sleep(0.05)
|
||||
@@ -46,27 +58,29 @@ class Base(object):
|
||||
t.join()
|
||||
self.verify_result()
|
||||
|
||||
def verify_result(self):
|
||||
eq_(self.foo.val, 20)
|
||||
eq_(self.foo.init_thread, self.foo.test_thread)
|
||||
|
||||
class TestUnserialized(Base):
|
||||
def setUp(self):
|
||||
self.foo = Foo()
|
||||
|
||||
def verify_result(self):
|
||||
# This should have failed to increment properly
|
||||
assert(self.foo.val != 20)
|
||||
ne_(self.foo.val, 20)
|
||||
# Init and tests ran in different threads
|
||||
ne_(self.foo.init_thread, self.foo.test_thread)
|
||||
|
||||
class TestSerialized(Base):
|
||||
class TestSerializer(Base):
|
||||
def setUp(self):
|
||||
self.realfoo = Foo()
|
||||
self.foo = nilmdb.utils.Serializer(self.realfoo)
|
||||
self.foo = nilmdb.utils.serializer_proxy(Foo)("qwer")
|
||||
|
||||
def tearDown(self):
|
||||
del self.foo
|
||||
|
||||
def verify_result(self):
|
||||
# This should have worked
|
||||
eq_(self.realfoo.val, 20)
|
||||
|
||||
def test_attribute(self):
|
||||
# Can't wrap attributes yet
|
||||
with assert_raises(TypeError):
|
||||
self.foo.val
|
||||
def test_multi(self):
|
||||
sp = nilmdb.utils.serializer_proxy
|
||||
sp(Foo("x")).t()
|
||||
sp(sp(Foo)("x")).t()
|
||||
sp(sp(Foo))("x").t()
|
||||
sp(sp(Foo("x"))).t()
|
||||
sp(sp(Foo)("x")).t()
|
||||
sp(sp(Foo))("x").t()
|
||||
|
96
tests/test_threadsafety.py
Normal file
96
tests/test_threadsafety.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import nilmdb
|
||||
from nilmdb.utils.printf import *
|
||||
|
||||
import nose
|
||||
from nose.tools import *
|
||||
from nose.tools import assert_raises
|
||||
|
||||
from testutil.helpers import *
|
||||
import threading
|
||||
|
||||
class Thread(threading.Thread):
|
||||
def __init__(self, target):
|
||||
self.target = target
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self.target()
|
||||
except AssertionError as e:
|
||||
self.error = e
|
||||
else:
|
||||
self.error = None
|
||||
|
||||
class Test():
|
||||
def __init__(self):
|
||||
self.test = 1234
|
||||
|
||||
@classmethod
|
||||
def asdf(cls):
|
||||
pass
|
||||
|
||||
def foo(self, exception = False, reenter = False):
|
||||
if exception:
|
||||
raise Exception()
|
||||
self.bar(reenter)
|
||||
|
||||
def bar(self, reenter):
|
||||
if reenter:
|
||||
self.foo()
|
||||
return 123
|
||||
|
||||
def baz_threaded(self, target):
|
||||
t = Thread(target)
|
||||
t.start()
|
||||
t.join()
|
||||
return t
|
||||
|
||||
def baz(self, target):
|
||||
target()
|
||||
|
||||
class TestThreadSafety(object):
|
||||
def tryit(self, c, threading_ok, concurrent_ok):
|
||||
eq_(c.test, 1234)
|
||||
c.foo()
|
||||
t = Thread(c.foo)
|
||||
t.start()
|
||||
t.join()
|
||||
if threading_ok and t.error:
|
||||
raise Exception("got unexpected error: " + str(t.error))
|
||||
if not threading_ok and not t.error:
|
||||
raise Exception("failed to get expected error")
|
||||
try:
|
||||
c.baz(c.foo)
|
||||
except AssertionError as e:
|
||||
if concurrent_ok:
|
||||
raise Exception("got unexpected error: " + str(e))
|
||||
else:
|
||||
if not concurrent_ok:
|
||||
raise Exception("failed to get expected error")
|
||||
t = c.baz_threaded(c.foo)
|
||||
if (concurrent_ok and threading_ok) and t.error:
|
||||
raise Exception("got unexpected error: " + str(t.error))
|
||||
if not (concurrent_ok and threading_ok) and not t.error:
|
||||
raise Exception("failed to get expected error")
|
||||
|
||||
def test(self):
|
||||
proxy = nilmdb.utils.threadsafety.verify_proxy
|
||||
self.tryit(Test(), True, True)
|
||||
self.tryit(proxy(Test(), True, True, True), False, False)
|
||||
self.tryit(proxy(Test(), True, True, False), False, True)
|
||||
self.tryit(proxy(Test(), True, False, True), True, False)
|
||||
self.tryit(proxy(Test(), True, False, False), True, True)
|
||||
self.tryit(proxy(Test, True, True, True)(), False, False)
|
||||
self.tryit(proxy(Test, True, True, False)(), False, True)
|
||||
self.tryit(proxy(Test, True, False, True)(), True, False)
|
||||
self.tryit(proxy(Test, True, False, False)(), True, True)
|
||||
|
||||
proxy(proxy(proxy(Test))()).foo()
|
||||
|
||||
c = proxy(Test())
|
||||
c.foo()
|
||||
try:
|
||||
c.foo(exception = True)
|
||||
except Exception:
|
||||
pass
|
||||
c.foo()
|
@@ -83,7 +83,7 @@ To use it:
|
||||
import os, sys, re
|
||||
from distutils.core import Command
|
||||
from distutils.command.sdist import sdist as _sdist
|
||||
from distutils.command.build import build as _build
|
||||
from distutils.command.build_py import build_py as _build_py
|
||||
|
||||
versionfile_source = None
|
||||
versionfile_build = None
|
||||
@@ -578,11 +578,10 @@ class cmd_version(Command):
|
||||
ver = get_version(verbose=True)
|
||||
print("Version is currently: %s" % ver)
|
||||
|
||||
|
||||
class cmd_build(_build):
|
||||
class cmd_build_py(_build_py):
|
||||
def run(self):
|
||||
versions = get_versions(verbose=True)
|
||||
_build.run(self)
|
||||
_build_py.run(self)
|
||||
# now locate _version.py in the new build/ directory and replace it
|
||||
# with an updated value
|
||||
target_versionfile = os.path.join(self.build_lib, versionfile_build)
|
||||
@@ -651,6 +650,6 @@ class cmd_update_files(Command):
|
||||
def get_cmdclass():
|
||||
return {'version': cmd_version,
|
||||
'update_files': cmd_update_files,
|
||||
'build': cmd_build,
|
||||
'build_py': cmd_build_py,
|
||||
'sdist': cmd_sdist,
|
||||
}
|
||||
|
Reference in New Issue
Block a user