- """HTTP client library"""
-
- import nilmdb.utils
- from nilmdb.client.errors import ClientError, ServerError, Error
-
- import simplejson as json
- import urllib.parse
- import requests
-
class HTTPClient(object):
    """Manage and perform HTTP requests on behalf of the client.

    Every request is sent on its own short-lived ``requests`` session
    with ``Connection: close``, so no connection is reused between
    calls.
    """

    # Media types whose response bodies are decoded as JSON.
    _JSON_TYPES = ("application/json", "application/x-json-stream")

    def __init__(self, baseurl="", post_json=False, verify_ssl=True):
        """Set up the client.

        baseurl: if supplied, all other functions that take a URL can
            be given a relative URL instead.  "http://" is assumed when
            no scheme is present.
        post_json: send POST bodies as application/json rather than
            x-www-form-urlencoded.
        verify_ssl: passed through to requests as ``verify``.
        """
        # Verify / clean up URL
        reparsed = urllib.parse.urlparse(baseurl).geturl()
        if '://' not in reparsed:
            reparsed = urllib.parse.urlparse("http://" + baseurl).geturl()
        # Normalize to exactly one trailing slash so that urljoin treats
        # the last path component as a directory.
        self.baseurl = reparsed.rstrip('/') + '/'

        # Note whether we want SSL verification
        self.verify_ssl = verify_ssl

        # Saved response, so that tests can verify a few things.
        self._last_response = {}

        # Whether to send application/json POST bodies (versus
        # x-www-form-urlencoded)
        self.post_json = post_json

    @classmethod
    def _is_json_content_type(cls, content_type):
        """Return True if a Content-Type header value denotes JSON.

        Parameters after the media type (e.g. "; charset=utf-8") and
        letter case are ignored.  A missing or empty header is not JSON.
        """
        if not content_type:
            return False
        media_type = content_type.split(";", 1)[0].strip().lower()
        return media_type in cls._JSON_TYPES

    @staticmethod
    def _split_lines(source, ending):
        """Yield the pieces of the chunk stream `source` delimited by
        `ending`.

        Like the iter_lines function in Requests, but only splits on
        the specified line ending.  A delimiter that straddles two
        chunks is handled, and a final unterminated piece is yielded at
        the end of the stream.
        """
        pending = None
        for chunk in source:
            if not chunk:
                # An empty chunk carries no data; skipping it avoids
                # emitting a spurious empty trailing line.
                continue
            if pending is not None:
                chunk = pending + chunk
            *complete, rest = chunk.split(ending)
            yield from complete
            # An empty remainder means the data ended exactly on a
            # delimiter, so nothing is carried over.
            pending = rest if rest else None
        if pending is not None:
            yield pending

    def _post_args(self, params):
        """Return the (body, headers) pair used to POST `params`,
        honoring the post_json setting."""
        if self.post_json:
            return (json.dumps(params),
                    {'Content-type': 'application/json'})
        return (params, None)

    def _handle_error(self, url, code, body):
        """Raise the exception matching a non-200 response.

        Raises ClientError for 4xx codes, ServerError for 5xx codes, and
        Error for anything else.  This function never returns normally.
        """
        # Default variables for exception.  We use the entire body as
        # the default message, in case we can't extract it from a JSON
        # response.
        args = {"url": url,
                "status": str(code),
                "message": body,
                "traceback": None}
        try:
            # Fill with server-provided data if we can
            jsonerror = json.loads(body)
            args["status"] = jsonerror["status"]
            args["message"] = jsonerror["message"]
            args["traceback"] = jsonerror["traceback"]
        except Exception:
            # Deliberately best-effort: the body may not be JSON, or may
            # lack some fields.  Whatever was filled in so far is kept.
            pass
        if 400 <= code <= 499:
            raise ClientError(**args)
        if 500 <= code <= 599:
            if args["message"] is None:
                args["message"] = ("(no message; try disabling " +
                                   "response.stream option in " +
                                   "nilmdb.server for better debugging)")
            raise ServerError(**args)
        raise Error(**args)

    def close(self):
        """Do nothing; each request already closes its own session."""
        pass

    def _do_req(self, method, url, query_data, body_data, stream, headers):
        """Perform one HTTP request.

        Returns a (response, isjson) tuple, where isjson tells whether
        the response's Content-Type is one of the JSON media types.

        Raises ServerError (status "502 Error") if the request itself
        fails, and whatever _handle_error raises for a non-200 status.
        """
        url = urllib.parse.urljoin(self.baseurl, url)
        # Work on a copy so the caller's dictionary is never modified.
        headers = dict(headers) if headers else {}
        headers["Connection"] = "close"
        try:
            # Create a new session, ensure we send "Connection: close",
            # and explicitly close connection after the transfer.
            # This is to avoid HTTP/1.1 persistent connections
            # (keepalive), because they have fundamental race
            # conditions when there are delays between requests:
            # a new request may be sent at the same instant that the
            # server decides to timeout the connection.
            #
            # The "with" block closes the session even when the request
            # raises.  If it's a generator (stream = True), the requests
            # library shouldn't actually close the HTTP connection until
            # all data has been read from the response.
            with requests.Session() as session:
                response = session.request(method, url,
                                           params=query_data,
                                           data=body_data,
                                           stream=stream,
                                           headers=headers,
                                           verify=self.verify_ssl)
        except requests.RequestException as e:
            raise ServerError(status="502 Error", url=url,
                              message=str(e)) from e
        if response.status_code != 200:
            self._handle_error(url, response.status_code, response.content)
        self._last_response = response
        isjson = self._is_json_content_type(
            response.headers.get("content-type"))
        return (response, isjson)

    # Normal versions that return data directly
    def _req(self, method, url, query=None, body=None, headers=None):
        """
        Make a request and return the body data as a string or parsed
        JSON object, or raise an error if it contained an error.
        """
        (response, isjson) = self._do_req(method, url, query, body,
                                          stream=False, headers=headers)
        if isjson:
            return json.loads(response.content)
        return response.content

    def get(self, url, params=None):
        """Simple GET (parameters in URL)"""
        return self._req("GET", url, params, None)

    def post(self, url, params=None):
        """Simple POST (parameters in body)"""
        (body, headers) = self._post_args(params)
        return self._req("POST", url, None, body, headers)

    def put(self, url, data, params=None,
            content_type="application/octet-stream"):
        """Simple PUT (parameters in URL, data in body)"""
        h = {'Content-type': content_type}
        return self._req("PUT", url, query=params, body=data, headers=h)

    # Generator versions that return data one line at a time.
    def _req_gen(self, method, url, query=None, body=None,
                 headers=None, binary=False):
        """
        Make a request and return a generator that gives back strings
        or JSON decoded lines of the body data, or raise an error if
        it contained an error.

        If binary is true, raw 64 KiB chunks are yielded instead of
        lines.  Because this is a generator, the request is not sent
        until the first item is requested.
        """
        (response, isjson) = self._do_req(method, url, query, body,
                                          stream=True, headers=headers)

        # Yield the chunks or lines as requested
        if binary:
            yield from response.iter_content(chunk_size=65536)
        elif isjson:
            # NOTE(review): the one-byte chunk size is kept from the
            # original; presumably it lets each JSON line be delivered
            # as soon as it arrives -- confirm before raising it.
            for line in self._split_lines(
                    response.iter_content(chunk_size=1),
                    ending=b'\r\n'):
                yield json.loads(line)
        else:
            yield from self._split_lines(
                response.iter_content(chunk_size=65536),
                ending=b'\n')

    def get_gen(self, url, params=None, binary=False):
        """Simple GET (parameters in URL) returning a generator"""
        return self._req_gen("GET", url, params, binary=binary)

    def post_gen(self, url, params=None):
        """Simple POST (parameters in body) returning a generator"""
        (body, headers) = self._post_args(params)
        return self._req_gen("POST", url, None, body, headers)

    # Not much use for a POST or PUT generator, since they don't
    # return much data.
|