Browse Source

Add back a proxy version of the Serializer, which is much simpler.

tags/nilmdb-1.2
Jim Paris 8 years ago
parent
commit
9b06e46bf1
3 changed files with 79 additions and 21 deletions
  1. +1
    -1
      nilmdb/utils/__init__.py
  2. +68
    -19
      nilmdb/utils/serializer.py
  3. +10
    -1
      tests/test_serializer.py

+ 1
- 1
nilmdb/utils/__init__.py View File

@@ -2,7 +2,7 @@

from nilmdb.utils.timer import Timer
from nilmdb.utils.iteratorizer import Iteratorizer
from nilmdb.utils.serializer import serialized
from nilmdb.utils.serializer import serialized, serializer_proxy
from nilmdb.utils.lrucache import lru_cache
from nilmdb.utils.diskusage import du, human_size
from nilmdb.utils.mustclose import must_close


+ 68
- 19
nilmdb/utils/serializer.py View File

@@ -39,6 +39,17 @@ class SerializerThread(threading.Thread):
result_queue.put((exception, result))
del exception, result

def _call_in_thread(__func, __queue, *args, **kwargs):
"""Make a call by putting it in the serialization thread's
queue and waiting for a response"""
result_queue = Queue.Queue()
__queue.put((result_queue, __func, args, kwargs))
( exc_info, result ) = result_queue.get()
if exc_info is None:
return result
else:
raise exc_info[0], exc_info[1], exc_info[2]

def serialized():
"""Class decorator that ensures that a class's methods
are called in a serialized fashion from a single thread.
@@ -57,39 +68,30 @@ def serialized():
setattr(cls, wrapper.__name__, decorator.decorator(wrapper, orig))

def serializer(orig, *args, **kwargs):
"""Make a call by putting it in the serialization thread's
queue and waiting for a response"""
self = args[0]
if self.__serialized_thread == threading.current_thread():
if self.__s_thread == threading.current_thread():
# Already in the serialization thread, so call directly.
return orig(*args, **kwargs)

result_queue = Queue.Queue()
self.__serialized_call_queue.put((result_queue, orig, args, kwargs))
( exc_info, result ) = result_queue.get()
if exc_info is None:
return result
else:
raise exc_info[0], exc_info[1], exc_info[2]
return _call_in_thread(orig, self.__s_call_queue, *args, **kwargs)

@wrap_class_method
def __init__(orig, self, *args, **kwargs):
# Create serializer thread, and have it call the original __init__
if getattr(self, "__serialized_thread", None):
if getattr(self, "__s_thread", None):
raise Exception("can't apply serialize decorator twice")
self.__serialized_call_queue = Queue.Queue()
self.__serialized_thread = SerializerThread(
self.__serialized_call_queue)
self.__serialized_thread.daemon = True
self.__serialized_thread.start()
self.__s_call_queue = Queue.Queue()
self.__s_thread = SerializerThread(
self.__s_call_queue)
self.__s_thread.daemon = True
self.__s_thread.start()
return serializer(orig, self, *args, **kwargs)

@wrap_class_method
def __del__(orig, self):
try:
# Tell the serializer thread it's time to exit.
self.__serialized_call_queue.put((None, None, None, None))
self.__serialized_thread.join()
self.__s_call_queue.put((None, None, None, None))
self.__s_thread.join()
except AttributeError: # pragma: no cover
pass
# Note that __del__ isn't called from inside the serializer thread.
@@ -105,5 +107,52 @@ def serialized():
continue
# Set up wrapper
setattr(cls, name, decorator.decorator(serializer, method.im_func))

setattr(cls, "_thread_safe", True)
return cls
return class_decorator

def serializer_proxy(__cls, *args, **kwargs):
"""Instantiates the given class with the given arguments,
and returns a SerializerObjectProxy object that proxies all method
calls to the new object, as well as attribute retrievals.

The proxied requests, including instantiation, are performed in a
single thread and serialized between caller threads.

This is like the @serialized() class decorator, except it returns
a new type of class, rather than changing the existing one. It
also handles attributes."""

class SerializerCallProxy(object):
def __init__(self, call_queue, func):
self.call_queue = call_queue
self.result_queue = Queue.Queue()
self.func = func
def __call__(self, *args, **kwargs):
return _call_in_thread(self.func, self.call_queue,
*args, **kwargs)

class SerializerObjectProxy(object):
def __init__(self, __cls, *args, **kwargs):
self.__s_call_queue = Queue.Queue()
self.__s_thread = SerializerThread(
self.__s_call_queue)
self.__s_thread.daemon = True
self.__s_thread.start()
self.__s_object = _call_in_thread(__cls, self.__s_call_queue,
*args, **kwargs)
self._thread_safe = True

def __getattr__(self, key):
attr = getattr(self.__s_object, key)
if not callable(attr):
getter = SerializerCallProxy(self.__s_call_queue, getattr)
return getter(self.__s_object, key)
return SerializerCallProxy(self.__s_call_queue, attr)

def __del__(self):
self.__s_call_queue.put((None, None, None, None))
self.__s_thread.join()

return SerializerObjectProxy(__cls, *args, **kwargs)

+ 10
- 1
tests/test_serializer.py View File

@@ -12,7 +12,7 @@ from testutil.helpers import *
class Foo(object):
val = 0

def __init__(self):
def __init__(self, asdf = "asdf"):
self.init_thread = threading.current_thread().name

@classmethod
@@ -123,3 +123,12 @@ class TestSerialized2(object):
f.close()
a = Foo2()
del a

class TestSerializer(Base):
# Now test the SerializerProxy version
def setUp(self):
self.foo = nilmdb.utils.serializer_proxy(Foo2, "qwer")

def verify_result(self):
eq_(self.foo.val, 20)
eq_(self.foo.init_thread, self.foo.test_thread)

Loading…
Cancel
Save