Diffstat (limited to 'lib/portage/util/futures/iter_completed.py')
-rw-r--r--  lib/portage/util/futures/iter_completed.py  183
1 file changed, 183 insertions, 0 deletions
diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py
new file mode 100644
index 000000000..31b5e0c78
--- /dev/null
+++ b/lib/portage/util/futures/iter_completed.py
@@ -0,0 +1,183 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import functools
+import multiprocessing
+
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures import asyncio
+
+
+def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+ """
+ This is similar to asyncio.as_completed, but takes an iterator of
+ futures as input, and includes support for max_jobs and max_load
+ parameters.
+
+ @param futures: iterator of asyncio.Future (or compatible)
+ @type futures: iterator
+ @param max_jobs: max number of futures to process concurrently (default
+ is multiprocessing.cpu_count())
+ @type max_jobs: int
+ @param max_load: max load allowed when scheduling a new future,
+ otherwise schedule no more than 1 future at a time (default
+ is multiprocessing.cpu_count())
+ @type max_load: int or float
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: iterator of futures that are done
+ @rtype: iterator
+ """
+ loop = asyncio._wrap_loop(loop)
+
+ for future_done_set in async_iter_completed(futures,
+ max_jobs=max_jobs, max_load=max_load, loop=loop):
+ for future in loop.run_until_complete(future_done_set):
+ yield future
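
# Illustrative usage sketch for iter_completed (hypothetical, not taken from
# the hunk above): fetch_metadata stands in for any factory that returns an
# asyncio-compatible future attached to `loop`; only the iteration pattern
# comes from this module. _wrap_loop() is the module's own helper for
# obtaining the default event loop, as used in the function bodies here.
from portage.util.futures import asyncio
from portage.util.futures.iter_completed import iter_completed

def print_metadata(packages, fetch_metadata):
    loop = asyncio._wrap_loop()
    futures = (fetch_metadata(pkg, loop=loop) for pkg in packages)
    # Results arrive in completion order, with at most max_jobs futures
    # in flight at any time.
    for future in iter_completed(futures, max_jobs=4, loop=loop):
        print(future.result())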
+
+
+def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+ """
+ An asynchronous version of iter_completed. This yields futures, which
+ when done, result in a set of input futures that are done. This serves
+ as a wrapper around portage's internal TaskScheduler class, using
+ standard asyncio interfaces.
+
+ @param futures: iterator of asyncio.Future (or compatible)
+ @type futures: iterator
+ @param max_jobs: max number of futures to process concurrently (default
+ is multiprocessing.cpu_count())
+ @type max_jobs: int
+ @param max_load: max load allowed when scheduling a new future,
+ otherwise schedule no more than 1 future at a time (default
+ is multiprocessing.cpu_count())
+ @type max_load: int or float
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: iterator of futures, which when done, result in a set of
+ input futures that are done
+ @rtype: iterator
+ """
+ loop = asyncio._wrap_loop(loop)
+
+ max_jobs = max_jobs or multiprocessing.cpu_count()
+ max_load = max_load or multiprocessing.cpu_count()
+
+ future_map = {}
+ def task_generator():
+ for future in futures:
+ future_map[id(future)] = future
+ yield AsyncTaskFuture(future=future)
+
+ scheduler = TaskScheduler(
+ task_generator(),
+ max_jobs=max_jobs,
+ max_load=max_load,
+ event_loop=loop)
+
+ def done_callback(future_done_set, wait_result):
+ """Propagate results from wait_result to future_done_set."""
+ if future_done_set.cancelled():
+ return
+ done, pending = wait_result.result()
+ for future in done:
+ del future_map[id(future)]
+ future_done_set.set_result(done)
+
+ def cancel_callback(wait_result, future_done_set):
+ """Cancel wait_result if future_done_set has been cancelled."""
+ if future_done_set.cancelled() and not wait_result.done():
+ wait_result.cancel()
+
+ try:
+ scheduler.start()
+
+ # scheduler should ensure that future_map is non-empty until
+ # task_generator is exhausted
+ while future_map:
+ wait_result = asyncio.ensure_future(
+ asyncio.wait(list(future_map.values()),
+ return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
+ future_done_set = loop.create_future()
+ future_done_set.add_done_callback(
+ functools.partial(cancel_callback, wait_result))
+ wait_result.add_done_callback(
+ functools.partial(done_callback, future_done_set))
+ yield future_done_set
+ finally:
+ # cleanup in case of interruption by SIGINT, etc
+ scheduler.cancel()
+ scheduler.wait()
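
# Illustrative usage sketch for async_iter_completed (hypothetical, not taken
# from the hunk above): each yielded "waiter" future resolves to a set of
# input futures that have finished. do_work and handle are assumed
# placeholders, not part of portage's API.
from portage.util.futures import asyncio
from portage.util.futures.iter_completed import async_iter_completed

def consume_in_batches(items, do_work, handle):
    loop = asyncio._wrap_loop()
    futures = [do_work(item, loop=loop) for item in items]
    for waiter in async_iter_completed(futures, max_jobs=2, loop=loop):
        # Each waiter completes with a set of finished input futures.
        for future in loop.run_until_complete(waiter):
            handle(future.result())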
+
+
+def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
+ """
+ This is similar to asyncio.gather, but takes an iterator of
+ futures as input, and includes support for max_jobs and max_load
+ parameters.
+
+ @param futures: iterator of asyncio.Future (or compatible)
+ @type futures: iterator
+ @param max_jobs: max number of futures to process concurrently (default
+ is multiprocessing.cpu_count())
+ @type max_jobs: int
+ @param max_load: max load allowed when scheduling a new future,
+ otherwise schedule no more than 1 future at a time (default
+ is multiprocessing.cpu_count())
+ @type max_load: int or float
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: a Future resulting in a list of done input futures, in the
+ same order that they were yielded from the input iterator
+ @rtype: asyncio.Future (or compatible)
+ """
+ loop = asyncio._wrap_loop(loop)
+ result = loop.create_future()
+ futures_list = []
+
+ def future_generator():
+ for future in futures:
+ futures_list.append(future)
+ yield future
+
+ completed_iter = async_iter_completed(
+ future_generator(),
+ max_jobs=max_jobs,
+ max_load=max_load,
+ loop=loop,
+ )
+
+ def handle_result(future_done_set):
+ if result.cancelled():
+ if not future_done_set.cancelled():
+ # All exceptions must be consumed from future_done_set, in order
+ # to avoid triggering the event loop's exception handler.
+ list(future.exception() for future in future_done_set.result()
+ if not future.cancelled())
+ return
+
+ try:
+ handle_result.current_task = next(completed_iter)
+ except StopIteration:
+ result.set_result(futures_list)
+ else:
+ handle_result.current_task.add_done_callback(handle_result)
+
+ try:
+ handle_result.current_task = next(completed_iter)
+ except StopIteration:
+ handle_result.current_task = None
+ result.set_result(futures_list)
+ else:
+ handle_result.current_task.add_done_callback(handle_result)
+
+ def cancel_callback(result):
+ if (result.cancelled() and
+ handle_result.current_task is not None and
+ not handle_result.current_task.done()):
+ handle_result.current_task.cancel()
+
+ result.add_done_callback(cancel_callback)
+
+ return result
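
# Illustrative usage sketch for iter_gather (hypothetical, not taken from the
# hunk above): unlike iter_completed, the single returned future resolves to
# the done input futures in their original generation order. do_work is an
# assumed future factory, not part of portage's API.
from portage.util.futures import asyncio
from portage.util.futures.iter_completed import iter_gather

def gather_results(items, do_work):
    loop = asyncio._wrap_loop()
    gather_future = iter_gather(
        (do_work(item, loop=loop) for item in items),
        max_jobs=2, loop=loop)
    # run_until_complete returns the list of done futures, preserving the
    # order in which they were yielded by the input generator.
    return [future.result() for future in loop.run_until_complete(gather_future)]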