We don't actually use nilmdb.utils.threadsafety.verify_proxy in the main NilmDB code, but it's useful for finding errors. It found an issue with __getattr__ in SerializerProxy which (1) can't be avoided? (2) is now commented in the code (3) shouldn't matter in real usetags/nilmdb-2.0.0
@@ -79,12 +79,25 @@ def serializer_proxy(obj_or_type): | |||
self._thread_safe = True | |||
def __getattr__(self, key): | |||
# If the attribute is a function, we want to return a | |||
# proxy that will perform the call through the serializer | |||
# when called. Otherwise, we want to return the value | |||
# directly. This means we need to grab the attribute once, | |||
# and therefore self.__object.__getattr__ may be called | |||
# in an unsafe way, from the caller's thread. | |||
attr = getattr(self.__object, key) | |||
if not callable(attr): | |||
# It's not callable, so perform the getattr from within | |||
# the serializer thread, then return its value. | |||
# That may differ from the "attr" value we just grabbed | |||
# from here, due to forced ordering in the serializer. | |||
getter = SerializerCallProxy(self.__call_queue, getattr, self) | |||
return getter(self.__object, key) | |||
r = SerializerCallProxy(self.__call_queue, attr, self) | |||
return r | |||
else: | |||
# It is callable, so return an object that will proxy through | |||
# the serializer when called. | |||
r = SerializerCallProxy(self.__call_queue, attr, self) | |||
return r | |||
# For an interable object, on __iter__(), save the object's | |||
# iterator and return this proxy. On next(), call the object's | |||
@@ -3,19 +3,18 @@ import threading | |||
import warnings | |||
import types | |||
def verify_proxy(obj_or_type, exception = False, check_thread = True, | |||
def verify_proxy(obj_or_type, check_thread = True, | |||
check_concurrent = True): | |||
"""Wrap the given object or type in a VerifyObjectProxy. | |||
Returns a VerifyObjectProxy that proxies all method calls to the | |||
given object, as well as attribute retrievals. | |||
When calling methods, the following checks are performed. If | |||
exception is True, an exception is raised. Otherwise, a warning | |||
is printed. | |||
When calling methods, the following checks are performed. On | |||
failure, an exception is raised. | |||
check_thread = True # Warn/fail if two different threads call methods. | |||
check_concurrent = True # Warn/fail if two functions are concurrently | |||
check_thread = True # Fail if two different threads call methods. | |||
check_concurrent = True # Fail if two functions are concurrently | |||
# run through this proxy | |||
""" | |||
class Namespace(object): | |||
@@ -42,10 +41,7 @@ def verify_proxy(obj_or_type, exception = False, check_thread = True, | |||
" but %s called %s.%s", | |||
p.thread.name, p.classname, p.thread_callee, | |||
this.name, p.classname, callee) | |||
if exception: | |||
raise AssertionError(err) | |||
else: # pragma: no cover | |||
warnings.warn(err) | |||
raise AssertionError(err) | |||
need_concur_unlock = False | |||
if check_concurrent: | |||
@@ -54,10 +50,7 @@ def verify_proxy(obj_or_type, exception = False, check_thread = True, | |||
"while %s is still in %s.%s", | |||
this.name, p.classname, callee, | |||
p.concur_tname, p.classname, p.concur_callee) | |||
if exception: | |||
raise AssertionError(err) | |||
else: # pragma: no cover | |||
warnings.warn(err) | |||
raise AssertionError(err) | |||
else: | |||
p.concur_tname = this.name | |||
p.concur_callee = callee | |||
@@ -80,17 +73,12 @@ def verify_proxy(obj_or_type, exception = False, check_thread = True, | |||
p.concur_tname = None | |||
p.concur_callee = None | |||
self.__obj = obj_or_type | |||
try: | |||
if type(obj_or_type) in (type, type): | |||
p.classname = self.__obj.__name__ | |||
else: | |||
p.classname = self.__obj.__class__.__name__ | |||
except AttributeError: # pragma: no cover | |||
p.classname = "???" | |||
if type(obj_or_type) in (type, type): | |||
p.classname = self.__obj.__name__ | |||
else: | |||
p.classname = self.__obj.__class__.__name__ | |||
def __getattr__(self, key): | |||
if key.startswith("_VerifyObjectProxy__"): # pragma: no cover | |||
raise AttributeError | |||
attr = getattr(self.__obj, key) | |||
if not callable(attr): | |||
return VerifyCallProxy(getattr, self.__ns)(self.__obj, key) | |||
@@ -29,6 +29,9 @@ class Foo(object): | |||
def t(self): | |||
pass | |||
def reent(self, func): | |||
func() | |||
def tester(self, debug = False): | |||
# purposely not thread-safe | |||
self.test_thread = threading.current_thread().name | |||
@@ -76,14 +76,14 @@ class TestThreadSafety(object): | |||
def test(self): | |||
proxy = nilmdb.utils.threadsafety.verify_proxy | |||
self.tryit(Test(), True, True) | |||
self.tryit(proxy(Test(), True, True, True), False, False) | |||
self.tryit(proxy(Test(), True, True, False), False, True) | |||
self.tryit(proxy(Test(), True, False, True), True, False) | |||
self.tryit(proxy(Test(), True, False, False), True, True) | |||
self.tryit(proxy(Test, True, True, True)(), False, False) | |||
self.tryit(proxy(Test, True, True, False)(), False, True) | |||
self.tryit(proxy(Test, True, False, True)(), True, False) | |||
self.tryit(proxy(Test, True, False, False)(), True, True) | |||
self.tryit(proxy(Test(), True, True), False, False) | |||
self.tryit(proxy(Test(), True, False), False, True) | |||
self.tryit(proxy(Test(), False, True), True, False) | |||
self.tryit(proxy(Test(), False, False), True, True) | |||
self.tryit(proxy(Test, True, True)(), False, False) | |||
self.tryit(proxy(Test, True, False)(), False, True) | |||
self.tryit(proxy(Test, False, True)(), True, False) | |||
self.tryit(proxy(Test, False, False)(), True, True) | |||
proxy(proxy(proxy(Test))()).foo() | |||