Browse Source

For stream_interval, make the server handle sending multiple requests

to the database, not the client.  The server now maintains the open
HTTP connection and sends a continuous streaming reply to the GET
request.

HTTP client side uses an Iteratorizer to turn the curl.perform()
callback into an iterator, and returns an iterator that yields
individual lines to the caller rather than buffering up all the data
at once.  Should still be able to handle errors etc.

Server changed to return a "streaming JSON" response for the
/stream/intervals requests.  This is just a series of independent
JSON documents (one per interval), separated by newlines.

Adjust nilmdb's max_results a bit.  Now, multiple requests only exist
between the server <-> nilmdb threads, and they exist just to avoid 
blocking the nilmdb thread by any one server thread for too long.
So adjust the size accordingly to match the fact that this is non-json
encoded data.


git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10881 ddd99763-3ecb-0310-9145-efcb8ce7c51f
tags/bxinterval-last
Jim Paris 10 years ago
parent
commit
3b0b9175d6
5 changed files with 118 additions and 49 deletions
  1. +1
    -13
      nilmdb/client.py
  2. +99
    -27
      nilmdb/httpclient.py
  3. +3
    -3
      nilmdb/nilmdb.py
  4. +14
    -6
      nilmdb/server.py
  5. +1
    -0
      setup.cfg

+ 1
- 13
nilmdb/client.py View File

@@ -114,8 +114,6 @@ class Client(object):
def stream_intervals(self, path, start = None, end = None):
"""
Return a generator that yields each stream interval.
Multiple requests are made to the server if the results
get truncated.
"""
params = {
"path": path
@@ -124,16 +122,7 @@ class Client(object):
params["start"] = repr(start) # use repr to keep precision
if end is not None:
params["end"] = repr(end)

while True:
(intervals, restart) = self.http.get("stream/intervals", params)
for interval in intervals:
yield interval
if restart:
# Restart where we left off
params["start"] = repr(restart)
else:
break
return self.http.get_gen("stream/intervals", params)

def stream_extract(self, path, start = None, end = None, bare = False):
"""
@@ -159,4 +148,3 @@ class Client(object):
if more:
# Restart where we left off
params["start"] = repr(intervals[-1][1])


+ 99
- 27
nilmdb/httpclient.py View File

@@ -12,8 +12,8 @@ import urlparse
import urllib
import pycurl
import cStringIO
import Queue
import threading
import nilmdb.iteratorizer

class Error(Exception):
"""Base exception for both ClientError and ServerError responses"""
@@ -89,29 +89,49 @@ class HTTPClient(object):
else:
raise Error(**args)

def _perform_generator(self):
"""Like self.curl.perform(), but runs the perform in a
separate thread and returns a generator instead of using
the WRITEFUNCTION callback"""
queue = Queue.Queue(maxsize=1)
def callback(data):
queue.put((1, data))
def thread():
try:
self.curl.perform()
except:
info = sys.exc_info()
queue.put((2, info))
queue.put((0, None))
self.curl.setopt(pycurl.WRITEFUNCTION, callback)
self.curl.perform()

def _req_generator(self, url, params):
    """
    Like self._req(), but runs the perform in a separate thread.
    It returns a generator that spits out arbitrary-sized chunks
    of the resulting data, instead of using the WRITEFUNCTION
    callback.
    """
    self._setup_url(url, params)
    # Status of the current response; filled in by header_callback
    # from the first header line.
    self._status = None
    error_body = ""
    def header_callback(data):
        # The first header line is the HTTP status line, e.g.
        # "HTTP/1.1 200 OK"; parse the numeric code from it once
        # and ignore the remaining header lines.
        if self._status is None:
            self._status = int(data.split(" ")[1])
    self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
    def func(callback):
        # Run by Iteratorizer (presumably in its own thread); curl
        # pushes body chunks into `callback`, which Iteratorizer
        # turns into the iterator consumed below.
        self.curl.setopt(pycurl.WRITEFUNCTION, callback)
        self.curl.perform()
    try:
        for i in nilmdb.iteratorizer.Iteratorizer(func):
            if self._status == 200:
                # If we had a 200 response, yield the data to the caller.
                yield i
            else:
                # Otherwise, collect it into an error string.
                error_body += i
    except pycurl.error as e:
        # pycurl.error is a (code, message) tuple; e[1] is the
        # human-readable message (Python 2 exception indexing).
        raise ServerError(status = "502 Error",
                          url = self.url,
                          message = e[1])
    # Raise an exception if there was an error
    self._check_error(error_body)

def _req(self, url, params):
"""GET or POST that returns raw data"""
"""
GET or POST that returns raw data. Returns the body
data as a string, or raises an error if it contained an error.
"""
self._setup_url(url, params)
body = cStringIO.StringIO()
self.curl.setopt(pycurl.WRITEFUNCTION, body.write)
def header_callback(data):
pass
self.curl.setopt(pycurl.HEADERFUNCTION, header_callback)
try:
self.curl.perform()
except pycurl.error as e:
@@ -119,27 +139,79 @@ class HTTPClient(object):
url = self.url,
message = e[1])
body_str = body.getvalue()
# Raise an exception if there was an error
self._check_error(body_str)
return body_str

def close(self):
self.curl.close()

def get(self, url, params = None, retjson = True):
"""Simple GET"""
self.curl.setopt(pycurl.UPLOAD, 0)
def _iterate_lines(self, it):
"""
Given an iterator that returns arbitrarily-sized chunks
of data, return '\n'-delimited lines of text
"""
partial = ""
for chunk in it:
partial += chunk
lines = partial.split("\n")
for line in lines[0:-1]:
yield line
partial = lines[-1]
if partial != "":
yield partial

# Non-generator versions
def _doreq(self, url, params, retjson):
"""
Perform a request, and return the body.

url: URL to request (relative to baseurl)
params: dictionary of query parameters
retjson: expect JSON and return python objects instead of string
"""
out = self._req(url, params)
if retjson:
return json.loads(out)
return out

def get(self, url, params = None, retjson = True):
    """Simple GET; returns the body via _doreq (parsed JSON by default)"""
    # Clear any PUT/upload mode left on the shared curl handle.
    self.curl.setopt(pycurl.UPLOAD, 0)
    return self._doreq(url, params, retjson)

def put(self, url, postdata, params = None, retjson = True):
"""Simple PUT"""
self._setup_url(url, params)
data = cStringIO.StringIO(postdata)
self.curl.setopt(pycurl.UPLOAD, 1)
self.curl.setopt(pycurl.READFUNCTION, data.read)
out = self._req(url, params)
if retjson:
return json.loads(out)
return out
return self._doreq(url, params, retjson)

# Generator versions
def _doreq_gen(self, url, params, retjson):
"""
Perform a request, and return lines of the body in a generator.

url: URL to request (relative to baseurl)
params: dictionary of query parameters
retjson: expect JSON and yield python objects instead of strings
"""
for line in self._iterate_lines(self._req_generator(url, params)):
if retjson:
yield json.loads(line)
else:
yield line

def get_gen(self, url, params = None, retjson = True):
    """Simple GET; returns a generator over lines of the reply"""
    # Clear any PUT/upload mode left on the shared curl handle.
    self.curl.setopt(pycurl.UPLOAD, 0)
    return self._doreq_gen(url, params, retjson)

def put_gen(self, url, postdata, params = None, retjson = True):
    """Simple PUT; returns a generator over lines of the reply"""
    self._setup_url(url, params)
    # Feed the POST body to curl from an in-memory file object.
    reader = cStringIO.StringIO(postdata)
    self.curl.setopt(pycurl.UPLOAD, 1)
    self.curl.setopt(pycurl.READFUNCTION, reader.read)
    return self._doreq_gen(url, params, retjson)

+ 3
- 3
nilmdb/nilmdb.py View File

@@ -117,7 +117,7 @@ class NilmDB(object):
self.con.execute("PRAGMA synchronous=OFF")

# Approximate largest response that we want to send in a single
# reply (for stream_intervals, stream_response)
# reply (for stream_intervals, stream_extract)
if response_size:
self.response_size = response_size
else:
@@ -237,8 +237,8 @@ class NilmDB(object):
starting timestamp to the point at which it was truncated.
"""

# About 38 bytes per interval in the JSON output.
max_results = min(self.response_size / 38, 2)
# About 16 bytes per interval
max_results = min(self.response_size / 16, 2)

stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id)


+ 14
- 6
nilmdb/server.py View File

@@ -178,17 +178,25 @@ class Stream(NilmApp):
# /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose
@cherrypy.tools.json_out()
def intervals(self, path, start = None, end = None):
# This might not return complete data. If the return value
# `restart` is nonzero, retry the request with start=restart
# to get the rest.
"""
Get intervals from backend database. Streams the resulting
intervals as JSON strings separated by newlines. This may
make multiple requests to the nilmdb backend to avoid causing
it to block for too long.
"""
if start is not None:
start = float(start)
if end is not None:
end = float(end)
(intervals, restart) = self.db.stream_intervals(path, start, end)
return (intervals, restart)

while True:
(intervals, restart) = self.db.stream_intervals(path, start, end)
for interval in intervals:
yield json.dumps(interval) + "\n"
if restart == 0:
break
start = restart

# /stream/extract?path=/newton/prep&start=1234567890.0&end=1234567899.0
@cherrypy.expose


+ 1
- 0
setup.cfg View File

@@ -16,6 +16,7 @@ verbosity=2
#tests=tests/test_client.py
#tests=tests/test_timestamper.py
#tests=tests/test_serializer.py
#tests=tests/test_iteratorizer.py
#tests=tests/test_client.py:TestClient.test_client_nilmdb
#with-profile=
#profile-sort=time


Loading…
Cancel
Save