You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

125 lines
4.8 KiB

  1. import queue
  2. import threading
  3. import sys
  4. import decorator
  5. import inspect
  6. import types
  7. import functools
  8. # This file provides a class that will wrap an object and serialize
  9. # all calls to its methods. All calls to that object will be queued
  10. # and executed from a single thread, regardless of which thread makes
  11. # the call.
  12. # Based partially on http://stackoverflow.com/questions/2642515/
  13. class SerializerThread(threading.Thread):
  14. """Thread that retrieves call information from the queue, makes the
  15. call, and returns the results."""
  16. def __init__(self, classname, call_queue):
  17. threading.Thread.__init__(self)
  18. self.name = "Serializer-" + classname + "-" + self.name
  19. self.call_queue = call_queue
  20. def run(self):
  21. while True:
  22. result_queue, func, args, kwargs = self.call_queue.get()
  23. # Terminate if result_queue is None
  24. if result_queue is None:
  25. return
  26. exception = None
  27. result = None
  28. try:
  29. result = func(*args, **kwargs) # wrapped
  30. except:
  31. exception = sys.exc_info()
  32. # Ensure we delete these before returning a result, so
  33. # we don't unncessarily hold onto a reference while
  34. # we're waiting for the next call.
  35. del func, args, kwargs
  36. result_queue.put((exception, result))
  37. del exception, result
  38. def serializer_proxy(obj_or_type):
  39. """Wrap the given object or type in a SerializerObjectProxy.
  40. Returns a SerializerObjectProxy object that proxies all method
  41. calls to the object, as well as attribute retrievals.
  42. The proxied requests, including instantiation, are performed in a
  43. single thread and serialized between caller threads.
  44. """
  45. class SerializerCallProxy(object):
  46. def __init__(self, call_queue, func, objectproxy):
  47. self.call_queue = call_queue
  48. self.func = func
  49. # Need to hold a reference to object proxy so it doesn't
  50. # go away (and kill the thread) until after get called.
  51. self.objectproxy = objectproxy
  52. def __call__(self, *args, **kwargs):
  53. result_queue = queue.Queue()
  54. self.call_queue.put((result_queue, self.func, args, kwargs))
  55. ( exc_info, result ) = result_queue.get()
  56. if exc_info is None:
  57. return result
  58. else:
  59. raise exc_info[1].with_traceback(exc_info[2])
  60. class SerializerObjectProxy(object):
  61. def __init__(self, obj_or_type, *args, **kwargs):
  62. self.__object = obj_or_type
  63. try:
  64. if type(obj_or_type) == type:
  65. classname = obj_or_type.__name__
  66. else:
  67. classname = obj_or_type.__class__.__name__
  68. except AttributeError: # pragma: no cover
  69. classname = "???"
  70. self.__call_queue = queue.Queue()
  71. self.__thread = SerializerThread(classname, self.__call_queue)
  72. self.__thread.daemon = True
  73. self.__thread.start()
  74. self._thread_safe = True
  75. def __getattr__(self, key):
  76. attr = getattr(self.__object, key)
  77. if not callable(attr):
  78. getter = SerializerCallProxy(self.__call_queue, getattr, self)
  79. return getter(self.__object, key)
  80. r = SerializerCallProxy(self.__call_queue, attr, self)
  81. return r
  82. # For an interable object, on __iter__(), save the object's
  83. # iterator and return this proxy. On next(), call the object's
  84. # iterator through this proxy.
  85. def __iter__(self):
  86. attr = getattr(self.__object, "__iter__")
  87. self.__iter = SerializerCallProxy(self.__call_queue, attr, self)()
  88. return self
  89. def __next__(self):
  90. return SerializerCallProxy(self.__call_queue,
  91. self.__iter.__next__, self)()
  92. def __getitem__(self, key):
  93. return self.__getattr__("__getitem__")(key)
  94. def __call__(self, *args, **kwargs):
  95. """Call this to instantiate the type, if a type was passed
  96. to serializer_proxy. Otherwise, pass the call through."""
  97. ret = SerializerCallProxy(self.__call_queue,
  98. self.__object, self)(*args, **kwargs)
  99. if type(self.__object) == type:
  100. # Instantiation
  101. self.__object = ret
  102. return self
  103. return ret
  104. def __del__(self):
  105. try:
  106. # Signal thread to exit, but don't wait for it.
  107. self.__call_queue.put((None, None, None, None))
  108. except: # pragma: no cover
  109. pass
  110. return SerializerObjectProxy(obj_or_type)