aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2018-03-14 01:01:26 -0700
committerZac Medico <zmedico@gentoo.org>2018-04-02 09:53:23 -0700
commit4095be74985c5c2eead5fb480cf37baa11308d62 (patch)
tree5dbbd602ef1ce2241b8e70607db901941b7d19d1 /pym/portage/util/_eventloop/EventLoop.py
parentFileNotFoundError is not defined in python2.7 (diff)
downloadportage-4095be74985c5c2eead5fb480cf37baa11308d62.tar.gz
portage-4095be74985c5c2eead5fb480cf37baa11308d62.tar.bz2
portage-4095be74985c5c2eead5fb480cf37baa11308d62.zip
Add ForkExecutor (bug 649588)
This is useful for asynchronous operations that we might need to cancel if they take too long, since (concurrent. futures.ProcessPoolExecutor tasks are not cancellable). The ability to cancel tasks makes this executor useful as an alternative to portage.exception.AlarmSignal. Also add an asyncio-compatible EventLoop.run_in_executor method that uses ForkExecutor as the default executor, which will later be used to implement the corresponding asyncio.AbstractEventLoop run_in_executor method. Bug: https://bugs.gentoo.org/649588 Reviewed-by: Alec Warner <antarus@gentoo.org>
Diffstat (limited to 'pym/portage/util/_eventloop/EventLoop.py')
-rw-r--r--pym/portage/util/_eventloop/EventLoop.py45
1 files changed, 44 insertions, 1 deletions
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index f472a3dae..1574a6837 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -24,6 +24,7 @@ except ImportError:
import portage
portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:_EventLoopFuture',
+ 'portage.util.futures.executor.fork:ForkExecutor',
)
from portage import OrderedDict
@@ -122,6 +123,7 @@ class EventLoop(object):
self._idle_callbacks = OrderedDict()
self._timeout_handlers = {}
self._timeout_interval = None
+ self._default_executor = None
self._poll_obj = None
try:
@@ -721,6 +723,46 @@ class EventLoop(object):
return self._handle(self.timeout_add(
delay * 1000, self._call_soon_callback(callback, args)), self)
+ def run_in_executor(self, executor, func, *args):
+ """
+ Arrange for a func to be called in the specified executor.
+
+ The executor argument should be an Executor instance. The default
+ executor is used if executor is None.
+
+ Use functools.partial to pass keywords to the *func*.
+
+ @param executor: executor
+ @type executor: concurrent.futures.Executor or None
+ @param func: a function to call
+ @type func: callable
+ @return: a Future
+ @rtype: asyncio.Future (or compatible)
+ """
+ if executor is None:
+ executor = self._default_executor
+ if executor is None:
+ executor = ForkExecutor(loop=self)
+ self._default_executor = executor
+ return executor.submit(func, *args)
+
+ def close(self):
+ """Close the event loop.
+
+ This clears the queues and shuts down the executor,
+ and waits for it to finish.
+ """
+ executor = self._default_executor
+ if executor is not None:
+ self._default_executor = None
+ executor.shutdown(wait=True)
+
+ if self._poll_obj is not None:
+ close = getattr(self._poll_obj, 'close')
+ if close is not None:
+ close()
+ self._poll_obj = None
+
_can_poll_device = None
@@ -782,10 +824,11 @@ class _epoll_adapter(object):
that is associated with an epoll instance will close automatically when
it is garbage collected, so it's not necessary to close it explicitly.
"""
- __slots__ = ('_epoll_obj',)
+ __slots__ = ('_epoll_obj', 'close')
def __init__(self, epoll_obj):
self._epoll_obj = epoll_obj
+ self.close = epoll_obj.close
def register(self, fd, *args):
self._epoll_obj.register(fd, *args)