These functions can now take an object or a type (class). If given an object, they will wrap subsequent calls to that object. If given a type, they will return an object that can be instantiated to create a new object, and all calls including __init__ will be covered by the serialization or thread verification.
110 lines
4.2 KiB
Python
110 lines
4.2 KiB
Python
from nilmdb.utils.printf import *
|
|
import threading
|
|
import warnings
|
|
import types
|
|
|
|
def verify_proxy(obj_or_type, exception = False, check_thread = True,
|
|
check_concurrent = True):
|
|
"""Wrap the given object or type in a VerifyObjectProxy.
|
|
|
|
Returns a VerifyObjectProxy that proxies all method calls to the
|
|
given object, as well as attribute retrievals.
|
|
|
|
When calling methods, the following checks are performed. If
|
|
exception is True, an exception is raised. Otherwise, a warning
|
|
is printed.
|
|
|
|
check_thread = True # Warn/fail if two different threads call methods.
|
|
check_concurrent = True # Warn/fail if two functions are concurrently
|
|
# run through this proxy
|
|
"""
|
|
class Namespace(object):
|
|
pass
|
|
class VerifyCallProxy(object):
|
|
def __init__(self, func, parent_namespace):
|
|
self.func = func
|
|
self.parent_namespace = parent_namespace
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
p = self.parent_namespace
|
|
this = threading.current_thread()
|
|
try:
|
|
callee = self.func.__name__
|
|
except AttributeError:
|
|
callee = "???"
|
|
|
|
if p.thread is None:
|
|
p.thread = this
|
|
p.thread_callee = callee
|
|
|
|
if check_thread and p.thread != this:
|
|
err = sprintf("unsafe threading: %s called %s.%s,"
|
|
" but %s called %s.%s",
|
|
p.thread.name, p.classname, p.thread_callee,
|
|
this.name, p.classname, callee)
|
|
if exception:
|
|
raise AssertionError(err)
|
|
else: # pragma: no cover
|
|
warnings.warn(err)
|
|
|
|
need_concur_unlock = False
|
|
if check_concurrent:
|
|
if p.concur_lock.acquire(False) == False:
|
|
err = sprintf("unsafe concurrency: %s called %s.%s "
|
|
"while %s is still in %s.%s",
|
|
this.name, p.classname, callee,
|
|
p.concur_tname, p.classname, p.concur_callee)
|
|
if exception:
|
|
raise AssertionError(err)
|
|
else: # pragma: no cover
|
|
warnings.warn(err)
|
|
else:
|
|
p.concur_tname = this.name
|
|
p.concur_callee = callee
|
|
need_concur_unlock = True
|
|
|
|
try:
|
|
ret = self.func(*args, **kwargs)
|
|
finally:
|
|
if need_concur_unlock:
|
|
p.concur_lock.release()
|
|
return ret
|
|
|
|
class VerifyObjectProxy(object):
|
|
def __init__(self, obj_or_type, *args, **kwargs):
|
|
p = Namespace()
|
|
self.__ns = p
|
|
p.thread = None
|
|
p.thread_callee = None
|
|
p.concur_lock = threading.Lock()
|
|
p.concur_tname = None
|
|
p.concur_callee = None
|
|
self.__obj = obj_or_type
|
|
try:
|
|
if type(obj_or_type) in (types.TypeType, types.ClassType):
|
|
p.classname = self.__obj.__name__
|
|
else:
|
|
p.classname = self.__obj.__class__.__name__
|
|
except AttributeError: # pragma: no cover
|
|
p.classname = "???"
|
|
|
|
def __getattr__(self, key):
|
|
if key.startswith("_VerifyObjectProxy__"): # pragma: no cover
|
|
raise AttributeError
|
|
attr = getattr(self.__obj, key)
|
|
if not callable(attr):
|
|
return VerifyCallProxy(getattr, self.__ns)(self.__obj, key)
|
|
return VerifyCallProxy(attr, self.__ns)
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
"""Call this to instantiate the type, if a type was passed
|
|
to verify_proxy. Otherwise, pass the call through."""
|
|
ret = VerifyCallProxy(self.__obj, self.__ns)(*args, **kwargs)
|
|
if type(self.__obj) in (types.TypeType, types.ClassType):
|
|
# Instantiation
|
|
self.__obj = ret
|
|
return self
|
|
return ret
|
|
|
|
return VerifyObjectProxy(obj_or_type)
|