Merge branch 'fixups'
This commit is contained in:
commit
2bc1416c00
|
@ -73,7 +73,7 @@ class Client(object):
|
|||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
}
|
||||
return self.http.get("stream/set_metadata", params)
|
||||
return self.http.post("stream/set_metadata", params)
|
||||
|
||||
def stream_update_metadata(self, path, data):
|
||||
"""Update stream metadata from a dictionary"""
|
||||
|
@ -81,18 +81,18 @@ class Client(object):
|
|||
"path": path,
|
||||
"data": self._json_param(data)
|
||||
}
|
||||
return self.http.get("stream/update_metadata", params)
|
||||
return self.http.post("stream/update_metadata", params)
|
||||
|
||||
def stream_create(self, path, layout):
|
||||
"""Create a new stream"""
|
||||
params = { "path": path,
|
||||
"layout" : layout }
|
||||
return self.http.get("stream/create", params)
|
||||
return self.http.post("stream/create", params)
|
||||
|
||||
def stream_destroy(self, path):
|
||||
"""Delete stream and its contents"""
|
||||
params = { "path": path }
|
||||
return self.http.get("stream/destroy", params)
|
||||
return self.http.post("stream/destroy", params)
|
||||
|
||||
def stream_remove(self, path, start = None, end = None):
|
||||
"""Remove data from the specified time range"""
|
||||
|
@ -103,7 +103,7 @@ class Client(object):
|
|||
params["start"] = float_to_string(start)
|
||||
if end is not None:
|
||||
params["end"] = float_to_string(end)
|
||||
return self.http.get("stream/remove", params)
|
||||
return self.http.post("stream/remove", params)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def stream_insert_context(self, path, start = None, end = None):
|
||||
|
|
|
@ -8,6 +8,19 @@ import simplejson as json
|
|||
import urlparse
|
||||
import pycurl
|
||||
import cStringIO
|
||||
import threading
|
||||
|
||||
import warnings # can remove
|
||||
|
||||
class HTTPClientLock(object):
|
||||
def __init__(self):
|
||||
self.lock = threading.Lock()
|
||||
def __enter__(self):
|
||||
if not self.lock.acquire(False):
|
||||
raise Exception("Client is already performing a request, and "
|
||||
"nested or concurrent calls are not supported.")
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.lock.release()
|
||||
|
||||
class HTTPClient(object):
|
||||
"""Class to manage and perform HTTP requests from the client"""
|
||||
|
@ -23,28 +36,25 @@ class HTTPClient(object):
|
|||
self.curl.setopt(pycurl.SSL_VERIFYHOST, 2)
|
||||
self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
|
||||
self.curl.setopt(pycurl.MAXREDIRS, 5)
|
||||
self.curl.setopt(pycurl.NOSIGNAL, 1)
|
||||
self.lock = HTTPClientLock()
|
||||
self._setup_url()
|
||||
|
||||
def _setup_url(self, url = "", params = ""):
|
||||
def _setup_url(self, url = "", params = None, post = False):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
if params:
|
||||
url = urlparse.urljoin(
|
||||
url, "?" + nilmdb.utils.urllib.urlencode(params))
|
||||
if params is None:
|
||||
encoded = ""
|
||||
else:
|
||||
encoded = nilmdb.utils.urllib.urlencode(params)
|
||||
self.curl.setopt(pycurl.POST, 1 if post else 0)
|
||||
if post:
|
||||
self.curl.setopt(pycurl.POSTFIELDS, encoded)
|
||||
else:
|
||||
if encoded:
|
||||
url = urlparse.urljoin(url, '?' + encoded)
|
||||
self.curl.setopt(pycurl.URL, url)
|
||||
self.url = url
|
||||
|
||||
def _check_busy_and_set_upload(self, upload):
|
||||
"""Sets the pycurl.UPLOAD option, but also raises a more
|
||||
friendly exception if the client is already serving a request."""
|
||||
try:
|
||||
self.curl.setopt(pycurl.UPLOAD, upload)
|
||||
except pycurl.error as e:
|
||||
if "is currently running" in str(e):
|
||||
raise Exception("Client is already performing a request, and "
|
||||
"nesting calls is not supported.")
|
||||
else: # pragma: no cover (shouldn't happen)
|
||||
raise
|
||||
|
||||
def _check_error(self, body = None):
|
||||
code = self.curl.getinfo(pycurl.RESPONSE_CODE)
|
||||
if code == 200:
|
||||
|
@ -76,20 +86,25 @@ class HTTPClient(object):
|
|||
else:
|
||||
raise Error(**args)
|
||||
|
||||
def _req_generator(self, url, params):
|
||||
def _req_generator(self, url, params, post_params = False):
|
||||
"""
|
||||
Like self._req(), but runs the perform in a separate thread.
|
||||
It returns a generator that spits out arbitrary-sized chunks
|
||||
of the resulting data, instead of using the WRITEFUNCTION
|
||||
callback.
|
||||
Like self._req(), but returns a generator that spits out
|
||||
arbitrary-sized chunks of the resulting data.
|
||||
"""
|
||||
self._setup_url(url, params)
|
||||
self._status = None
|
||||
self._setup_url(url, params, post_params)
|
||||
error_body = ""
|
||||
self._headers = ""
|
||||
self._req_status = None
|
||||
self._req_expect_status = True
|
||||
def header_callback(data):
|
||||
if self._status is None:
|
||||
self._status = int(data.split(" ")[1])
|
||||
# This weirdness is to handle the fact that
|
||||
# there can be a "HTTP/1.1 100 Continue" block before
|
||||
# the real response. Save all statuses that we see.
|
||||
if self._req_expect_status:
|
||||
self._req_status = int(data.split(" ")[1])
|
||||
self._req_expect_status = False
|
||||
elif data == "\r\n":
|
||||
self._req_expect_status = True
|
||||
self._headers += data
|
||||
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
|
||||
def perform(callback):
|
||||
|
@ -98,7 +113,7 @@ class HTTPClient(object):
|
|||
try:
|
||||
with nilmdb.utils.Iteratorizer(perform, curl_hack = True) as it:
|
||||
for i in it:
|
||||
if self._status == 200:
|
||||
if self._req_status == 200:
|
||||
# If we had a 200 response, yield the data to caller.
|
||||
yield i
|
||||
else:
|
||||
|
@ -111,12 +126,12 @@ class HTTPClient(object):
|
|||
# Raise an exception if there was an error
|
||||
self._check_error(error_body)
|
||||
|
||||
def _req(self, url, params):
|
||||
def _req(self, url, params, post_params = False):
|
||||
"""
|
||||
GET or POST that returns raw data. Returns the body
|
||||
data as a string, or raises an error if it contained an error.
|
||||
"""
|
||||
self._setup_url(url, params)
|
||||
self._setup_url(url, params, post_params)
|
||||
body = cStringIO.StringIO()
|
||||
self.curl.setopt(pycurl.WRITEFUNCTION, body.write)
|
||||
self._headers = ""
|
||||
|
@ -135,7 +150,8 @@ class HTTPClient(object):
|
|||
return body_str
|
||||
|
||||
def close(self):
|
||||
self.curl.close()
|
||||
with self.lock:
|
||||
self.curl.close()
|
||||
|
||||
def _iterate_lines(self, it):
|
||||
"""
|
||||
|
@ -152,57 +168,44 @@ class HTTPClient(object):
|
|||
if partial != "":
|
||||
yield partial
|
||||
|
||||
# Non-generator versions
|
||||
def _doreq(self, url, params, retjson):
|
||||
"""
|
||||
Perform a request, and return the body.
|
||||
|
||||
url: URL to request (relative to baseurl)
|
||||
params: dictionary of query parameters
|
||||
retjson: expect JSON and return python objects instead of string
|
||||
"""
|
||||
out = self._req(url, params)
|
||||
def _maybe_json(self, retjson, str):
|
||||
"""Parse str as JSON if retjson is true, otherwise return
|
||||
it directly."""
|
||||
if retjson:
|
||||
return json.loads(out)
|
||||
return out
|
||||
return json.loads(str)
|
||||
return str
|
||||
|
||||
# Normal versions that return data directly
|
||||
def get(self, url, params = None, retjson = True):
|
||||
"""Simple GET"""
|
||||
self._check_busy_and_set_upload(0)
|
||||
return self._doreq(url, params, retjson)
|
||||
"""Simple GET (parameters in URL)"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 0)
|
||||
self.curl.setopt(pycurl.READFUNCTION, lambda: None)
|
||||
return self._maybe_json(retjson, self._req(url, params, False))
|
||||
|
||||
def put(self, url, postdata, params = None, retjson = True):
|
||||
"""Simple PUT"""
|
||||
self._check_busy_and_set_upload(1)
|
||||
self._setup_url(url, params)
|
||||
data = cStringIO.StringIO(postdata)
|
||||
self.curl.setopt(pycurl.READFUNCTION, data.read)
|
||||
return self._doreq(url, params, retjson)
|
||||
def post(self, url, params = None, retjson = True):
|
||||
"""Simple POST (parameters in body)"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 0)
|
||||
self.curl.setopt(pycurl.READFUNCTION, lambda: None)
|
||||
return self._maybe_json(retjson, self._req(url, params, True))
|
||||
|
||||
# Generator versions
|
||||
def _doreq_gen(self, url, params, retjson):
|
||||
"""
|
||||
Perform a request, and return lines of the body in a generator.
|
||||
|
||||
url: URL to request (relative to baseurl)
|
||||
params: dictionary of query parameters
|
||||
retjson: expect JSON and yield python objects instead of strings
|
||||
"""
|
||||
for line in self._iterate_lines(self._req_generator(url, params)):
|
||||
if retjson:
|
||||
yield json.loads(line)
|
||||
else:
|
||||
yield line
|
||||
def put(self, url, data, params = None, retjson = True):
|
||||
"""Simple PUT (parameters in URL, data in body)"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 1)
|
||||
data = cStringIO.StringIO(data)
|
||||
self.curl.setopt(pycurl.READFUNCTION, data.read)
|
||||
return self._maybe_json(retjson, self._req(url, params, False))
|
||||
|
||||
# Generator versions that return data one line at a time.
|
||||
def get_gen(self, url, params = None, retjson = True):
|
||||
"""Simple GET, returning a generator"""
|
||||
self._check_busy_and_set_upload(0)
|
||||
return self._doreq_gen(url, params, retjson)
|
||||
"""Simple GET that yields one resonse line at a time"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 0)
|
||||
self.curl.setopt(pycurl.READFUNCTION, lambda: None)
|
||||
for line in self._iterate_lines(self._req_generator(url, params)):
|
||||
yield self._maybe_json(retjson, line)
|
||||
|
||||
def put_gen(self, url, postdata, params = None, retjson = True):
|
||||
"""Simple PUT, returning a generator"""
|
||||
self._check_busy_and_set_upload(1)
|
||||
self._setup_url(url, params)
|
||||
data = cStringIO.StringIO(postdata)
|
||||
self.curl.setopt(pycurl.READFUNCTION, data.read)
|
||||
return self._doreq_gen(url, params, retjson)
|
||||
# Not much use for a POST or PUT generator, since they don't
|
||||
# return much data.
|
||||
|
|
|
@ -154,7 +154,7 @@ class Parser(object):
|
|||
layout, into an internal data structure suitable for a
|
||||
pytables 'table.append(parser.data)'.
|
||||
"""
|
||||
cdef double last_ts = 0, ts
|
||||
cdef double last_ts = -1e12, ts
|
||||
cdef int n = 0, i
|
||||
cdef char *line
|
||||
|
||||
|
|
|
@ -15,12 +15,6 @@ import decorator
|
|||
import traceback
|
||||
import psutil
|
||||
|
||||
try:
|
||||
cherrypy.tools.json_out
|
||||
except: # pragma: no cover
|
||||
sys.stderr.write("Cherrypy 3.2+ required\n")
|
||||
sys.exit(1)
|
||||
|
||||
class NilmApp(object):
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
|
@ -77,6 +71,17 @@ def exception_to_httperror(*expected):
|
|||
# care of that.
|
||||
return decorator.decorator(wrapper)
|
||||
|
||||
# Custom Cherrypy tools
|
||||
def allow_methods(methods):
|
||||
method = cherrypy.request.method.upper()
|
||||
if method not in methods:
|
||||
if method in cherrypy.request.methods_with_bodies:
|
||||
cherrypy.request.body.read()
|
||||
allowed = ', '.join(methods)
|
||||
cherrypy.response.headers['Allow'] = allowed
|
||||
raise cherrypy.HTTPError(405, method + " not allowed; use " + allowed)
|
||||
cherrypy.tools.allow_methods = cherrypy.Tool('before_handler', allow_methods)
|
||||
|
||||
# CherryPy apps
|
||||
class Root(NilmApp):
|
||||
"""Root application for NILM database"""
|
||||
|
@ -129,6 +134,7 @@ class Stream(NilmApp):
|
|||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, ValueError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def create(self, path, layout):
|
||||
"""Create a new stream in the database. Provide path
|
||||
and one of the nilmdb.layout.layouts keys.
|
||||
|
@ -139,6 +145,7 @@ class Stream(NilmApp):
|
|||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def destroy(self, path):
|
||||
"""Delete a stream and its associated data."""
|
||||
return self.db.stream_destroy(path)
|
||||
|
@ -171,6 +178,7 @@ class Stream(NilmApp):
|
|||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def set_metadata(self, path, data):
|
||||
"""Set metadata for the named stream, replacing any
|
||||
existing metadata. Data should be a json-encoded
|
||||
|
@ -182,6 +190,7 @@ class Stream(NilmApp):
|
|||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError, LookupError, TypeError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def update_metadata(self, path, data):
|
||||
"""Update metadata for the named stream. Data
|
||||
should be a json-encoded dictionary"""
|
||||
|
@ -191,7 +200,7 @@ class Stream(NilmApp):
|
|||
# /stream/insert?path=/newton/prep
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
#@cherrypy.tools.disable_prb()
|
||||
@cherrypy.tools.allow_methods(methods = ["PUT"])
|
||||
def insert(self, path, start, end):
|
||||
"""
|
||||
Insert new data into the database. Provide textual data
|
||||
|
@ -199,12 +208,9 @@ class Stream(NilmApp):
|
|||
"""
|
||||
# Important that we always read the input before throwing any
|
||||
# errors, to keep lengths happy for persistent connections.
|
||||
# However, CherryPy 3.2.2 has a bug where this fails for GET
|
||||
# requests, so catch that. (issue #1134)
|
||||
try:
|
||||
body = cherrypy.request.body.read()
|
||||
except TypeError:
|
||||
raise cherrypy.HTTPError("400 Bad Request", "No request body")
|
||||
# Note that CherryPy 3.2.2 has a bug where this fails for GET
|
||||
# requests, if we ever want to handle those (issue #1134)
|
||||
body = cherrypy.request.body.read()
|
||||
|
||||
# Check path and get layout
|
||||
streams = self.db.stream_list(path = path)
|
||||
|
@ -250,6 +256,7 @@ class Stream(NilmApp):
|
|||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.allow_methods(methods = ["POST"])
|
||||
def remove(self, path, start = None, end = None):
|
||||
"""
|
||||
Remove data from the backend database. Removes all data in
|
||||
|
@ -411,6 +418,11 @@ class Server(object):
|
|||
app_config.update({ 'response.headers.Access-Control-Allow-Origin':
|
||||
'*' })
|
||||
|
||||
# Only allow GET and HEAD by default. Individual handlers
|
||||
# can override.
|
||||
app_config.update({ 'tools.allow_methods.on': True,
|
||||
'tools.allow_methods.methods': ['GET', 'HEAD'] })
|
||||
|
||||
# Send tracebacks in error responses. They're hidden by the
|
||||
# error_page function for client errors (code 400-499).
|
||||
app_config.update({ 'request.show_tracebacks' : True })
|
||||
|
|
|
@ -97,6 +97,15 @@ class TestClient(object):
|
|||
with assert_raises(ClientError):
|
||||
client.stream_create("/newton/prep", "NoSuchLayout")
|
||||
|
||||
# Bad method types
|
||||
with assert_raises(ClientError):
|
||||
client.http.put("/stream/list","")
|
||||
# Try a bunch of times to make sure the request body is getting consumed
|
||||
for x in range(10):
|
||||
with assert_raises(ClientError):
|
||||
client.http.post("/stream/list")
|
||||
client = nilmdb.Client(url = testurl)
|
||||
|
||||
# Create three streams
|
||||
client.stream_create("/newton/prep", "PrepData")
|
||||
client.stream_create("/newton/raw", "RawData")
|
||||
|
@ -269,8 +278,9 @@ class TestClient(object):
|
|||
# Test the exception we get if we nest requests
|
||||
with assert_raises(Exception) as e:
|
||||
for data in client.stream_extract("/newton/prep"):
|
||||
x = client.stream_intervals("/newton/prep")
|
||||
in_("nesting calls is not supported", str(e.exception))
|
||||
for line in client.stream_intervals("/newton/prep"):
|
||||
pass
|
||||
in_("not supported", str(e.exception))
|
||||
|
||||
# Test count
|
||||
eq_(client.stream_count("/newton/prep"), 14400)
|
||||
|
@ -306,19 +316,11 @@ class TestClient(object):
|
|||
client.http.get("/stream/list",retjson=True))
|
||||
|
||||
# Check non-json version of generator output
|
||||
for (a, b) in itertools.izip(
|
||||
client.http.get_gen("/stream/list",retjson=False),
|
||||
client.http.get_gen("/stream/list",retjson=True)):
|
||||
aa = list(client.http.get_gen("/stream/list",retjson=False))
|
||||
bb = list(client.http.get_gen("/stream/list",retjson=True))
|
||||
for (a, b) in itertools.izip(aa, bb):
|
||||
eq_(json.loads(a), b)
|
||||
|
||||
# Check PUT with generator out
|
||||
with assert_raises(ClientError) as e:
|
||||
client.http.put_gen("stream/insert", "",
|
||||
{ "path": "/newton/prep",
|
||||
"start": 0, "end": 0 }).next()
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
in_("start must precede end", str(e.exception))
|
||||
|
||||
# Check 404 for missing streams
|
||||
for function in [ client.stream_intervals, client.stream_extract ]:
|
||||
with assert_raises(ClientError) as e:
|
||||
|
|
|
@ -208,12 +208,3 @@ class TestServer(object):
|
|||
data = getjson("/stream/get_metadata?path=/newton/prep"
|
||||
"&key=foo")
|
||||
eq_(data, {'foo': None})
|
||||
|
||||
|
||||
def test_insert(self):
|
||||
# GET instead of POST (no body)
|
||||
# (actual POST test is done by client code)
|
||||
with assert_raises(HTTPError) as e:
|
||||
getjson("/stream/insert?path=/newton/prep&start=0&end=0")
|
||||
eq_(e.exception.code, 400)
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user