aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2018-04-17 02:51:36 -0700
committerZac Medico <zmedico@gentoo.org>2018-04-17 10:44:58 -0700
commita9e8ebaa6979ccf0bb385e457d695bedc7b65bf5 (patch)
tree59366379e645801a29fdfd190d507b7612b4a395
parentIndexedVardb: pass db to _pkg_str (bug 653372) (diff)
downloadportage-a9e8ebaa.tar.gz
portage-a9e8ebaa.tar.bz2
portage-a9e8ebaa.zip
Add async_iter_completed for asyncio migration (bug 591760)
This serves as a wrapper around portage's internal TaskScheduler class, allowing TaskScheduler API consumers to be migrated to use asyncio interfaces. Bug: https://bugs.gentoo.org/591760
-rw-r--r--pym/portage/tests/util/futures/test_iter_completed.py37
-rw-r--r--pym/portage/util/futures/iter_completed.py61
2 files changed, 91 insertions, 7 deletions
diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
index 9c23aefb1..1344523c6 100644
--- a/pym/portage/tests/util/futures/test_iter_completed.py
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -5,7 +5,11 @@ import time
from portage.tests import TestCase
from portage.util._async.ForkProcess import ForkProcess
from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures.iter_completed import iter_completed
+from portage.util.futures import asyncio
+from portage.util.futures.iter_completed import (
+ iter_completed,
+ async_iter_completed,
+)
class SleepProcess(ForkProcess):
@@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase):
for seconds, future in zip(expected_order, iter_completed(future_generator(),
max_jobs=True, max_load=None, loop=loop)):
self.assertEqual(seconds, future.result())
+
+ def testAsyncCancel(self):
+
+ loop = global_event_loop()._asyncio_wrapper
+ input_futures = set()
+ future_count = 3
+
+ def future_generator():
+ for i in range(future_count):
+ future = loop.create_future()
+ loop.call_soon(lambda future: None if future.done()
+ else future.set_result(None), future)
+ input_futures.add(future)
+ yield future
+
+ for future_done_set in async_iter_completed(future_generator(),
+ max_jobs=True, max_load=None, loop=loop):
+ future_done_set.cancel()
+ break
+
+ # With max_jobs=True, async_iter_completed should have executed
+ # the generator until it raised StopIteration.
+ self.assertEqual(future_count, len(input_futures))
+
+ loop.run_until_complete(asyncio.wait(input_futures, loop=loop))
+
+ # The futures may have results or they may have been cancelled
+ # by TaskScheduler, and behavior varies depending on the python
+ # interpreter.
+ for future in input_futures:
+ future.cancelled() or future.result()
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 8d324de84..5ad075305 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -1,6 +1,7 @@
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
+import functools
import multiprocessing
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
@@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
"""
loop = loop or global_event_loop()
loop = getattr(loop, '_asyncio_wrapper', loop)
+
+ for future_done_set in async_iter_completed(futures,
+ max_jobs=max_jobs, max_load=max_load, loop=loop):
+ for future in loop.run_until_complete(future_done_set):
+ yield future
+
+
+def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+ """
+ An asynchronous version of iter_completed. This yields futures, which
+ when done, result in a set of input futures that are done. This serves
+ as a wrapper around portage's internal TaskScheduler class, using
+ standard asyncio interfaces.
+
+ @param futures: iterator of asyncio.Future (or compatible)
+ @type futures: iterator
+ @param max_jobs: max number of futures to process concurrently (default
+ is multiprocessing.cpu_count())
+ @type max_jobs: int
+ @param max_load: max load allowed when scheduling a new future,
+ otherwise schedule no more than 1 future at a time (default
+ is multiprocessing.cpu_count())
+ @type max_load: int or float
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: iterator of futures, which when done, result in a set of
+ input futures that are done
+ @rtype: iterator
+ """
+ loop = loop or global_event_loop()
+ loop = getattr(loop, '_asyncio_wrapper', loop)
+
max_jobs = max_jobs or multiprocessing.cpu_count()
max_load = max_load or multiprocessing.cpu_count()
@@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
max_load=max_load,
event_loop=loop._loop)
+ def done_callback(future_done_set, wait_result):
+ """Propagate results from wait_result to future_done_set."""
+ if future_done_set.cancelled():
+ return
+ done, pending = wait_result.result()
+ for future in done:
+ del future_map[id(future)]
+ future_done_set.set_result(done)
+
+ def cancel_callback(wait_result, future_done_set):
+ """Cancel wait_result if future_done_set has been cancelled."""
+ if future_done_set.cancelled() and not wait_result.done():
+ wait_result.cancel()
+
try:
scheduler.start()
# scheduler should ensure that future_map is non-empty until
# task_generator is exhausted
while future_map:
- done, pending = loop.run_until_complete(
+ wait_result = asyncio.ensure_future(
asyncio.wait(list(future_map.values()),
- return_when=asyncio.FIRST_COMPLETED, loop=loop))
- for future in done:
- del future_map[id(future)]
- yield future
-
+ return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
+ future_done_set = loop.create_future()
+ future_done_set.add_done_callback(
+ functools.partial(cancel_callback, wait_result))
+ wait_result.add_done_callback(
+ functools.partial(done_callback, future_done_set))
+ yield future_done_set
finally:
# cleanup in case of interruption by SIGINT, etc
scheduler.cancel()