If you need asynchronous support in Python unit tests, there are a few possibilities. A simple Google search points you to a few packages, one being Tornado. Tornado’s tornado.testing.AsyncTestCase seems straightforward. However, it does have a shortcoming: it’s not thread-safe. Trying to use it with a thread will result in a sporadic exception. Consider this thread-safe, functionally similar alternative.
import os
from threading import Event
from unittest import TestCase
class TimeoutError(Exception):
    """Raised by ``AsyncTestCase.wait`` when the timeout expires first.

    Note: on Python 3 this definition shadows the built-in ``TimeoutError``
    within this module; it derives directly from ``Exception``.
    """
class AsyncTestCase(TestCase):
    """``TestCase`` with a thread-safe ``wait()`` / ``stop()`` handshake.

    Provides functionality similar to ``tornado.testing.AsyncTestCase``, but
    completion is signalled through a ``threading.Event``: the test calls
    ``wait()`` and some other piece of code (possibly running in another
    thread) calls ``stop()`` to release it.
    """

    def __init__(self, methodName='runTest'):
        """Create the test case and its completion event.

        Args:
            methodName: Name of the test method to run, as passed by
                unittest loaders and runners.
        """
        # unittest constructs test cases as ``cls(methodName)``, so the
        # argument must be accepted and forwarded to ``TestCase.__init__``.
        super(AsyncTestCase, self).__init__(methodName)
        self._done = Event()

    def setUp(self):
        """Reset the completion event before each test."""
        super(AsyncTestCase, self).setUp()
        self._done.clear()

    def tearDown(self):
        """Reset the completion event after each test."""
        super(AsyncTestCase, self).tearDown()
        self._done.clear()

    def wait(self, timeout=None):
        """Block until ``stop()`` has been called.

        Args:
            timeout: Maximum number of seconds to wait. When ``None``, the
                value returned by ``get_async_test_timeout()`` is used.

        Raises:
            TimeoutError: If the event isn't set by the timeout.
        """
        if timeout is None:
            timeout = get_async_test_timeout()
        self._done.wait(timeout)
        if not self.is_done():
            raise TimeoutError(
                'Async operation timed out after %s seconds' % timeout)

    def stop(self):
        """Set the event so that a pending or later ``wait()`` returns."""
        self._done.set()

    def is_done(self):
        """Return ``True`` if ``stop()`` was called since the last reset."""
        return self._done.is_set()
# Adapted from tornado.testing.get_async_test_timeout.
def get_async_test_timeout(default=5):
    """Get the global timeout setting for async tests.

    Reads the ``ASYNC_TEST_TIMEOUT`` environment variable.

    Args:
        default: Value returned when the variable is unset or cannot be
            parsed as a float.

    Returns:
        The variable's value as a float (the timeout in seconds), or
        ``default`` unchanged when no usable value is set.
    """
    raw = os.environ.get('ASYNC_TEST_TIMEOUT')
    if raw is None:
        # Variable is not set.
        return default
    try:
        return float(raw)
    except ValueError:
        # Set, but not a number (for example an empty string).
        return default