diff options
author | Zac Medico <zmedico@gentoo.org> | 2021-01-03 23:38:55 -0800 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2021-01-04 00:11:51 -0800 |
commit | e93549105d5f009e47710db93abea9a7aeb34324 (patch) | |
tree | 7c64835edc66878962884d09bc0688ab4b71335c /lib/portage/tests | |
parent | ci: enable repoman tests (diff) | |
download | portage-e93549105d5f009e47710db93abea9a7aeb34324.tar.gz portage-e93549105d5f009e47710db93abea9a7aeb34324.tar.bz2 portage-e93549105d5f009e47710db93abea9a7aeb34324.zip |
test_retry: use context manager to cancel pending futures
Cancel pending futures in order to avoid this "Task was destroyed but
it is pending!" warning message following migration to PEP 492
coroutines with async and await syntax:
testHangForever (portage.tests.util.futures.test_retry.RetryForkExecutorTestCase) ... ok
testHangForever (portage.tests.util.futures.test_retry.RetryTestCase) ... ok
testHangForever (portage.tests.util.futures.test_retry.RetryThreadExecutorTestCase) ... ok
----------------------------------------------------------------------
Ran 3 tests in 0.839s
OK
Task was destroyed but it is pending!
task: <Task cancelling name='Task-4' coro=<HangForever.__call__() running at portage/tests/util/futures/test_retry.py:58> wait_for=<Future cancelled> cb=[RetryForkExecutorTestCase._wrap_coroutine_func.<locals>.wrapper.<locals>.done_callback() at portage/tests/util/futures/test_retry.py:192]>
Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib/portage/tests')
-rw-r--r-- | lib/portage/tests/util/futures/test_retry.py | 181 |
1 files changed, 106 insertions, 75 deletions
diff --git a/lib/portage/tests/util/futures/test_retry.py b/lib/portage/tests/util/futures/test_retry.py index ce5fb3e11..6648b1b2c 100644 --- a/lib/portage/tests/util/futures/test_retry.py +++ b/lib/portage/tests/util/futures/test_retry.py @@ -1,15 +1,18 @@ # Copyright 2018-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor +import contextlib try: import threading except ImportError: import dummy_threading as threading +import weakref import time +import portage from portage.tests import TestCase from portage.util._eventloop.global_event_loop import global_event_loop from portage.util.backoff import RandomExponentialBackoff @@ -64,99 +67,100 @@ class HangForever: A callable object that sleeps forever. """ def __call__(self): - return global_event_loop().create_future() + return asyncio.Future() class RetryTestCase(TestCase): + @contextlib.contextmanager def _wrap_coroutine_func(self, coroutine_func): """ Derived classes may override this method in order to implement alternative forms of execution. """ - return coroutine_func + yield coroutine_func def testSucceedLater(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(SucceedLater(1)) - decorator = retry(try_max=9999, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - result = loop.run_until_complete(decorated_func()) - self.assertEqual(result, 'success') + with self._wrap_coroutine_func(SucceedLater(1)) as func_coroutine: + decorator = retry(try_max=9999, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + result = loop.run_until_complete(decorated_func()) + self.assertEqual(result, 'success') def testSucceedNever(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(SucceedNever()) - decorator = retry(try_max=4, try_timeout=None, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException)) + with self._wrap_coroutine_func(SucceedNever()) as func_coroutine: + decorator = retry(try_max=4, try_timeout=None, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException)) def testSucceedNeverReraise(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(SucceedNever()) - decorator = retry(reraise=True, try_max=4, try_timeout=None, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(isinstance(done.pop().exception(), SucceedNeverException)) + with self._wrap_coroutine_func(SucceedNever()) as func_coroutine: + decorator = retry(reraise=True, try_max=4, try_timeout=None, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(isinstance(done.pop().exception(), SucceedNeverException)) def testHangForever(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(HangForever()) - decorator = retry(try_max=2, try_timeout=0.1, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) + with self._wrap_coroutine_func(HangForever()) as func_coroutine: + decorator = retry(try_max=2, try_timeout=0.1, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) def testHangForeverReraise(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(HangForever()) - decorator = retry(reraise=True, try_max=2, try_timeout=0.1, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(isinstance(done.pop().exception(), asyncio.TimeoutError)) + with self._wrap_coroutine_func(HangForever()) as func_coroutine: + decorator = retry(reraise=True, try_max=2, try_timeout=0.1, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(isinstance(done.pop().exception(), asyncio.TimeoutError)) def testCancelRetry(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(SucceedNever()) - decorator = retry(try_timeout=0.1, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - future = decorated_func() - loop.call_later(0.3, future.cancel) - done, pending = loop.run_until_complete(asyncio.wait([future], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(done.pop().cancelled()) + with self._wrap_coroutine_func(SucceedNever()) as func_coroutine: + decorator = retry(try_timeout=0.1, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + future = decorated_func() + loop.call_later(0.3, future.cancel) + done, pending = loop.run_until_complete(asyncio.wait([future], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(done.pop().cancelled()) def testOverallTimeoutWithException(self): loop = global_event_loop() - func_coroutine = self._wrap_coroutine_func(SucceedNever()) - decorator = retry(try_timeout=0.1, overall_timeout=0.3, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException)) + with self._wrap_coroutine_func(SucceedNever()) as func_coroutine: + decorator = retry(try_timeout=0.1, overall_timeout=0.3, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException)) def testOverallTimeoutWithTimeoutError(self): loop = global_event_loop() # results in TimeoutError because it hangs forever - func_coroutine = self._wrap_coroutine_func(HangForever()) - decorator = retry(try_timeout=0.1, overall_timeout=0.3, - delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) - decorated_func = decorator(func_coroutine, loop=loop) - done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) - self.assertEqual(len(done), 1) - self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) + with self._wrap_coroutine_func(HangForever()) as func_coroutine: + decorator = retry(try_timeout=0.1, overall_timeout=0.3, + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) + decorated_func = decorator(func_coroutine, loop=loop) + done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) + self.assertEqual(len(done), 1) + self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) class RetryForkExecutorTestCase(RetryTestCase): @@ -184,43 +188,70 @@ class RetryForkExecutorTestCase(RetryTestCase): def tearDown(self): self._tearDownExecutor() + @contextlib.contextmanager def _wrap_coroutine_func(self, coroutine_func): parent_loop = global_event_loop() + parent_pid = portage.getpid() + pending = weakref.WeakValueDictionary() # Since ThreadPoolExecutor does not propagate cancellation of a # parent_future to the underlying coroutine, use kill_switch to # propagate task cancellation to wrapper, so that HangForever's # thread returns when retry eventually cancels parent_future. def wrapper(kill_switch): - loop = global_event_loop() - if loop is parent_loop: + if portage.getpid() == parent_pid: # thread in main process - result = coroutine_func() - event = threading.Event() - loop.call_soon_threadsafe(result.add_done_callback, - lambda result: event.set()) - loop.call_soon_threadsafe(kill_switch.add_done_callback, - lambda kill_switch: event.set()) - event.wait() - return result.result() + def done_callback(result): + result.cancelled() or result.exception() or result.result() + kill_switch.set() + def start_coroutine(future): + result = asyncio.ensure_future(coroutine_func(), loop=parent_loop) + pending[id(result)] = result + result.add_done_callback(done_callback) + future.set_result(result) + future = Future() + parent_loop.call_soon_threadsafe(start_coroutine, future) + kill_switch.wait() + if not future.done(): + future.cancel() + raise asyncio.CancelledError + elif not future.result().done(): + future.result().cancel() + raise asyncio.CancelledError + else: + return future.result().result() # child process + loop = global_event_loop() try: return loop.run_until_complete(coroutine_func()) finally: loop.close() def execute_wrapper(): - kill_switch = parent_loop.create_future() + kill_switch = threading.Event() parent_future = asyncio.ensure_future( parent_loop.run_in_executor(self._executor, wrapper, kill_switch), loop=parent_loop) - parent_future.add_done_callback( - lambda parent_future: None if kill_switch.done() - else kill_switch.set_result(None)) + def kill_callback(parent_future): + if not kill_switch.is_set(): + kill_switch.set() + parent_future.add_done_callback(kill_callback) return parent_future - return execute_wrapper + try: + yield execute_wrapper + finally: + while True: + try: + _, future = pending.popitem() + except KeyError: + break + try: + parent_loop.run_until_complete(future) + except (Exception, asyncio.CancelledError): + pass + future.cancelled() or future.exception() or future.result() class RetryThreadExecutorTestCase(RetryForkExecutorTestCase): |