Add pycurl-specific hack to Iteratorizer

Inside the pycurl callback, we can't raise exceptions, because the
pycurl extension module will unconditionally print the exception
itself and not pass it up to the caller.  Instead, we have the
callback return 0, which tells curl to abort whenever that differs
from the number of bytes we were given.  (-1 would be better, since
it would also abort when we were given 0 bytes, but the extension
doesn't support that either.)

This resolves the 'Exception("should die")' problem when interrupting
a streaming generator like stream_extract.
This commit is contained in:
Jim Paris 2013-01-24 19:06:20 -05:00
parent 294ec6988b
commit 40b966aef2
3 changed files with 27 additions and 7 deletions

View File

@ -119,7 +119,7 @@ class HTTPClient(object):
self.curl.setopt(pycurl.WRITEFUNCTION, callback) self.curl.setopt(pycurl.WRITEFUNCTION, callback)
self.curl.perform() self.curl.perform()
try: try:
with nilmdb.utils.Iteratorizer(func) as it: with nilmdb.utils.Iteratorizer(func, curl_hack = True) as it:
for i in it: for i in it:
if self._status == 200: if self._status == 200:
# If we had a 200 response, yield the data to caller. # If we had a 200 response, yield the data to caller.

View File

@ -10,7 +10,7 @@ import contextlib
# Based partially on http://stackoverflow.com/questions/9968592/ # Based partially on http://stackoverflow.com/questions/9968592/
class IteratorizerThread(threading.Thread): class IteratorizerThread(threading.Thread):
def __init__(self, queue, function): def __init__(self, queue, function, curl_hack):
""" """
function: function to execute, which takes the function: function to execute, which takes the
callback (provided by this class) as an argument callback (provided by this class) as an argument
@ -19,11 +19,24 @@ class IteratorizerThread(threading.Thread):
self.function = function self.function = function
self.queue = queue self.queue = queue
self.die = False self.die = False
self.curl_hack = curl_hack
def callback(self, data): def callback(self, data):
if self.die: try:
raise Exception() # trigger termination if self.die:
self.queue.put((1, data)) raise Exception() # trigger termination
self.queue.put((1, data))
except:
if self.curl_hack:
# We can't raise exceptions, because the pycurl
# extension module will unconditionally print the
# exception itself, and not pass it up to the caller.
# Instead, just return a value that tells curl to
# abort. (-1 would be best, in case we were given 0
# bytes, but the extension doesn't support that).
self.queue.put((2, sys.exc_info()))
return 0
raise
def run(self): def run(self):
try: try:
@ -34,7 +47,7 @@ class IteratorizerThread(threading.Thread):
self.queue.put((0, result)) self.queue.put((0, result))
@contextlib.contextmanager @contextlib.contextmanager
def Iteratorizer(function): def Iteratorizer(function, curl_hack = False):
""" """
Context manager that takes a function expecting a callback, Context manager that takes a function expecting a callback,
and provides an iterable that yields the values passed to that and provides an iterable that yields the values passed to that
@ -49,7 +62,7 @@ def Iteratorizer(function):
print 'function returned:', it.retval print 'function returned:', it.retval
""" """
queue = Queue.Queue(maxsize = 1) queue = Queue.Queue(maxsize = 1)
thread = IteratorizerThread(queue, function) thread = IteratorizerThread(queue, function, curl_hack)
thread.daemon = True thread.daemon = True
thread.start() thread.start()

View File

@ -52,3 +52,10 @@ class TestIteratorizer(object):
it.next() it.next()
foo() foo()
eq_(it.retval, None) eq_(it.retval, None)
# Do the same thing when the curl hack is applied
def foo():
with nilmdb.utils.Iteratorizer(f, curl_hack = True) as it:
it.next()
foo()
eq_(it.retval, None)