diff options
author | Zac Medico <zmedico@gentoo.org> | 2018-03-14 01:01:26 -0700 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2018-04-02 09:53:23 -0700 |
commit | 4095be74985c5c2eead5fb480cf37baa11308d62 (patch) | |
tree | 5dbbd602ef1ce2241b8e70607db901941b7d19d1 /pym/portage/util/_eventloop/EventLoop.py | |
parent | FileNotFoundError is not defined in python2.7 (diff) | |
download | portage-4095be74985c5c2eead5fb480cf37baa11308d62.tar.gz portage-4095be74985c5c2eead5fb480cf37baa11308d62.tar.bz2 portage-4095be74985c5c2eead5fb480cf37baa11308d62.zip |
Add ForkExecutor (bug 649588)
This is useful for asynchronous operations that we might
need to cancel if they take too long, since (concurrent.
futures.ProcessPoolExecutor tasks are not cancellable).
The ability to cancel tasks makes this executor useful
as an alternative to portage.exception.AlarmSignal.
Also add an asyncio-compatible EventLoop.run_in_executor
method that uses ForkExecutor as the default executor,
which will later be used to implement the corresponding
asyncio.AbstractEventLoop run_in_executor method.
Bug: https://bugs.gentoo.org/649588
Reviewed-by: Alec Warner <antarus@gentoo.org>
Diffstat (limited to 'pym/portage/util/_eventloop/EventLoop.py')
-rw-r--r-- | pym/portage/util/_eventloop/EventLoop.py | 45 |
1 files changed, 44 insertions, 1 deletions
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index f472a3dae..1574a6837 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -24,6 +24,7 @@ except ImportError: import portage portage.proxy.lazyimport.lazyimport(globals(), 'portage.util.futures.futures:_EventLoopFuture', + 'portage.util.futures.executor.fork:ForkExecutor', ) from portage import OrderedDict @@ -122,6 +123,7 @@ class EventLoop(object): self._idle_callbacks = OrderedDict() self._timeout_handlers = {} self._timeout_interval = None + self._default_executor = None self._poll_obj = None try: @@ -721,6 +723,46 @@ class EventLoop(object): return self._handle(self.timeout_add( delay * 1000, self._call_soon_callback(callback, args)), self) + def run_in_executor(self, executor, func, *args): + """ + Arrange for a func to be called in the specified executor. + + The executor argument should be an Executor instance. The default + executor is used if executor is None. + + Use functools.partial to pass keywords to the *func*. + + @param executor: executor + @type executor: concurrent.futures.Executor or None + @param func: a function to call + @type func: callable + @return: a Future + @rtype: asyncio.Future (or compatible) + """ + if executor is None: + executor = self._default_executor + if executor is None: + executor = ForkExecutor(loop=self) + self._default_executor = executor + return executor.submit(func, *args) + + def close(self): + """Close the event loop. + + This clears the queues and shuts down the executor, + and waits for it to finish. + """ + executor = self._default_executor + if executor is not None: + self._default_executor = None + executor.shutdown(wait=True) + + if self._poll_obj is not None: + close = getattr(self._poll_obj, 'close') + if close is not None: + close() + self._poll_obj = None + _can_poll_device = None @@ -782,10 +824,11 @@ class _epoll_adapter(object): that is associated with an epoll instance will close automatically when it is garbage collected, so it's not necessary to close it explicitly. """ - __slots__ = ('_epoll_obj',) + __slots__ = ('_epoll_obj', 'close') def __init__(self, epoll_obj): self._epoll_obj = epoll_obj + self.close = epoll_obj.close def register(self, fd, *args): self._epoll_obj.register(fd, *args) |