Replace pyCurl with Requests
Only tested with v1.1.0. It's not clear how well older versions will work.
This commit is contained in:
parent
14afa02db6
commit
50a4a60786
|
@ -156,7 +156,7 @@ class Client(object):
|
|||
params["start"] = float_to_string(start)
|
||||
if end is not None:
|
||||
params["end"] = float_to_string(end)
|
||||
return self.http.get_gen("stream/intervals", params, retjson = True)
|
||||
return self.http.get_gen("stream/intervals", params)
|
||||
|
||||
def stream_extract(self, path, start = None, end = None, count = False):
|
||||
"""
|
||||
|
@ -176,8 +176,7 @@ class Client(object):
|
|||
params["end"] = float_to_string(end)
|
||||
if count:
|
||||
params["count"] = 1
|
||||
|
||||
return self.http.get_gen("stream/extract", params, retjson = False)
|
||||
return self.http.get_gen("stream/extract", params)
|
||||
|
||||
def stream_count(self, path, start = None, end = None):
|
||||
"""
|
||||
|
|
|
@ -6,21 +6,7 @@ from nilmdb.client.errors import ClientError, ServerError, Error
|
|||
|
||||
import simplejson as json
|
||||
import urlparse
|
||||
import pycurl
|
||||
import cStringIO
|
||||
import threading
|
||||
|
||||
import warnings # can remove
|
||||
|
||||
class HTTPClientLock(object):
|
||||
def __init__(self):
|
||||
self.lock = threading.Lock()
|
||||
def __enter__(self):
|
||||
if not self.lock.acquire(False):
|
||||
raise Exception("Client is already performing a request, and "
|
||||
"nested or concurrent calls are not supported.")
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.lock.release()
|
||||
import requests
|
||||
|
||||
class HTTPClient(object):
|
||||
"""Class to manage and perform HTTP requests from the client"""
|
||||
|
@ -32,42 +18,19 @@ class HTTPClient(object):
|
|||
if '://' not in reparsed:
|
||||
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
|
||||
self.baseurl = reparsed
|
||||
self.multi = pycurl.CurlMulti()
|
||||
self.curl = pycurl.Curl()
|
||||
# Add and remove the handle to workaround a curl bug (debian #701713)
|
||||
self.multi.add_handle(self.curl)
|
||||
self.multi.remove_handle(self.curl)
|
||||
#self.multi = nilmdb.utils.threadsafety.verify_proxy(pycurl.CurlMulti)()
|
||||
#self.curl = nilmdb.utils.threadsafety.verify_proxy(pycurl.Curl)()
|
||||
self.curl.setopt(pycurl.SSL_VERIFYHOST, 2)
|
||||
self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
|
||||
self.curl.setopt(pycurl.MAXREDIRS, 5)
|
||||
self.curl.setopt(pycurl.NOSIGNAL, 1)
|
||||
self.lock = HTTPClientLock()
|
||||
self._setup_url()
|
||||
|
||||
def _setup_url(self, url = "", params = None, post = False):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
if params is None:
|
||||
encoded = ""
|
||||
else:
|
||||
encoded = nilmdb.utils.urllib.urlencode(params)
|
||||
self.curl.setopt(pycurl.POST, 1 if post else 0)
|
||||
if post:
|
||||
self.curl.setopt(pycurl.POSTFIELDS, encoded)
|
||||
else:
|
||||
if encoded:
|
||||
url = urlparse.urljoin(url, '?' + encoded)
|
||||
self.curl.setopt(pycurl.URL, url)
|
||||
self.url = url
|
||||
# Build Requests session object, enable SSL verification
|
||||
self.session = requests.Session()
|
||||
self.session.verify = True
|
||||
|
||||
def _check_error(self, code, body):
|
||||
# Saved headers, for tests to check
|
||||
self._headers = {}
|
||||
|
||||
def _handle_error(self, url, code, body):
|
||||
# Default variables for exception. We use the entire body as
|
||||
# the default message, in case we can't extract it from a JSON
|
||||
# response.
|
||||
if code == 200:
|
||||
return
|
||||
args = { "url" : self.url,
|
||||
args = { "url" : url,
|
||||
"status" : str(code),
|
||||
"message" : body,
|
||||
"traceback" : None }
|
||||
|
@ -91,137 +54,67 @@ class HTTPClient(object):
|
|||
else:
|
||||
raise Error(**args)
|
||||
|
||||
def _req_generator(self, url, params, post_params = False):
|
||||
"""
|
||||
Like self._req(), but returns a generator that spits out
|
||||
arbitrary-sized chunks of the resulting data.
|
||||
"""
|
||||
self._setup_url(url, params, post_params)
|
||||
data = []
|
||||
error_body = []
|
||||
self.headers = cStringIO.StringIO()
|
||||
req_status = [None]
|
||||
req_expect_status = [True]
|
||||
def header_callback(data):
|
||||
"""Process header data as it comes in, so we can strip out
|
||||
the status line."""
|
||||
# This weirdness is to handle the fact that
|
||||
# there can be a "HTTP/1.1 100 Continue" block before
|
||||
# the real response. Save all statuses that we see.
|
||||
if req_expect_status[0]:
|
||||
req_status[0] = int(data.split(" ")[1])
|
||||
req_expect_status[0] = False
|
||||
elif data == "\r\n":
|
||||
req_expect_status[0] = True
|
||||
self.headers.write(data)
|
||||
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
|
||||
|
||||
# Use the Multi object so we don't have to block in a callback
|
||||
def write_callback(newdata):
|
||||
data.append(newdata)
|
||||
self.curl.setopt(pycurl.WRITEFUNCTION, write_callback)
|
||||
self.multi.add_handle(self.curl)
|
||||
try:
|
||||
while True:
|
||||
self.multi.select()
|
||||
with nilmdb.utils.Timer("perform"):
|
||||
(ret, handles) = self.multi.perform()
|
||||
# Check data first
|
||||
if len(data):
|
||||
if req_status[0] == 200:
|
||||
# If we had a 200 response, yield the data to caller.
|
||||
yield "".join(data)
|
||||
else:
|
||||
# Otherwise, collect it into an error string.
|
||||
error_body.extend(data)
|
||||
data = []
|
||||
# If we got data, we're doing well; call perform again
|
||||
continue
|
||||
(in_queue, ok_objects, error_objects) = self.multi.info_read()
|
||||
if error_objects:
|
||||
raise ServerError(status = "502 Error",
|
||||
url = self.url,
|
||||
message = error_objects[0][2])
|
||||
if ret == pycurl.E_CALL_MULTI_PERFORM:
|
||||
continue
|
||||
if handles == 0:
|
||||
break
|
||||
finally:
|
||||
# Have to pull out info before removing handle
|
||||
self._num_connects = self.curl.getinfo(pycurl.NUM_CONNECTS)
|
||||
code = self.curl.getinfo(pycurl.RESPONSE_CODE)
|
||||
# Remove handle
|
||||
self.multi.remove_handle(self.curl)
|
||||
self._check_error(code, "".join(error_body))
|
||||
|
||||
def _req(self, url, params, post_params = False):
|
||||
"""
|
||||
GET or POST that returns raw data. Returns the body
|
||||
data as a string, or raises an error if it contained an error.
|
||||
"""
|
||||
print "_req", url, params, post_params
|
||||
body = []
|
||||
for data in self._req_generator(url, params, post_params):
|
||||
body.append(data)
|
||||
return "".join(body)
|
||||
|
||||
def close(self):
|
||||
with self.lock:
|
||||
self.curl.close()
|
||||
self.session.close()
|
||||
|
||||
def _iterate_lines(self, it):
|
||||
"""
|
||||
Given an iterator that returns arbitrarily-sized chunks
|
||||
of data, return '\n'-delimited lines of text
|
||||
"""
|
||||
partial = ""
|
||||
for chunk in it:
|
||||
partial += chunk
|
||||
lines = partial.split("\n")
|
||||
for line in lines[0:-1]:
|
||||
yield line
|
||||
partial = lines[-1]
|
||||
if partial != "":
|
||||
yield partial
|
||||
|
||||
def _maybe_json(self, retjson, str):
|
||||
"""Parse str as JSON if retjson is true, otherwise return
|
||||
it directly."""
|
||||
if retjson:
|
||||
return json.loads(str)
|
||||
return str
|
||||
def _do_req(self, method, url, query_data, body_data, stream):
|
||||
url = urlparse.urljoin(self.baseurl, url)
|
||||
try:
|
||||
response = self.session.request(method, url,
|
||||
params = query_data,
|
||||
data = body_data)
|
||||
except requests.RequestException as e:
|
||||
raise ServerError(status = "502 Error", url = url,
|
||||
message = str(e.message))
|
||||
if response.status_code != 200:
|
||||
self._handle_error(url, response.status_code, response.content)
|
||||
self._headers = response.headers
|
||||
if response.headers["content-type"] in ("application/json",
|
||||
"application/x-json-stream"):
|
||||
return (response, True)
|
||||
else:
|
||||
return (response, False)
|
||||
|
||||
# Normal versions that return data directly
|
||||
def get(self, url, params = None, retjson = True):
|
||||
def _req(self, method, url, query = None, body = None):
|
||||
"""
|
||||
Make a request and return the body data as a string or parsed
|
||||
JSON object, or raise an error if it contained an error.
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body, False)
|
||||
if isjson:
|
||||
return json.loads(response.content)
|
||||
return response.content
|
||||
|
||||
def get(self, url, params = None):
|
||||
"""Simple GET (parameters in URL)"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 0)
|
||||
self.curl.setopt(pycurl.READFUNCTION, lambda: None)
|
||||
return self._maybe_json(retjson, self._req(url, params, False))
|
||||
return self._req("GET", url, params, None)
|
||||
|
||||
def post(self, url, params = None, retjson = True):
|
||||
def post(self, url, params = None):
|
||||
"""Simple POST (parameters in body)"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 0)
|
||||
self.curl.setopt(pycurl.READFUNCTION, lambda: None)
|
||||
return self._maybe_json(retjson, self._req(url, params, True))
|
||||
return self._req("POST", url, None, params)
|
||||
|
||||
def put(self, url, data, params = None, retjson = True):
|
||||
def put(self, url, data, params = None):
|
||||
"""Simple PUT (parameters in URL, data in body)"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 1)
|
||||
data = cStringIO.StringIO(data)
|
||||
self.curl.setopt(pycurl.READFUNCTION, data.read)
|
||||
return self._maybe_json(retjson, self._req(url, params, False))
|
||||
return self._req("PUT", url, params, data)
|
||||
|
||||
# Generator versions that return data one line at a time.
|
||||
def get_gen(self, url, params = None, retjson = True):
|
||||
"""Simple GET that yields one resonse line at a time"""
|
||||
with self.lock:
|
||||
self.curl.setopt(pycurl.UPLOAD, 0)
|
||||
self.curl.setopt(pycurl.READFUNCTION, lambda: None)
|
||||
for line in self._iterate_lines(self._req_generator(url, params)):
|
||||
yield self._maybe_json(retjson, line)
|
||||
def _req_gen(self, method, url, query = None, body = None):
|
||||
"""
|
||||
Make a request and return a generator that gives back strings
|
||||
or JSON decoded lines of the body data, or raise an error if
|
||||
it contained an eror.
|
||||
"""
|
||||
(response, isjson) = self._do_req(method, url, query, body, True)
|
||||
for line in response.iter_lines():
|
||||
if isjson:
|
||||
yield json.loads(line)
|
||||
else:
|
||||
yield line
|
||||
|
||||
def get_gen(self, url, params = None):
|
||||
"""Simple GET (parameters in URL) returning a generator"""
|
||||
return self._req_gen("GET", url, params)
|
||||
|
||||
# Not much use for a POST or PUT generator, since they don't
|
||||
# return much data.
|
||||
|
|
|
@ -277,17 +277,16 @@ class Stream(NilmApp):
|
|||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@chunked_response
|
||||
@response_type("text/plain")
|
||||
@response_type("application/x-json-stream")
|
||||
def intervals(self, path, start = None, end = None):
|
||||
"""
|
||||
Get intervals from backend database. Streams the resulting
|
||||
intervals as JSON strings separated by newlines. This may
|
||||
intervals as JSON strings separated by CR LF pairs. This may
|
||||
make multiple requests to the nilmdb backend to avoid causing
|
||||
it to block for too long.
|
||||
|
||||
Note that the response type is set to 'text/plain' even
|
||||
though we're sending back JSON; this is because we're not
|
||||
really returning a single JSON object.
|
||||
Note that the response type is the non-standard
|
||||
'application/x-json-stream' for lack of a better option.
|
||||
"""
|
||||
if start is not None:
|
||||
start = float(start)
|
||||
|
@ -307,8 +306,8 @@ class Stream(NilmApp):
|
|||
def content(start, end):
|
||||
# Note: disable chunked responses to see tracebacks from here.
|
||||
while True:
|
||||
(intervals, restart) = self.db.stream_intervals(path, start, end)
|
||||
response = ''.join([ json.dumps(i) + "\n" for i in intervals ])
|
||||
(ints, restart) = self.db.stream_intervals(path, start, end)
|
||||
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
|
||||
yield response
|
||||
if restart == 0:
|
||||
break
|
||||
|
|
|
@ -275,13 +275,6 @@ class TestClient(object):
|
|||
with assert_raises(ClientError) as e:
|
||||
client.stream_remove("/newton/prep", 123, 120)
|
||||
|
||||
# Test the exception we get if we nest requests
|
||||
with assert_raises(Exception) as e:
|
||||
for data in client.stream_extract("/newton/prep"):
|
||||
for line in client.stream_intervals("/newton/prep"):
|
||||
pass
|
||||
in_("not supported", str(e.exception))
|
||||
|
||||
# Test count
|
||||
eq_(client.stream_count("/newton/prep"), 14400)
|
||||
|
||||
|
@ -311,16 +304,6 @@ class TestClient(object):
|
|||
with assert_raises(ServerError) as e:
|
||||
client.http.get_gen("http://nosuchurl/").next()
|
||||
|
||||
# Check non-json version of string output
|
||||
eq_(json.loads(client.http.get("/stream/list",retjson=False)),
|
||||
client.http.get("/stream/list",retjson=True))
|
||||
|
||||
# Check non-json version of generator output
|
||||
aa = list(client.http.get_gen("/stream/list",retjson=False))
|
||||
bb = list(client.http.get_gen("/stream/list",retjson=True))
|
||||
for (a, b) in itertools.izip(aa, bb):
|
||||
eq_(json.loads(a), b)
|
||||
|
||||
# Check 404 for missing streams
|
||||
for function in [ client.stream_intervals, client.stream_extract ]:
|
||||
with assert_raises(ClientError) as e:
|
||||
|
@ -339,37 +322,41 @@ class TestClient(object):
|
|||
client = nilmdb.Client(url = testurl)
|
||||
http = client.http
|
||||
|
||||
# Use a warning rather than returning a test failure, so that we can
|
||||
# still disable chunked responses for debugging.
|
||||
# Use a warning rather than returning a test failure for the
|
||||
# transfer-encoding, so that we can still disable chunked
|
||||
# responses for debugging.
|
||||
|
||||
def headers():
|
||||
h = ""
|
||||
for (k, v) in http._headers.items():
|
||||
h += k + ": " + v + "\n"
|
||||
return h.lower()
|
||||
|
||||
# Intervals
|
||||
x = http.get("stream/intervals", { "path": "/newton/prep" },
|
||||
retjson=False)
|
||||
headers = http._headers.getvalue()
|
||||
lines_(x, 1)
|
||||
if "Transfer-Encoding: chunked" not in headers:
|
||||
x = http.get("stream/intervals", { "path": "/newton/prep" })
|
||||
if "transfer-encoding: chunked" not in headers():
|
||||
warnings.warn("Non-chunked HTTP response for /stream/intervals")
|
||||
if "Content-Type: text/plain;charset=utf-8" not in headers:
|
||||
raise AssertionError("/stream/intervals is not text/plain:\n" +
|
||||
headers)
|
||||
if "content-type: application/x-json-stream" not in headers():
|
||||
raise AssertionError("/stream/intervals content type "
|
||||
"is not application/x-json-stream:\n" +
|
||||
headers())
|
||||
|
||||
# Extract
|
||||
x = http.get("stream/extract",
|
||||
{ "path": "/newton/prep",
|
||||
"start": "123",
|
||||
"end": "124" }, retjson=False)
|
||||
headers = http._headers.getvalue()
|
||||
if "Transfer-Encoding: chunked" not in headers:
|
||||
"end": "124" })
|
||||
if "transfer-encoding: chunked" not in headers():
|
||||
warnings.warn("Non-chunked HTTP response for /stream/extract")
|
||||
if "Content-Type: text/plain;charset=utf-8" not in headers:
|
||||
if "content-type: text/plain;charset=utf-8" not in headers():
|
||||
raise AssertionError("/stream/extract is not text/plain:\n" +
|
||||
headers)
|
||||
headers())
|
||||
|
||||
# Make sure Access-Control-Allow-Origin gets set
|
||||
if "Access-Control-Allow-Origin: " not in headers:
|
||||
if "access-control-allow-origin: " not in headers():
|
||||
raise AssertionError("No Access-Control-Allow-Origin (CORS) "
|
||||
"header in /stream/extract response:\n" +
|
||||
headers)
|
||||
headers())
|
||||
|
||||
client.close()
|
||||
|
||||
|
|
|
@ -163,16 +163,16 @@ class TestCmdline(object):
|
|||
|
||||
# try some URL constructions
|
||||
self.fail("--url http://nosuchurl/ info")
|
||||
self.contain("Couldn't resolve host 'nosuchurl'")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.fail("--url nosuchurl info")
|
||||
self.contain("Couldn't resolve host 'nosuchurl'")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.fail("-u nosuchurl/foo info")
|
||||
self.contain("Couldn't resolve host 'nosuchurl'")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.fail("-u localhost:0 info")
|
||||
self.contain("couldn't connect to host")
|
||||
self.fail("-u localhost:1 info")
|
||||
self.contain("error connecting to server")
|
||||
|
||||
self.ok("-u localhost:12380 info")
|
||||
self.ok("info")
|
||||
|
|
Loading…
Reference in New Issue
Block a user