Remove class decorator version of the serializer; it's too fragile
This commit is contained in:
parent
9b06e46bf1
commit
7761a91242
|
@ -2,7 +2,7 @@
|
|||
|
||||
from nilmdb.utils.timer import Timer
|
||||
from nilmdb.utils.iteratorizer import Iteratorizer
|
||||
from nilmdb.utils.serializer import serialized, serializer_proxy
|
||||
from nilmdb.utils.serializer import serializer_proxy
|
||||
from nilmdb.utils.lrucache import lru_cache
|
||||
from nilmdb.utils.diskusage import du, human_size
|
||||
from nilmdb.utils.mustclose import must_close
|
||||
|
|
|
@ -50,68 +50,6 @@ def _call_in_thread(__func, __queue, *args, **kwargs):
|
|||
else:
|
||||
raise exc_info[0], exc_info[1], exc_info[2]
|
||||
|
||||
def serialized():
|
||||
"""Class decorator that ensures that a class's methods
|
||||
are called in a serialized fashion from a single thread.
|
||||
|
||||
Both the initial instantiation and all method calls are performed
|
||||
in the new thread. Non-function attributes are not handled. Note
|
||||
also that generator functions will get created in the new thread,
|
||||
but the actual generator itself would run in the caller's thread.
|
||||
"""
|
||||
def class_decorator(cls):
|
||||
def wrap_class_method(wrapper):
|
||||
try:
|
||||
orig = getattr(cls, wrapper.__name__).im_func
|
||||
except:
|
||||
orig = lambda x: None
|
||||
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))
|
||||
|
||||
def serializer(orig, *args, **kwargs):
|
||||
self = args[0]
|
||||
if self.__s_thread == threading.current_thread():
|
||||
# Already in the serialization thread, so call directly.
|
||||
return orig(*args, **kwargs)
|
||||
return _call_in_thread(orig, self.__s_call_queue, *args, **kwargs)
|
||||
|
||||
@wrap_class_method
|
||||
def __init__(orig, self, *args, **kwargs):
|
||||
# Create serializer thread, and have it call the original __init__
|
||||
if getattr(self, "__s_thread", None):
|
||||
raise Exception("can't apply serialize decorator twice")
|
||||
self.__s_call_queue = Queue.Queue()
|
||||
self.__s_thread = SerializerThread(
|
||||
self.__s_call_queue)
|
||||
self.__s_thread.daemon = True
|
||||
self.__s_thread.start()
|
||||
return serializer(orig, self, *args, **kwargs)
|
||||
|
||||
@wrap_class_method
|
||||
def __del__(orig, self):
|
||||
try:
|
||||
# Tell the serializer thread it's time to exit.
|
||||
self.__s_call_queue.put((None, None, None, None))
|
||||
self.__s_thread.join()
|
||||
except AttributeError: # pragma: no cover
|
||||
pass
|
||||
# Note that __del__ isn't called from inside the serializer thread.
|
||||
return orig(self)
|
||||
|
||||
# Wrap all other functions with the serializer
|
||||
for (name, method) in inspect.getmembers(cls, inspect.ismethod):
|
||||
# Skip class methods
|
||||
if method.__self__ is not None:
|
||||
continue
|
||||
# Skip functions we already wrapped
|
||||
if name in [ "__del__", "__init__" ]:
|
||||
continue
|
||||
# Set up wrapper
|
||||
setattr(cls, name, decorator.decorator(serializer, method.im_func))
|
||||
|
||||
setattr(cls, "_thread_safe", True)
|
||||
return cls
|
||||
return class_decorator
|
||||
|
||||
def serializer_proxy(__cls, *args, **kwargs):
|
||||
"""Instantiates the given class with the given arguments,
|
||||
and returns a SerializerObjectProxy object that proxies all method
|
||||
|
@ -119,11 +57,7 @@ def serializer_proxy(__cls, *args, **kwargs):
|
|||
|
||||
The proxied requests, including instantiation, are performed in a
|
||||
single thread and serialized between caller threads.
|
||||
|
||||
This is like the @serialized() class decorator, except it returns
|
||||
a new type of class, rather than changing the existing one. It
|
||||
also handles attributes."""
|
||||
|
||||
"""
|
||||
class SerializerCallProxy(object):
|
||||
def __init__(self, call_queue, func):
|
||||
self.call_queue = call_queue
|
||||
|
|
|
@ -36,33 +36,6 @@ class Foo(object):
|
|||
printf("[%s] value changed: %d -> %d\n",
|
||||
threading.current_thread().name, oldval, newval)
|
||||
|
||||
@nilmdb.utils.serialized()
|
||||
class Foo2(Foo):
|
||||
def __del__(self):
|
||||
pass
|
||||
pass
|
||||
|
||||
@nilmdb.utils.serialized()
|
||||
class Foo3(Foo):
|
||||
pass
|
||||
|
||||
# applying decorators more than once won't work
|
||||
@nilmdb.utils.serialized()
|
||||
@nilmdb.utils.serialized()
|
||||
class Foo3(Foo):
|
||||
pass
|
||||
|
||||
# but using serialized and others is fine
|
||||
@nilmdb.utils.serialized()
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
class Foo4(Foo):
|
||||
pass
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
@nilmdb.utils.serialized()
|
||||
class Foo5(Foo):
|
||||
pass
|
||||
|
||||
class Base(object):
|
||||
|
||||
def test_1_wrapping(self):
|
||||
|
@ -93,41 +66,10 @@ class TestUnserialized(Base):
|
|||
# Init and tests ran in different threads
|
||||
ne_(self.foo.init_thread, self.foo.test_thread)
|
||||
|
||||
class TestSerialized(Base):
|
||||
def setUp(self):
|
||||
self.foo = Foo2()
|
||||
|
||||
def tearDown(self):
|
||||
del self.foo
|
||||
|
||||
def verify_result(self):
|
||||
# This should have worked
|
||||
eq_(self.foo.val, 20)
|
||||
|
||||
# Init and tests ran in the same thread
|
||||
eq_(self.foo.init_thread, self.foo.test_thread)
|
||||
|
||||
class TestSerialized2(object):
|
||||
def test_misc(self):
|
||||
with assert_raises(Exception) as e:
|
||||
self.foo = Foo3()
|
||||
in_(str(e.exception), "can't apply serialize decorator twice")
|
||||
|
||||
a = Foo2()
|
||||
b = Foo2()
|
||||
c = Foo2()
|
||||
d = Foo2()
|
||||
e = Foo4()
|
||||
f = Foo5()
|
||||
e.close()
|
||||
f.close()
|
||||
a = Foo2()
|
||||
del a
|
||||
|
||||
class TestSerializer(Base):
|
||||
# Now test the SerializerProxy version
|
||||
def setUp(self):
|
||||
self.foo = nilmdb.utils.serializer_proxy(Foo2, "qwer")
|
||||
self.foo = nilmdb.utils.serializer_proxy(Foo, "qwer")
|
||||
|
||||
def verify_result(self):
|
||||
eq_(self.foo.val, 20)
|
||||
|
|
Loading…
Reference in New Issue
Block a user