aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2020-02-23 13:44:58 -0800
committerZac Medico <zmedico@gentoo.org>2020-02-23 18:35:15 -0800
commit4df7a0a0c16c5ded65ad601d39840797b7704770 (patch)
tree6e724cf5dc3b445f44aa2cae6354d113ae5b0e01 /lib
parentEbuildBuildDir: use async_start method (diff)
downloadportage-4df7a0a0c16c5ded65ad601d39840797b7704770.tar.gz
portage-4df7a0a0c16c5ded65ad601d39840797b7704770.tar.bz2
portage-4df7a0a0c16c5ded65ad601d39840797b7704770.zip
ForkExecutor: use async_start method
Also, fix AsynchronousTask.async_start to handle cancellation of the _async_start coroutine, ensuring that start and exit listeners are notified in this case (otherwise RetryForkExecutorTestCase will hang). Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib')
-rw-r--r--lib/_emerge/AsynchronousTask.py15
-rw-r--r--lib/portage/util/futures/executor/fork.py21
2 files changed, 31 insertions, 5 deletions
diff --git a/lib/_emerge/AsynchronousTask.py b/lib/_emerge/AsynchronousTask.py
index d1e23cdf1..1e9e177cb 100644
--- a/lib/_emerge/AsynchronousTask.py
+++ b/lib/_emerge/AsynchronousTask.py
@@ -25,8 +25,19 @@ class AsynchronousTask(SlotObject):
@coroutine
def async_start(self):
- yield self._async_start()
- self._start_hook()
+ try:
+ if self._was_cancelled():
+ raise asyncio.CancelledError
+ yield self._async_start()
+ if self._was_cancelled():
+ raise asyncio.CancelledError
+ except asyncio.CancelledError:
+ self.cancel()
+ self._was_cancelled()
+ self._async_wait()
+ raise
+ finally:
+ self._start_hook()
@coroutine
def _async_start(self):
diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py
index add7b3c9e..3549fdb31 100644
--- a/lib/portage/util/futures/executor/fork.py
+++ b/lib/portage/util/futures/executor/fork.py
@@ -13,6 +13,7 @@ import traceback
from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
from portage.util.cpuinfo import get_cpu_count
@@ -51,11 +52,25 @@ class ForkExecutor(object):
while (not self._shutdown and self._submit_queue and
len(self._running_tasks) < self._max_workers):
future, proc = self._submit_queue.popleft()
- future.add_done_callback(functools.partial(self._cancel_cb, proc))
- proc.addExitListener(functools.partial(self._proc_exit, future))
proc.scheduler = self._loop
- proc.start()
self._running_tasks[id(proc)] = proc
+ future.add_done_callback(functools.partial(self._cancel_cb, proc))
+ proc_future = asyncio.ensure_future(self._proc_coroutine(proc), loop=self._loop)
+ proc_future.add_done_callback(functools.partial(self._proc_coroutine_done, future, proc))
+
+ @coroutine
+ def _proc_coroutine(self, proc):
+ yield proc.async_start()
+ yield proc.async_wait()
+
+ def _proc_coroutine_done(self, future, proc, proc_future):
+ try:
+ proc_future.result()
+ except asyncio.CancelledError:
+ future.done() or future.cancel()
+ if proc.poll() is None:
+ proc.cancel()
+ self._proc_exit(future, proc)
def _cancel_cb(self, proc, future):
if future.cancelled():