diff options
| -rw-r--r-- | debug_toolbar/utils/tracking/__init__.py | 77 | ||||
| -rw-r--r-- | tests/tests.py | 139 |
2 files changed, 1 insertions, 215 deletions
diff --git a/debug_toolbar/utils/tracking/__init__.py b/debug_toolbar/utils/tracking/__init__.py index 166b1b5..420de1e 100644 --- a/debug_toolbar/utils/tracking/__init__.py +++ b/debug_toolbar/utils/tracking/__init__.py @@ -6,20 +6,6 @@ import types from django.utils.importlib import import_module -def post_dispatch(func): - def wrapped(callback): - register_hook(func, 'after', callback) - return callback - return wrapped - - -def pre_dispatch(func): - def wrapped(callback): - register_hook(func, 'before', callback) - return callback - return wrapped - - def replace_method(klass, method_name): original = getattr(klass, method_name) @@ -36,66 +22,3 @@ def replace_method(klass, method_name): return wrapped return inner - - -def fire_hook(hook, sender, **kwargs): - try: - for callback in callbacks[hook].get(id(sender), []): - callback(sender=sender, **kwargs) - except Exception as e: - # Log the exception, dont mess w/ the underlying function - logging.exception(e) - - -def _replace_function(func, wrapped): - if isinstance(func, types.FunctionType): - if func.__module__ == '__builtin__': - # oh shit - __builtins__[func] = wrapped - else: - module = import_module(func.__module__) - setattr(module, func.__name__, wrapped) - elif getattr(func, '__self__', None): - setattr(func.__self__, func.__name__, classmethod(wrapped)) - elif hasattr(func, 'im_class'): - # for unbound methods - Python 2 only - setattr(func.im_class, func.__name__, wrapped) - else: - raise NotImplementedError - -callbacks = { - 'before': {}, - 'after': {}, -} - - -def register_hook(func, hook, callback): - """ - def myhook(sender, args, kwargs): - print func, "executed - print "args:", args - print "kwargs:", kwargs - register_hook(BaseDatabaseWrapper.cursor, 'before', myhook) - """ - - assert hook in ('before', 'after') - - def wrapped(*args, **kwargs): - start = time.time() - fire_hook('before', sender=wrapped.__wrapped__, args=args, kwargs=kwargs, - start=start) - result = wrapped.__wrapped__(*args, **kwargs) - stop = time.time() - fire_hook('after', sender=wrapped.__wrapped__, args=args, kwargs=kwargs, - result=result, start=start, stop=stop) - actual = getattr(func, '__wrapped__', func) - wrapped.__wrapped__ = actual - wrapped.__doc__ = getattr(actual, '__doc__', None) - wrapped.__name__ = actual.__name__ - - id_ = id(actual) - if id_ not in callbacks[hook]: - callbacks[hook][id_] = [] - callbacks[hook][id_].append(callback) - - _replace_function(func, wrapped) diff --git a/tests/tests.py b/tests/tests.py index 32f8d6b..2af03f3 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -18,7 +18,7 @@ from debug_toolbar.panels.request_vars import RequestVarsDebugPanel from debug_toolbar.panels.template import TemplateDebugPanel from debug_toolbar.toolbar.loader import DebugToolbar from debug_toolbar.utils import get_name_from_obj -from debug_toolbar.utils.tracking import pre_dispatch, post_dispatch, callbacks + rf = RequestFactory() @@ -270,140 +270,3 @@ class TemplatePanelTestCase(BaseTestCase): ctx = template_panel.templates[0]['context'][base_ctx_idx] self.assertIn('<<queryset of auth.User>>', ctx) self.assertIn('<<triggers database query>>', ctx) - - -def module_func(*args, **kwargs): - """Used by dispatch tests""" - return 'blah' - - -class TrackingTestCase(BaseTestCase): - @classmethod - def class_method(cls, *args, **kwargs): - return 'blah' - - def class_func(self, *args, **kwargs): - """Used by dispatch tests""" - return 'blah' - - def test_pre_hook(self): - foo = {} - - @pre_dispatch(module_func) - def test(**kwargs): - foo.update(kwargs) - - self.assertTrue(hasattr(module_func, '__wrapped__')) - self.assertEqual(len(callbacks['before']), 1) - - module_func('hi', foo='bar') - - self.assertTrue('sender' in foo, foo) - # best we can do - self.assertEqual(foo['sender'].__name__, 'module_func') - self.assertTrue('start' in foo, foo) - self.assertTrue(foo['start'] > 0) - self.assertTrue('stop' not in foo, foo) - self.assertTrue('args' in foo, foo) - self.assertTrue(len(foo['args']), 1) - self.assertEqual(foo['args'][0], 'hi') - self.assertTrue('kwargs' in foo, foo) - self.assertTrue(len(foo['kwargs']), 1) - self.assertTrue('foo' in foo['kwargs']) - self.assertEqual(foo['kwargs']['foo'], 'bar') - - callbacks['before'] = {} - - @pre_dispatch(TrackingTestCase.class_func) - def test(**kwargs): - foo.update(kwargs) - - self.assertTrue(hasattr(TrackingTestCase.class_func, '__wrapped__')) - self.assertEqual(len(callbacks['before']), 1) - - self.class_func('hello', foo='bar') - - self.assertTrue('sender' in foo, foo) - # best we can do - self.assertEqual(foo['sender'].__name__, 'class_func') - self.assertTrue('start' in foo, foo) - self.assertTrue(foo['start'] > 0) - self.assertTrue('stop' not in foo, foo) - self.assertTrue('args' in foo, foo) - self.assertTrue(len(foo['args']), 2) - self.assertEqual(foo['args'][1], 'hello') - self.assertTrue('kwargs' in foo, foo) - self.assertTrue(len(foo['kwargs']), 1) - self.assertTrue('foo' in foo['kwargs']) - self.assertEqual(foo['kwargs']['foo'], 'bar') - - callbacks['before'] = {} - - @pre_dispatch(TrackingTestCase.class_method) - def test(**kwargs): - foo.update(kwargs) - - self.assertTrue(hasattr(TrackingTestCase.class_method, '__wrapped__')) - self.assertEqual(len(callbacks['before']), 1) - - TrackingTestCase.class_method() - - self.assertTrue('sender' in foo, foo) - # best we can do - self.assertEqual(foo['sender'].__name__, 'class_method') - self.assertTrue('start' in foo, foo) - self.assertTrue('stop' not in foo, foo) - self.assertTrue('args' in foo, foo) - - def test_post_hook(self): - foo = {} - - @post_dispatch(module_func) - def test(**kwargs): - foo.update(kwargs) - - self.assertTrue(hasattr(module_func, '__wrapped__')) - self.assertEqual(len(callbacks['after']), 1) - - module_func('hi', foo='bar') - - self.assertTrue('sender' in foo, foo) - # best we can do - self.assertEqual(foo['sender'].__name__, 'module_func') - self.assertTrue('start' in foo, foo) - self.assertTrue(foo['start'] > 0) - self.assertTrue('stop' in foo, foo) - self.assertTrue(foo['stop'] > foo['start']) - self.assertTrue('args' in foo, foo) - self.assertTrue(len(foo['args']), 1) - self.assertEqual(foo['args'][0], 'hi') - self.assertTrue('kwargs' in foo, foo) - self.assertTrue(len(foo['kwargs']), 1) - self.assertTrue('foo' in foo['kwargs']) - self.assertEqual(foo['kwargs']['foo'], 'bar') - - callbacks['after'] = {} - - @post_dispatch(TrackingTestCase.class_func) - def test(**kwargs): - foo.update(kwargs) - - self.assertTrue(hasattr(TrackingTestCase.class_func, '__wrapped__')) - self.assertEqual(len(callbacks['after']), 1) - - self.class_func('hello', foo='bar') - - self.assertTrue('sender' in foo, foo) - # best we can do - self.assertEqual(foo['sender'].__name__, 'class_func') - self.assertTrue('start' in foo, foo) - self.assertTrue(foo['start'] > 0) - self.assertTrue('stop' in foo, foo) - self.assertTrue(foo['stop'] > foo['start']) - self.assertTrue('args' in foo, foo) - self.assertTrue(len(foo['args']), 2) - self.assertEqual(foo['args'][1], 'hello') - self.assertTrue('kwargs' in foo, foo) - self.assertTrue(len(foo['kwargs']), 1) - self.assertTrue('foo' in foo['kwargs']) - self.assertEqual(foo['kwargs']['foo'], 'bar') |
