@@ -156,7 +156,7 @@ class Client(object):
             params["start"] = float_to_string(start)
         if end is not None:
             params["end"] = float_to_string(end)
-        return self.http.get_gen("stream/intervals", params, retjson = True)
+        return self.http.get_gen("stream/intervals", params)

     def stream_extract(self, path, start = None, end = None, count = False):
         """
@@ -176,8 +176,7 @@ class Client(object):
             params["end"] = float_to_string(end)
         if count:
             params["count"] = 1
-        return self.http.get_gen("stream/extract", params, retjson = False)
+        return self.http.get_gen("stream/extract", params)

     def stream_count(self, path, start = None, end = None):
         """
@@ -6,21 +6,7 @@ from nilmdb.client.errors import ClientError, ServerError, Error
 import simplejson as json
 import urlparse
-import pycurl
-import cStringIO
-import threading
-import warnings # can remove
-
-class HTTPClientLock(object):
-    def __init__(self):
-        self.lock = threading.Lock()
-    def __enter__(self):
-        if not self.lock.acquire(False):
-            raise Exception("Client is already performing a request, and "
-                            "nested or concurrent calls are not supported.")
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.lock.release()
+import requests

 class HTTPClient(object):
     """Class to manage and perform HTTP requests from the client"""
@@ -32,42 +18,19 @@ class HTTPClient(object):
         if '://' not in reparsed:
             reparsed = urlparse.urlparse("http://" + baseurl).geturl()
         self.baseurl = reparsed
-        self.multi = pycurl.CurlMulti()
-        self.curl = pycurl.Curl()
-        # Add and remove the handle to workaround a curl bug (debian #701713)
-        self.multi.add_handle(self.curl)
-        self.multi.remove_handle(self.curl)
-        #self.multi = nilmdb.utils.threadsafety.verify_proxy(pycurl.CurlMulti)()
-        #self.curl = nilmdb.utils.threadsafety.verify_proxy(pycurl.Curl)()
-        self.curl.setopt(pycurl.SSL_VERIFYHOST, 2)
-        self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
-        self.curl.setopt(pycurl.MAXREDIRS, 5)
-        self.curl.setopt(pycurl.NOSIGNAL, 1)
-        self.lock = HTTPClientLock()
-        self._setup_url()
-
-    def _setup_url(self, url = "", params = None, post = False):
-        url = urlparse.urljoin(self.baseurl, url)
-        if params is None:
-            encoded = ""
-        else:
-            encoded = nilmdb.utils.urllib.urlencode(params)
-        self.curl.setopt(pycurl.POST, 1 if post else 0)
-        if post:
-            self.curl.setopt(pycurl.POSTFIELDS, encoded)
-        else:
-            if encoded:
-                url = urlparse.urljoin(url, '?' + encoded)
-        self.curl.setopt(pycurl.URL, url)
-        self.url = url
+        # Build Requests session object, enable SSL verification
+        self.session = requests.Session()
+        self.session.verify = True
+
+        # Saved response, so that tests can verify a few things.
+        self._last_response = {}

-    def _check_error(self, code, body):
+    def _handle_error(self, url, code, body):
         # Default variables for exception.  We use the entire body as
         # the default message, in case we can't extract it from a JSON
         # response.
         if code == 200:
             return
-        args = { "url" : self.url,
+        args = { "url" : url,
                  "status" : str(code),
                  "message" : body,
                  "traceback" : None }
@@ -91,137 +54,67 @@ class HTTPClient(object):
         else:
             raise Error(**args)

-    def _req_generator(self, url, params, post_params = False):
-        """
-        Like self._req(), but returns a generator that spits out
-        arbitrary-sized chunks of the resulting data.
-        """
-        self._setup_url(url, params, post_params)
-        data = []
-        error_body = []
-        self.headers = cStringIO.StringIO()
-        req_status = [None]
-        req_expect_status = [True]
-        def header_callback(data):
-            """Process header data as it comes in, so we can strip out
-            the status line."""
-            # This weirdness is to handle the fact that
-            # there can be a "HTTP/1.1 100 Continue" block before
-            # the real response.  Save all statuses that we see.
-            if req_expect_status[0]:
-                req_status[0] = int(data.split(" ")[1])
-                req_expect_status[0] = False
-            elif data == "\r\n":
-                req_expect_status[0] = True
-            self.headers.write(data)
-        self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
+    def close(self):
+        self.session.close()

-        # Use the Multi object so we don't have to block in a callback
-        def write_callback(newdata):
-            data.append(newdata)
-        self.curl.setopt(pycurl.WRITEFUNCTION, write_callback)
-        self.multi.add_handle(self.curl)
+    def _do_req(self, method, url, query_data, body_data, stream):
+        url = urlparse.urljoin(self.baseurl, url)
         try:
-            while True:
-                self.multi.select()
-                with nilmdb.utils.Timer("perform"):
-                    (ret, handles) = self.multi.perform()
-                # Check data first
-                if len(data):
-                    if req_status[0] == 200:
-                        # If we had a 200 response, yield the data to caller.
-                        yield "".join(data)
-                    else:
-                        # Otherwise, collect it into an error string.
-                        error_body.extend(data)
-                    data = []
-                    # If we got data, we're doing well; call perform again
-                    continue
-                (in_queue, ok_objects, error_objects) = self.multi.info_read()
-                if error_objects:
-                    raise ServerError(status = "502 Error",
-                                      url = self.url,
-                                      message = error_objects[0][2])
-                if ret == pycurl.E_CALL_MULTI_PERFORM:
-                    continue
-                if handles == 0:
-                    break
-        finally:
-            # Have to pull out info before removing handle
-            self._num_connects = self.curl.getinfo(pycurl.NUM_CONNECTS)
-            code = self.curl.getinfo(pycurl.RESPONSE_CODE)
-            # Remove handle
-            self.multi.remove_handle(self.curl)
-        self._check_error(code, "".join(error_body))
-
-    def _req(self, url, params, post_params = False):
-        """
-        GET or POST that returns raw data.  Returns the body
-        data as a string, or raises an error if it contained an error.
-        """
-        print "_req", url, params, post_params
-        body = []
-        for data in self._req_generator(url, params, post_params):
-            body.append(data)
-        return "".join(body)
-
-    def close(self):
-        with self.lock:
-            self.curl.close()
+            response = self.session.request(method, url,
+                                            params = query_data,
+                                            data = body_data)
+        except requests.RequestException as e:
+            raise ServerError(status = "502 Error", url = url,
+                              message = str(e.message))
+        if response.status_code != 200:
+            self._handle_error(url, response.status_code, response.content)
+        self._last_response = response
+        if response.headers["content-type"] in ("application/json",
+                                                "application/x-json-stream"):
+            return (response, True)
+        else:
+            return (response, False)

-    def _iterate_lines(self, it):
+    # Normal versions that return data directly
+    def _req(self, method, url, query = None, body = None):
         """
-        Given an iterator that returns arbitrarily-sized chunks
-        of data, return '\n'-delimited lines of text
+        Make a request and return the body data as a string or parsed
+        JSON object, or raise an error if it contained an error.
         """
-        partial = ""
-        for chunk in it:
-            partial += chunk
-            lines = partial.split("\n")
-            for line in lines[0:-1]:
-                yield line
-            partial = lines[-1]
-        if partial != "":
-            yield partial
-
-    def _maybe_json(self, retjson, str):
-        """Parse str as JSON if retjson is true, otherwise return
-        it directly."""
-        if retjson:
-            return json.loads(str)
-        return str
+        (response, isjson) = self._do_req(method, url, query, body, False)
+        if isjson:
+            return json.loads(response.content)
+        return response.content

-    # Normal versions that return data directly
-    def get(self, url, params = None, retjson = True):
+    def get(self, url, params = None):
         """Simple GET (parameters in URL)"""
-        with self.lock:
-            self.curl.setopt(pycurl.UPLOAD, 0)
-            self.curl.setopt(pycurl.READFUNCTION, lambda: None)
-            return self._maybe_json(retjson, self._req(url, params, False))
+        return self._req("GET", url, params, None)

-    def post(self, url, params = None, retjson = True):
+    def post(self, url, params = None):
         """Simple POST (parameters in body)"""
-        with self.lock:
-            self.curl.setopt(pycurl.UPLOAD, 0)
-            self.curl.setopt(pycurl.READFUNCTION, lambda: None)
-            return self._maybe_json(retjson, self._req(url, params, True))
+        return self._req("POST", url, None, params)

-    def put(self, url, data, params = None, retjson = True):
+    def put(self, url, data, params = None):
         """Simple PUT (parameters in URL, data in body)"""
-        with self.lock:
-            self.curl.setopt(pycurl.UPLOAD, 1)
-            data = cStringIO.StringIO(data)
-            self.curl.setopt(pycurl.READFUNCTION, data.read)
-            return self._maybe_json(retjson, self._req(url, params, False))
+        return self._req("PUT", url, params, data)

     # Generator versions that return data one line at a time.
-    def get_gen(self, url, params = None, retjson = True):
-        """Simple GET that yields one resonse line at a time"""
-        with self.lock:
-            self.curl.setopt(pycurl.UPLOAD, 0)
-            self.curl.setopt(pycurl.READFUNCTION, lambda: None)
-            for line in self._iterate_lines(self._req_generator(url, params)):
-                yield self._maybe_json(retjson, line)
+    def _req_gen(self, method, url, query = None, body = None):
+        """
+        Make a request and return a generator that gives back strings
+        or JSON decoded lines of the body data, or raise an error if
+        it contained an error.
+        """
+        (response, isjson) = self._do_req(method, url, query, body, True)
+        for line in response.iter_lines():
+            if isjson:
+                yield json.loads(line)
+            else:
+                yield line
+
+    def get_gen(self, url, params = None):
+        """Simple GET (parameters in URL) returning a generator"""
+        return self._req_gen("GET", url, params)

     # Not much use for a POST or PUT generator, since they don't
     # return much data.
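
Note: the CurlMulti select/perform loop deleted above existed only to stream the response body without blocking in a callback. requests covers the same ground with stream=True plus Response.iter_lines(); a minimal standalone sketch of that behavior (URL and path illustrative, assuming a server that emits one JSON document per line):

    import requests
    import simplejson as json

    session = requests.Session()
    session.verify = True     # same SSL verification the old code configured
    response = session.request("GET",
                               "http://localhost:12380/stream/intervals",
                               params = { "path": "/newton/prep" },
                               stream = True)  # read lazily, don't buffer all
    if response.status_code != 200:
        raise Exception(response.content)
    # iter_lines() re-chunks the byte stream into newline-delimited lines,
    # which is what _iterate_lines() did by hand above.
    for line in response.iter_lines():
        print json.loads(line)
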
@@ -277,17 +277,16 @@ class Stream(NilmApp):
     # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
     @cherrypy.expose
     @chunked_response
-    @response_type("text/plain")
+    @response_type("application/x-json-stream")
     def intervals(self, path, start = None, end = None):
         """
         Get intervals from backend database.  Streams the resulting
-        intervals as JSON strings separated by newlines.  This may
+        intervals as JSON strings separated by CR LF pairs.  This may
         make multiple requests to the nilmdb backend to avoid causing
         it to block for too long.

-        Note that the response type is set to 'text/plain' even
-        though we're sending back JSON; this is because we're not
-        really returning a single JSON object.
+        Note that the response type is the non-standard
+        'application/x-json-stream' for lack of a better option.
         """
         if start is not None:
             start = float(start)
@@ -307,8 +306,8 @@ class Stream(NilmApp):
         def content(start, end):
             # Note: disable chunked responses to see tracebacks from here.
             while True:
-                (intervals, restart) = self.db.stream_intervals(path, start, end)
-                response = ''.join([ json.dumps(i) + "\n" for i in intervals ])
+                (ints, restart) = self.db.stream_intervals(path, start, end)
+                response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
                 yield response
                 if restart == 0:
                     break
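
Since each line of an application/x-json-stream body is one self-contained JSON document, a consumer can decode incrementally without waiting for the whole response. A minimal parser sketch under that assumption (helper name hypothetical; separators are the CR LF pairs produced above):

    import simplejson as json

    def parse_json_stream(chunks):
        """Yield one decoded JSON document per CR LF-terminated line,
        given arbitrarily-sized string chunks of the response body."""
        partial = ""
        for chunk in chunks:
            partial += chunk
            lines = partial.split("\r\n")
            for line in lines[:-1]:
                if line:                  # skip empty lines between records
                    yield json.loads(line)
            partial = lines[-1]           # keep any trailing partial line
        if partial:
            yield json.loads(partial)
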
@@ -6,6 +6,5 @@ from nilmdb.utils.serializer import serializer_proxy
 from nilmdb.utils.lrucache import lru_cache
 from nilmdb.utils.diskusage import du, human_size
 from nilmdb.utils.mustclose import must_close
-from nilmdb.utils.urllib import urlencode
 from nilmdb.utils import atomic
 import nilmdb.utils.threadsafety
@@ -1,37 +0,0 @@
-from __future__ import absolute_import
-from urllib import quote_plus, _is_unicode
-
-# urllib.urlencode insists on encoding Unicode as ASCII.  This is based
-# on that function, except we always encode it as UTF-8 instead.
-def urlencode(query):
-    """Encode a dictionary into a URL query string.
-
-    If any values in the query arg are sequences, each sequence
-    element is converted to a separate parameter.
-    """
-    query = query.items()
-
-    l = []
-    for k, v in query:
-        k = quote_plus(str(k))
-        if isinstance(v, str):
-            v = quote_plus(v)
-            l.append(k + '=' + v)
-        elif _is_unicode(v):
-            v = quote_plus(v.encode("utf-8","strict"))
-            l.append(k + '=' + v)
-        else:
-            try:
-                # is this a sufficient test for sequence-ness?
-                len(v)
-            except TypeError:
-                # not a sequence
-                v = quote_plus(str(v))
-                l.append(k + '=' + v)
-            else:
-                # loop over the sequence
-                for elt in v:
-                    l.append(k + '=' + quote_plus(str(elt)))
-    return '&'.join(l)
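
This helper becomes unnecessary because requests builds the query string itself and percent-encodes unicode parameter values as UTF-8 rather than failing on non-ASCII. A quick standalone illustration (URL and value are made up):

    import requests

    req = requests.Request("GET", "http://localhost:12380/stream/list",
                           params = { "path": u"/newton/prep\u00e9" })
    # The unicode value is percent-encoded as UTF-8 in the prepared URL:
    print req.prepare().url
    # -> http://localhost:12380/stream/list?path=%2Fnewton%2Fprep%C3%A9
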
@@ -6,6 +6,7 @@ from nilmdb.utils import timestamper
 from nilmdb.client import ClientError, ServerError
 from nilmdb.utils import datetime_tz
 from nose.plugins.skip import SkipTest
+from nose.tools import *
 from nose.tools import assert_raises
 import itertools
@@ -275,13 +276,6 @@ class TestClient(object):
         with assert_raises(ClientError) as e:
             client.stream_remove("/newton/prep", 123, 120)

-        # Test the exception we get if we nest requests
-        with assert_raises(Exception) as e:
-            for data in client.stream_extract("/newton/prep"):
-                for line in client.stream_intervals("/newton/prep"):
-                    pass
-        in_("not supported", str(e.exception))
-
         # Test count
         eq_(client.stream_count("/newton/prep"), 14400)
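
The deleted test asserted that nested calls had to fail under the shared pycurl handle and its lock; with each request getting its own Response object from the session, the same pattern is expected to simply work. A hedged sketch (URL and stream path illustrative):

    import nilmdb

    client = nilmdb.Client(url = "http://localhost:12380/")
    # Iterating one streaming response while another is open should no
    # longer raise "nested or concurrent calls are not supported":
    for data in client.stream_extract("/newton/prep"):
        for line in client.stream_intervals("/newton/prep"):
            pass
    client.close()
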
@@ -311,16 +305,6 @@ class TestClient(object):
         with assert_raises(ServerError) as e:
             client.http.get_gen("http://nosuchurl/").next()

-        # Check non-json version of string output
-        eq_(json.loads(client.http.get("/stream/list",retjson=False)),
-            client.http.get("/stream/list",retjson=True))
-
-        # Check non-json version of generator output
-        aa = list(client.http.get_gen("/stream/list",retjson=False))
-        bb = list(client.http.get_gen("/stream/list",retjson=True))
-        for (a, b) in itertools.izip(aa, bb):
-            eq_(json.loads(a), b)
-
         # Check 404 for missing streams
         for function in [ client.stream_intervals, client.stream_extract ]:
             with assert_raises(ClientError) as e:
@@ -339,37 +323,41 @@ class TestClient(object):
         client = nilmdb.Client(url = testurl)
         http = client.http

-        # Use a warning rather than returning a test failure, so that we can
-        # still disable chunked responses for debugging.
+        # Use a warning rather than returning a test failure for the
+        # transfer-encoding, so that we can still disable chunked
+        # responses for debugging.
+        def headers():
+            h = ""
+            for (k, v) in http._last_response.headers.items():
+                h += k + ": " + v + "\n"
+            return h.lower()

         # Intervals
-        x = http.get("stream/intervals", { "path": "/newton/prep" },
-                     retjson=False)
-        headers = http._headers.getvalue()
+        x = http.get("stream/intervals", { "path": "/newton/prep" })
         lines_(x, 1)
-        if "Transfer-Encoding: chunked" not in headers:
+        if "transfer-encoding: chunked" not in headers():
             warnings.warn("Non-chunked HTTP response for /stream/intervals")
-        if "Content-Type: text/plain;charset=utf-8" not in headers:
-            raise AssertionError("/stream/intervals is not text/plain:\n" +
-                                 headers)
+        if "content-type: application/x-json-stream" not in headers():
+            raise AssertionError("/stream/intervals content type "
+                                 "is not application/x-json-stream:\n" +
+                                 headers())

         # Extract
         x = http.get("stream/extract",
                      { "path": "/newton/prep",
                        "start": "123",
-                       "end": "124" }, retjson=False)
-        headers = http._headers.getvalue()
-        if "Transfer-Encoding: chunked" not in headers:
+                       "end": "124" })
+        if "transfer-encoding: chunked" not in headers():
             warnings.warn("Non-chunked HTTP response for /stream/extract")
-        if "Content-Type: text/plain;charset=utf-8" not in headers:
+        if "content-type: text/plain;charset=utf-8" not in headers():
             raise AssertionError("/stream/extract is not text/plain:\n" +
-                                 headers)
+                                 headers())

         # Make sure Access-Control-Allow-Origin gets set
-        if "Access-Control-Allow-Origin: " not in headers:
+        if "access-control-allow-origin: " not in headers():
             raise AssertionError("No Access-Control-Allow-Origin (CORS) "
                                  "header in /stream/extract response:\n" +
-                                 headers)
+                                 headers())

         client.close()
@@ -434,31 +422,6 @@ class TestClient(object):
         with assert_raises(ClientError) as e:
             c.stream_remove("/newton/prep", 123, 120)

-    def test_client_00_persistent(self):
-        # Check that connections are persistent when they should be
-        with nilmdb.Client(url = testurl) as c:
-            # First request makes a connection
-            c.stream_create("/persist/test", "uint16_1")
-            eq_(c.http._num_connects, 1)
-
-            # Non-generator
-            c.stream_list("/persist/test")
-            eq_(c.http._num_connects, 0)
-            c.stream_list("/persist/test")
-            eq_(c.http._num_connects, 0)
-
-            # Generators
-            for x in c.stream_intervals("/persist/test"):
-                pass
-            eq_(c.http._num_connects, 0)
-            for x in c.stream_intervals("/persist/test"):
-                pass
-            eq_(c.http._num_connects, 0)
-
-            # Clean up
-            c.stream_destroy("/persist/test")
-            eq_(c.http._num_connects, 0)
-
     def test_client_10_context(self):
         # Test using the client's stream insertion context manager to
         # insert data.
@@ -605,3 +568,38 @@ class TestClient(object):
         # Clean up
         client.stream_destroy("/empty/test")
         client.close()
+
+    def test_client_12_persistent(self):
+        # Check that connections are persistent when they should be.
+        # This is pretty hard to test; we have to poke deep into
+        # the Requests library.
+        with nilmdb.Client(url = testurl) as c:
+            def connections():
+                try:
+                    poolmanager = c.http._last_response.connection.poolmanager
+                    pool = poolmanager.pools[('http','localhost',12380)]
+                    return (pool.num_connections, pool.num_requests)
+                except:
+                    raise SkipTest("can't get connection info")
+
+            # First request makes a connection
+            c.stream_create("/persist/test", "uint16_1")
+            eq_(connections(), (1, 1))
+
+            # Non-generator
+            c.stream_list("/persist/test")
+            eq_(connections(), (1, 2))
+            c.stream_list("/persist/test")
+            eq_(connections(), (1, 3))
+
+            # Generators
+            for x in c.stream_intervals("/persist/test"):
+                pass
+            eq_(connections(), (1, 4))
+            for x in c.stream_intervals("/persist/test"):
+                pass
+            eq_(connections(), (1, 5))
+
+            # Clean up
+            c.stream_destroy("/persist/test")
+            eq_(connections(), (1, 6))
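
The ('http', 'localhost', 12380) tuple above is how urllib3's PoolManager keys its connection pools. The same counters can be reached through the session's adapter; a standalone sketch (the attribute layout assumes the urllib3 bundled with requests at this time):

    import requests

    s = requests.Session()
    s.get("http://localhost:12380/")    # assumes a server is listening
    adapter = s.get_adapter("http://localhost:12380/")
    pool = adapter.poolmanager.pools[("http", "localhost", 12380)]
    # num_connections counts sockets opened; num_requests counts requests
    # sent through the pool.  (1, N) after N requests means keep-alive
    # worked and nothing reconnected.
    print pool.num_connections, pool.num_requests
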
@@ -163,16 +163,16 @@ class TestCmdline(object):
         # try some URL constructions
         self.fail("--url http://nosuchurl/ info")
-        self.contain("Couldn't resolve host 'nosuchurl'")
+        self.contain("error connecting to server")

         self.fail("--url nosuchurl info")
-        self.contain("Couldn't resolve host 'nosuchurl'")
+        self.contain("error connecting to server")

         self.fail("-u nosuchurl/foo info")
-        self.contain("Couldn't resolve host 'nosuchurl'")
+        self.contain("error connecting to server")

-        self.fail("-u localhost:0 info")
-        self.contain("couldn't connect to host")
+        self.fail("-u localhost:1 info")
+        self.contain("error connecting to server")

         self.ok("-u localhost:12380 info")
         self.ok("info")