aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/portage/tests/ebuild/test_doebuild_fd_pipes.py8
-rw-r--r--lib/portage/tests/util/futures/test_iter_completed.py2
-rw-r--r--lib/portage/util/_async/AsyncScheduler.py20
-rw-r--r--lib/portage/util/futures/iter_completed.py38
4 files changed, 53 insertions, 15 deletions
diff --git a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
index 05ea24c4b..50fc5fe1c 100644
--- a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
+++ b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
@@ -109,18 +109,16 @@ class DoebuildFdPipesTestCase(TestCase):
output_fd: pw,
},
"prev_mtimes": {}})
+ producer.addStartListener(lambda producer: os.close(pw))
+ # PipeReader closes pr
consumer = PipeReader(
input_files={"producer" : pr})
task_scheduler = TaskScheduler(iter([producer, consumer]),
max_jobs=2)
- try:
- loop.run_until_complete(task_scheduler.async_start())
- finally:
- # PipeReader closes pr
- os.close(pw)
+ loop.run_until_complete(task_scheduler.async_start())
task_scheduler.wait()
output = portage._unicode_decode(
diff --git a/lib/portage/tests/util/futures/test_iter_completed.py b/lib/portage/tests/util/futures/test_iter_completed.py
index aa24f5685..03ace915a 100644
--- a/lib/portage/tests/util/futures/test_iter_completed.py
+++ b/lib/portage/tests/util/futures/test_iter_completed.py
@@ -76,6 +76,8 @@ class IterCompletedTestCase(TestCase):
for future_done_set in async_iter_completed(future_generator(),
max_jobs=True, max_load=True, loop=loop):
+ while not input_futures:
+ loop.run_until_complete(asyncio.sleep(0, loop=loop))
future_done_set.cancel()
break
diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py
index c6b523eaa..b9070061a 100644
--- a/lib/portage/util/_async/AsyncScheduler.py
+++ b/lib/portage/util/_async/AsyncScheduler.py
@@ -1,7 +1,11 @@
# Copyright 2012-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
+import functools
+
from portage import os
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollScheduler import PollScheduler
@@ -62,8 +66,8 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
else:
self._running_tasks.add(task)
task.scheduler = self._sched_iface
- task.addExitListener(self._task_exit)
- task.start()
+ future = asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface)
+ future.add_done_callback(functools.partial(self._task_coroutine_done, task))
if self._loadavg_check_id is not None:
self._loadavg_check_id.cancel()
@@ -73,6 +77,18 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
# Triggers cleanup and exit listeners if there's nothing left to do.
self.poll()
+ @coroutine
+ def _task_coroutine(self, task):
+ yield task.async_start()
+ yield task.async_wait()
+
+ def _task_coroutine_done(self, task, future):
+ try:
+ future.result()
+ except asyncio.CancelledError:
+ self.cancel()
+ self._task_exit(task)
+
def _task_exit(self, task):
self._running_tasks.discard(task)
if task.returncode != os.EX_OK:
diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py
index 9554b4338..1fb30eb70 100644
--- a/lib/portage/util/futures/iter_completed.py
+++ b/lib/portage/util/futures/iter_completed.py
@@ -6,6 +6,7 @@ import functools
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.TaskScheduler import TaskScheduler
from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from portage.util.cpuinfo import get_cpu_count
@@ -90,21 +91,42 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
if future_done_set.cancelled() and not wait_result.done():
wait_result.cancel()
+ @coroutine
+ def fetch_wait_result(scheduler, first, loop=None):
+ if first:
+ yield scheduler.async_start()
+
+ # If the current coroutine awakens just after a call to
+ # done_callback but before scheduler has been notified of
+ # corresponding done future(s), then wait here until scheduler
+ # is notified (which will cause future_map to populate).
+ while not future_map and scheduler.poll() is None:
+ yield asyncio.sleep(0, loop=loop)
+
+ if not future_map:
+ if scheduler.poll() is not None:
+ coroutine_return((set(), set()))
+ else:
+ raise AssertionError('expected non-empty future_map')
+
+ wait_result = yield asyncio.wait(list(future_map.values()),
+ return_when=asyncio.FIRST_COMPLETED, loop=loop)
+
+ coroutine_return(wait_result)
+
+ first = True
try:
- scheduler.start()
-
- # scheduler should ensure that future_map is non-empty until
- # task_generator is exhausted
- while future_map:
- wait_result = asyncio.ensure_future(
- asyncio.wait(list(future_map.values()),
- return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
+ while True:
+ wait_result = asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop)
+ first = False
future_done_set = loop.create_future()
future_done_set.add_done_callback(
functools.partial(cancel_callback, wait_result))
wait_result.add_done_callback(
functools.partial(done_callback, future_done_set))
yield future_done_set
+ if not future_map and scheduler.poll() is not None:
+ break
finally:
# cleanup in case of interruption by SIGINT, etc
scheduler.cancel()