diff options
Diffstat (limited to 'lib/portage/util/futures')
-rw-r--r-- | lib/portage/util/futures/_asyncio/__init__.py | 73 | ||||
-rw-r--r-- | lib/portage/util/futures/_asyncio/meson.build | 8 | ||||
-rw-r--r-- | lib/portage/util/futures/_asyncio/streams.py | 2 | ||||
-rw-r--r-- | lib/portage/util/futures/_sync_decorator.py | 9 | ||||
-rw-r--r-- | lib/portage/util/futures/compat_coroutine.py | 141 | ||||
-rw-r--r-- | lib/portage/util/futures/executor/fork.py | 19 | ||||
-rw-r--r-- | lib/portage/util/futures/executor/meson.build | 8 | ||||
-rw-r--r-- | lib/portage/util/futures/extendedfutures.py | 12 | ||||
-rw-r--r-- | lib/portage/util/futures/iter_completed.py | 3 | ||||
-rw-r--r-- | lib/portage/util/futures/meson.build | 16 | ||||
-rw-r--r-- | lib/portage/util/futures/retry.py | 2 | ||||
-rw-r--r-- | lib/portage/util/futures/unix_events.py | 5 |
12 files changed, 109 insertions, 189 deletions
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index ccf800c66..e377a9cdd 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2018-2021 Gentoo Authors +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ( @@ -9,18 +9,22 @@ __all__ = ( "CancelledError", "Future", "InvalidStateError", + "Lock", "TimeoutError", "get_child_watcher", "get_event_loop", "set_child_watcher", "get_event_loop_policy", "set_event_loop_policy", + "run", + "shield", "sleep", "Task", "wait", + "wait_for", ) -import subprocess +import sys import types import weakref @@ -34,20 +38,20 @@ from asyncio import ( FIRST_EXCEPTION, Future, InvalidStateError, + iscoroutinefunction, + Lock as _Lock, + shield, TimeoutError, + wait_for, ) -try: - import threading -except ImportError: - import dummy_threading as threading +import threading import portage portage.proxy.lazyimport.lazyimport( globals(), "portage.util.futures.unix_events:_PortageEventLoopPolicy", - "portage.util.futures:compat_coroutine@_compat_coroutine", ) from portage.util._eventloop.asyncio_event_loop import ( AsyncioEventLoop as _AsyncioEventLoop, @@ -106,6 +110,14 @@ def set_child_watcher(watcher): return get_event_loop_policy().set_child_watcher(watcher) +# Emulate run since it's the preferred python API. +def run(coro): + return _safe_loop().run_until_complete(coro) + + +run.__doc__ = _real_asyncio.run.__doc__ + + def create_subprocess_exec(*args, **kwargs): """ Create a subprocess. @@ -146,17 +158,18 @@ def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED): return _real_asyncio.wait(futures, timeout=timeout, return_when=return_when) -def iscoroutinefunction(func): +class Lock(_Lock): """ - Return True if func is a decorated coroutine function, - supporting both asyncio.coroutine and compat_coroutine since - their behavior is identical for all practical purposes. + Inject loop parameter for python3.9 or less in order to avoid + "got Future <Future pending> attached to a different loop" errors. """ - if _compat_coroutine._iscoroutinefunction(func): - return True - if _real_asyncio.iscoroutinefunction(func): - return True - return False + + def __init__(self, **kwargs): + if sys.version_info >= (3, 10): + kwargs.pop("loop", None) + elif "loop" not in kwargs: + kwargs["loop"] = _safe_loop()._loop + super().__init__(**kwargs) class Task(Future): @@ -299,13 +312,37 @@ def _safe_loop(): def _get_running_loop(): + """ + This calls the real asyncio get_running_loop() and wraps that with + portage's internal AsyncioEventLoop wrapper. If there is no running + asyncio event loop but portage has a reference to another running + loop in this thread, then use that instead. + + This behavior enables portage internals to use the real asyncio.run + while remaining compatible with internal code that does not use the + real asyncio.run. + """ + try: + _loop = _real_asyncio.get_running_loop() + except RuntimeError: + _loop = None + with _thread_weakrefs.lock: if _thread_weakrefs.pid == portage.getpid(): try: loop = _thread_weakrefs.loops[threading.get_ident()] except KeyError: - return None - return loop if loop.is_running() else None + pass + else: + if _loop is loop._loop: + return loop + elif _loop is None: + return loop if loop.is_running() else None + + # If _loop it not None here it means it was probably a temporary + # loop created by asyncio.run, so we don't try to cache it, and + # just return a temporary wrapper. + return None if _loop is None else _AsyncioEventLoop(loop=_loop) def _thread_weakrefs_atexit(): diff --git a/lib/portage/util/futures/_asyncio/meson.build b/lib/portage/util/futures/_asyncio/meson.build new file mode 100644 index 000000000..fa0bc4a86 --- /dev/null +++ b/lib/portage/util/futures/_asyncio/meson.build @@ -0,0 +1,8 @@ +py.install_sources( + [ + 'streams.py', + '__init__.py', + ], + subdir : 'portage/util/futures/_asyncio', + pure : not native_extensions +) diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py index 95a4244a6..6b902975c 100644 --- a/lib/portage/util/futures/_asyncio/streams.py +++ b/lib/portage/util/futures/_asyncio/streams.py @@ -76,7 +76,7 @@ async def _writer(output_file, content, loop=DeprecationWarning): while content: try: content = content[os.write(fd, content) :] - except EnvironmentError as e: + except OSError as e: if e.errno != errno.EAGAIN: raise waiter = loop.create_future() diff --git a/lib/portage/util/futures/_sync_decorator.py b/lib/portage/util/futures/_sync_decorator.py index d4df26a9b..436e7c346 100644 --- a/lib/portage/util/futures/_sync_decorator.py +++ b/lib/portage/util/futures/_sync_decorator.py @@ -39,14 +39,15 @@ def _sync_methods(obj, loop=None): loop = asyncio._wrap_loop(loop) return _ObjectAttrWrapper( obj, - lambda attr: _sync_decorator(attr, loop=loop) - if asyncio.iscoroutinefunction(attr) - else attr, + lambda attr: ( + _sync_decorator(attr, loop=loop) + if asyncio.iscoroutinefunction(attr) + else attr + ), ) class _ObjectAttrWrapper(portage.proxy.objectproxy.ObjectProxy): - __slots__ = ("_obj", "_attr_wrapper") def __init__(self, obj, attr_wrapper): diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py deleted file mode 100644 index c7e436343..000000000 --- a/lib/portage/util/futures/compat_coroutine.py +++ /dev/null @@ -1,141 +0,0 @@ -# Copyright 2018-2021 Gentoo Foundation -# Distributed under the terms of the GNU General Public License v2 - -import functools - -import portage - -portage.proxy.lazyimport.lazyimport( - globals(), - "portage.util.futures:asyncio", -) - -# A marker for iscoroutinefunction. -_is_coroutine = object() - - -def _iscoroutinefunction(func): - """ - Return True if func is a decorated coroutine function - created with the coroutine decorator for this module. - """ - return getattr(func, "_is_coroutine", None) is _is_coroutine - - -def coroutine(generator_func): - """ - A decorator for a generator function that behaves as coroutine function. - The generator should yield a Future instance in order to wait for it, - and the result becomes the result of the current yield-expression, - via the PEP 342 generator send() method. - - The decorated function returns a Future which is done when the generator - is exhausted. The generator can return a value via the coroutine_return - function. - - @param generator_func: A generator function that yields Futures, and - will receive the result of each Future as the result of the - corresponding yield-expression. - @type generator_func: function - @rtype: function - @return: A function which calls the given generator function and - returns a Future that is done when the generator is exhausted. - """ - # Note that functools.partial does not work for decoration of - # methods, since it doesn't implement the descriptor protocol. - # This problem is solve by defining a wrapper function. - @functools.wraps(generator_func) - def wrapped(*args, **kwargs): - return _generator_future(generator_func, *args, **kwargs) - - wrapped._is_coroutine = _is_coroutine - return wrapped - - -def coroutine_return(result=None): - """ - Terminate the current coroutine and set the result of the associated - Future. - - @param result: of the current coroutine's Future - @type object - """ - raise _CoroutineReturnValue(result) - - -def _generator_future(generator_func, *args, **kwargs): - """ - Call generator_func with the given arguments, and return a Future - that is done when the resulting generation is exhausted. If a - keyword argument named 'loop' is given, then it is used instead of - the default event loop. - """ - loop = kwargs.get("loop") - loop = asyncio._wrap_loop(loop) - result = loop.create_future() - _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop) - return result - - -class _CoroutineReturnValue(Exception): - def __init__(self, result): - self.result = result - - -class _GeneratorTask: - """ - Asynchronously executes the generator to completion, waiting for - the result of each Future that it yields, and sending the result - to the generator. - """ - - def __init__(self, generator, result, loop): - self._generator = generator - self._result = result - self._current_task = None - self._loop = loop - result.add_done_callback(self._cancel_callback) - loop.call_soon(self._next) - - def _cancel_callback(self, result): - if result.cancelled() and self._current_task is not None: - # The done callback for self._current_task invokes - # _next in either case here. - self._current_task.done() or self._current_task.cancel() - - def _next(self, previous=None): - self._current_task = None - if self._result.cancelled(): - if previous is not None: - # Consume exceptions, in order to avoid triggering - # the event loop's exception handler. - previous.cancelled() or previous.exception() - - # This will throw asyncio.CancelledError in the coroutine if - # there's an opportunity (yield) before the generator raises - # StopIteration. - previous = self._result - try: - if previous is None: - future = next(self._generator) - elif previous.cancelled(): - future = self._generator.throw(asyncio.CancelledError()) - elif previous.exception() is None: - future = self._generator.send(previous.result()) - else: - future = self._generator.throw(previous.exception()) - - except asyncio.CancelledError: - self._result.cancel() - except _CoroutineReturnValue as e: - if not self._result.cancelled(): - self._result.set_result(e.result) - except StopIteration: - if not self._result.cancelled(): - self._result.set_result(None) - except Exception as e: - if not self._result.cancelled(): - self._result.set_exception(e) - else: - self._current_task = asyncio.ensure_future(future, loop=self._loop) - self._current_task.add_done_callback(self._next) diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py index 0c3342944..1e3d01072 100644 --- a/lib/portage/util/futures/executor/fork.py +++ b/lib/portage/util/futures/executor/fork.py @@ -1,4 +1,4 @@ -# Copyright 2018 Gentoo Foundation +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ("ForkExecutor",) @@ -6,7 +6,6 @@ __all__ = ("ForkExecutor",) import collections import functools import os -import sys import traceback from portage.util._async.AsyncFunction import AsyncFunction @@ -41,7 +40,9 @@ class ForkExecutor: """ future = self._loop.create_future() proc = AsyncFunction( - target=functools.partial(self._guarded_fn_call, fn, args, kwargs) + target=functools.partial(self._guarded_fn_call, fn, args, kwargs), + # Directly inherit stdio streams and run in the foreground with no log. + create_pipe=False, ) self._submit_queue.append((future, proc)) self._schedule() @@ -92,9 +93,7 @@ class ForkExecutor: # distinguish between kill and crash future.set_exception( Exception( - "pid {} crashed or killed, exitcode {}".format( - proc.pid, proc.returncode - ) + f"pid {proc.pid} crashed or killed, exitcode {proc.returncode}" ) ) @@ -123,7 +122,7 @@ class _ExceptionWithTraceback: tb = traceback.format_exception(type(exc), exc, exc.__traceback__) tb = "".join(tb) self.exc = exc - self.tb = '\n"""\n%s"""' % tb + self.tb = f'\n"""\n{tb}"""' def __reduce__(self): return _rebuild_exc, (self.exc, self.tb) @@ -140,9 +139,3 @@ class _RemoteTraceback(Exception): def _rebuild_exc(exc, tb): exc.__cause__ = _RemoteTraceback(tb) return exc - - -if sys.version_info < (3,): - # Python 2 does not support exception chaining, so - # don't bother to preserve the traceback. - _ExceptionWithTraceback = lambda exc: exc diff --git a/lib/portage/util/futures/executor/meson.build b/lib/portage/util/futures/executor/meson.build new file mode 100644 index 000000000..fdd7c06f9 --- /dev/null +++ b/lib/portage/util/futures/executor/meson.build @@ -0,0 +1,8 @@ +py.install_sources( + [ + 'fork.py', + '__init__.py', + ], + subdir : 'portage/util/futures/executor', + pure : not native_extensions +) diff --git a/lib/portage/util/futures/extendedfutures.py b/lib/portage/util/futures/extendedfutures.py index c23feafb5..b772698b9 100644 --- a/lib/portage/util/futures/extendedfutures.py +++ b/lib/portage/util/futures/extendedfutures.py @@ -37,7 +37,7 @@ class ExtendedFuture(Future): set. """ self.default_result = default_result - super(ExtendedFuture, self).__init__() + super().__init__() self.set = self.set_result def set_result(self, data, ignore_InvalidState=False): @@ -48,14 +48,14 @@ class ExtendedFuture(Future): """ if ignore_InvalidState: try: - super(ExtendedFuture, self).set_result(data) + super().set_result(data) except InvalidStateError: pass else: - super(ExtendedFuture, self).set_result(data) + super().set_result(data) def get(self, default=UNSET_CONST.result()): - """Convienience function to wrap result() but adds an optional + """Convenience function to wrap result() but adds an optional default value to return rather than raise an InvalidStateError @param default: Optional override for the classwide default_result @@ -77,12 +77,12 @@ class ExtendedFuture(Future): def exception(self): try: - return super(ExtendedFuture, self).exception(timeout=0) + return super().exception(timeout=0) except concurrent.futures.TimeoutError: raise InvalidStateError def result(self): try: - return super(ExtendedFuture, self).result(timeout=0) + return super().result(timeout=0) except concurrent.futures.TimeoutError: raise InvalidStateError diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py index f4b4e5e0b..5ee0b48c7 100644 --- a/lib/portage/util/futures/iter_completed.py +++ b/lib/portage/util/futures/iter_completed.py @@ -34,8 +34,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): for future_done_set in async_iter_completed( futures, max_jobs=max_jobs, max_load=max_load, loop=loop ): - for future in loop.run_until_complete(future_done_set): - yield future + yield from loop.run_until_complete(future_done_set) def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): diff --git a/lib/portage/util/futures/meson.build b/lib/portage/util/futures/meson.build new file mode 100644 index 000000000..d561fa312 --- /dev/null +++ b/lib/portage/util/futures/meson.build @@ -0,0 +1,16 @@ +py.install_sources( + [ + 'extendedfutures.py', + 'futures.py', + 'iter_completed.py', + 'retry.py', + 'unix_events.py', + '_sync_decorator.py', + '__init__.py', + ], + subdir : 'portage/util/futures', + pure : not native_extensions +) + +subdir('executor') +subdir('_asyncio') diff --git a/lib/portage/util/futures/retry.py b/lib/portage/util/futures/retry.py index 496bfb562..a8a3ad4fb 100644 --- a/lib/portage/util/futures/retry.py +++ b/lib/portage/util/futures/retry.py @@ -84,7 +84,7 @@ def _retry( reraise, func, *args, - **kwargs + **kwargs, ): """ Retry coroutine, used to implement retry decorator. diff --git a/lib/portage/util/futures/unix_events.py b/lib/portage/util/futures/unix_events.py index f2f01f0c6..374497010 100644 --- a/lib/portage/util/futures/unix_events.py +++ b/lib/portage/util/futures/unix_events.py @@ -23,7 +23,6 @@ if hasattr(os, "set_blocking"): def _set_nonblocking(fd): os.set_blocking(fd, False) - else: def _set_nonblocking(fd): @@ -71,11 +70,11 @@ class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy): def get_event_loop(self): self._check_recursion() - return super(_AsyncioEventLoopPolicy, self).get_event_loop() + return super().get_event_loop() def get_child_watcher(self): self._check_recursion() - return super(_AsyncioEventLoopPolicy, self).get_child_watcher() + return super().get_child_watcher() DefaultEventLoopPolicy = _AsyncioEventLoopPolicy |