aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/portage/util/futures')
-rw-r--r--lib/portage/util/futures/_asyncio/__init__.py73
-rw-r--r--lib/portage/util/futures/_asyncio/meson.build8
-rw-r--r--lib/portage/util/futures/_asyncio/streams.py2
-rw-r--r--lib/portage/util/futures/_sync_decorator.py9
-rw-r--r--lib/portage/util/futures/compat_coroutine.py141
-rw-r--r--lib/portage/util/futures/executor/fork.py19
-rw-r--r--lib/portage/util/futures/executor/meson.build8
-rw-r--r--lib/portage/util/futures/extendedfutures.py12
-rw-r--r--lib/portage/util/futures/iter_completed.py3
-rw-r--r--lib/portage/util/futures/meson.build16
-rw-r--r--lib/portage/util/futures/retry.py2
-rw-r--r--lib/portage/util/futures/unix_events.py5
12 files changed, 109 insertions, 189 deletions
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index ccf800c66..e377a9cdd 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2021 Gentoo Authors
+# Copyright 2018-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
__all__ = (
@@ -9,18 +9,22 @@ __all__ = (
"CancelledError",
"Future",
"InvalidStateError",
+ "Lock",
"TimeoutError",
"get_child_watcher",
"get_event_loop",
"set_child_watcher",
"get_event_loop_policy",
"set_event_loop_policy",
+ "run",
+ "shield",
"sleep",
"Task",
"wait",
+ "wait_for",
)
-import subprocess
+import sys
import types
import weakref
@@ -34,20 +38,20 @@ from asyncio import (
FIRST_EXCEPTION,
Future,
InvalidStateError,
+ iscoroutinefunction,
+ Lock as _Lock,
+ shield,
TimeoutError,
+ wait_for,
)
-try:
- import threading
-except ImportError:
- import dummy_threading as threading
+import threading
import portage
portage.proxy.lazyimport.lazyimport(
globals(),
"portage.util.futures.unix_events:_PortageEventLoopPolicy",
- "portage.util.futures:compat_coroutine@_compat_coroutine",
)
from portage.util._eventloop.asyncio_event_loop import (
AsyncioEventLoop as _AsyncioEventLoop,
@@ -106,6 +110,14 @@ def set_child_watcher(watcher):
return get_event_loop_policy().set_child_watcher(watcher)
+# Emulate run since it's the preferred python API.
+def run(coro):
+ return _safe_loop().run_until_complete(coro)
+
+
+run.__doc__ = _real_asyncio.run.__doc__
+
+
def create_subprocess_exec(*args, **kwargs):
"""
Create a subprocess.
@@ -146,17 +158,18 @@ def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED):
return _real_asyncio.wait(futures, timeout=timeout, return_when=return_when)
-def iscoroutinefunction(func):
+class Lock(_Lock):
"""
- Return True if func is a decorated coroutine function,
- supporting both asyncio.coroutine and compat_coroutine since
- their behavior is identical for all practical purposes.
+ Inject loop parameter for python3.9 or less in order to avoid
+ "got Future <Future pending> attached to a different loop" errors.
"""
- if _compat_coroutine._iscoroutinefunction(func):
- return True
- if _real_asyncio.iscoroutinefunction(func):
- return True
- return False
+
+ def __init__(self, **kwargs):
+ if sys.version_info >= (3, 10):
+ kwargs.pop("loop", None)
+ elif "loop" not in kwargs:
+ kwargs["loop"] = _safe_loop()._loop
+ super().__init__(**kwargs)
class Task(Future):
@@ -299,13 +312,37 @@ def _safe_loop():
def _get_running_loop():
+ """
+ This calls the real asyncio get_running_loop() and wraps that with
+ portage's internal AsyncioEventLoop wrapper. If there is no running
+ asyncio event loop but portage has a reference to another running
+ loop in this thread, then use that instead.
+
+ This behavior enables portage internals to use the real asyncio.run
+ while remaining compatible with internal code that does not use the
+ real asyncio.run.
+ """
+ try:
+ _loop = _real_asyncio.get_running_loop()
+ except RuntimeError:
+ _loop = None
+
with _thread_weakrefs.lock:
if _thread_weakrefs.pid == portage.getpid():
try:
loop = _thread_weakrefs.loops[threading.get_ident()]
except KeyError:
- return None
- return loop if loop.is_running() else None
+ pass
+ else:
+ if _loop is loop._loop:
+ return loop
+ elif _loop is None:
+ return loop if loop.is_running() else None
+
+ # If _loop it not None here it means it was probably a temporary
+ # loop created by asyncio.run, so we don't try to cache it, and
+ # just return a temporary wrapper.
+ return None if _loop is None else _AsyncioEventLoop(loop=_loop)
def _thread_weakrefs_atexit():
diff --git a/lib/portage/util/futures/_asyncio/meson.build b/lib/portage/util/futures/_asyncio/meson.build
new file mode 100644
index 000000000..fa0bc4a86
--- /dev/null
+++ b/lib/portage/util/futures/_asyncio/meson.build
@@ -0,0 +1,8 @@
+py.install_sources(
+ [
+ 'streams.py',
+ '__init__.py',
+ ],
+ subdir : 'portage/util/futures/_asyncio',
+ pure : not native_extensions
+)
diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py
index 95a4244a6..6b902975c 100644
--- a/lib/portage/util/futures/_asyncio/streams.py
+++ b/lib/portage/util/futures/_asyncio/streams.py
@@ -76,7 +76,7 @@ async def _writer(output_file, content, loop=DeprecationWarning):
while content:
try:
content = content[os.write(fd, content) :]
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EAGAIN:
raise
waiter = loop.create_future()
diff --git a/lib/portage/util/futures/_sync_decorator.py b/lib/portage/util/futures/_sync_decorator.py
index d4df26a9b..436e7c346 100644
--- a/lib/portage/util/futures/_sync_decorator.py
+++ b/lib/portage/util/futures/_sync_decorator.py
@@ -39,14 +39,15 @@ def _sync_methods(obj, loop=None):
loop = asyncio._wrap_loop(loop)
return _ObjectAttrWrapper(
obj,
- lambda attr: _sync_decorator(attr, loop=loop)
- if asyncio.iscoroutinefunction(attr)
- else attr,
+ lambda attr: (
+ _sync_decorator(attr, loop=loop)
+ if asyncio.iscoroutinefunction(attr)
+ else attr
+ ),
)
class _ObjectAttrWrapper(portage.proxy.objectproxy.ObjectProxy):
-
__slots__ = ("_obj", "_attr_wrapper")
def __init__(self, obj, attr_wrapper):
diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py
deleted file mode 100644
index c7e436343..000000000
--- a/lib/portage/util/futures/compat_coroutine.py
+++ /dev/null
@@ -1,141 +0,0 @@
-# Copyright 2018-2021 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-import functools
-
-import portage
-
-portage.proxy.lazyimport.lazyimport(
- globals(),
- "portage.util.futures:asyncio",
-)
-
-# A marker for iscoroutinefunction.
-_is_coroutine = object()
-
-
-def _iscoroutinefunction(func):
- """
- Return True if func is a decorated coroutine function
- created with the coroutine decorator for this module.
- """
- return getattr(func, "_is_coroutine", None) is _is_coroutine
-
-
-def coroutine(generator_func):
- """
- A decorator for a generator function that behaves as coroutine function.
- The generator should yield a Future instance in order to wait for it,
- and the result becomes the result of the current yield-expression,
- via the PEP 342 generator send() method.
-
- The decorated function returns a Future which is done when the generator
- is exhausted. The generator can return a value via the coroutine_return
- function.
-
- @param generator_func: A generator function that yields Futures, and
- will receive the result of each Future as the result of the
- corresponding yield-expression.
- @type generator_func: function
- @rtype: function
- @return: A function which calls the given generator function and
- returns a Future that is done when the generator is exhausted.
- """
- # Note that functools.partial does not work for decoration of
- # methods, since it doesn't implement the descriptor protocol.
- # This problem is solve by defining a wrapper function.
- @functools.wraps(generator_func)
- def wrapped(*args, **kwargs):
- return _generator_future(generator_func, *args, **kwargs)
-
- wrapped._is_coroutine = _is_coroutine
- return wrapped
-
-
-def coroutine_return(result=None):
- """
- Terminate the current coroutine and set the result of the associated
- Future.
-
- @param result: of the current coroutine's Future
- @type object
- """
- raise _CoroutineReturnValue(result)
-
-
-def _generator_future(generator_func, *args, **kwargs):
- """
- Call generator_func with the given arguments, and return a Future
- that is done when the resulting generation is exhausted. If a
- keyword argument named 'loop' is given, then it is used instead of
- the default event loop.
- """
- loop = kwargs.get("loop")
- loop = asyncio._wrap_loop(loop)
- result = loop.create_future()
- _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
- return result
-
-
-class _CoroutineReturnValue(Exception):
- def __init__(self, result):
- self.result = result
-
-
-class _GeneratorTask:
- """
- Asynchronously executes the generator to completion, waiting for
- the result of each Future that it yields, and sending the result
- to the generator.
- """
-
- def __init__(self, generator, result, loop):
- self._generator = generator
- self._result = result
- self._current_task = None
- self._loop = loop
- result.add_done_callback(self._cancel_callback)
- loop.call_soon(self._next)
-
- def _cancel_callback(self, result):
- if result.cancelled() and self._current_task is not None:
- # The done callback for self._current_task invokes
- # _next in either case here.
- self._current_task.done() or self._current_task.cancel()
-
- def _next(self, previous=None):
- self._current_task = None
- if self._result.cancelled():
- if previous is not None:
- # Consume exceptions, in order to avoid triggering
- # the event loop's exception handler.
- previous.cancelled() or previous.exception()
-
- # This will throw asyncio.CancelledError in the coroutine if
- # there's an opportunity (yield) before the generator raises
- # StopIteration.
- previous = self._result
- try:
- if previous is None:
- future = next(self._generator)
- elif previous.cancelled():
- future = self._generator.throw(asyncio.CancelledError())
- elif previous.exception() is None:
- future = self._generator.send(previous.result())
- else:
- future = self._generator.throw(previous.exception())
-
- except asyncio.CancelledError:
- self._result.cancel()
- except _CoroutineReturnValue as e:
- if not self._result.cancelled():
- self._result.set_result(e.result)
- except StopIteration:
- if not self._result.cancelled():
- self._result.set_result(None)
- except Exception as e:
- if not self._result.cancelled():
- self._result.set_exception(e)
- else:
- self._current_task = asyncio.ensure_future(future, loop=self._loop)
- self._current_task.add_done_callback(self._next)
diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py
index 0c3342944..1e3d01072 100644
--- a/lib/portage/util/futures/executor/fork.py
+++ b/lib/portage/util/futures/executor/fork.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Gentoo Foundation
+# Copyright 2018-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
__all__ = ("ForkExecutor",)
@@ -6,7 +6,6 @@ __all__ = ("ForkExecutor",)
import collections
import functools
import os
-import sys
import traceback
from portage.util._async.AsyncFunction import AsyncFunction
@@ -41,7 +40,9 @@ class ForkExecutor:
"""
future = self._loop.create_future()
proc = AsyncFunction(
- target=functools.partial(self._guarded_fn_call, fn, args, kwargs)
+ target=functools.partial(self._guarded_fn_call, fn, args, kwargs),
+ # Directly inherit stdio streams and run in the foreground with no log.
+ create_pipe=False,
)
self._submit_queue.append((future, proc))
self._schedule()
@@ -92,9 +93,7 @@ class ForkExecutor:
# distinguish between kill and crash
future.set_exception(
Exception(
- "pid {} crashed or killed, exitcode {}".format(
- proc.pid, proc.returncode
- )
+ f"pid {proc.pid} crashed or killed, exitcode {proc.returncode}"
)
)
@@ -123,7 +122,7 @@ class _ExceptionWithTraceback:
tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
tb = "".join(tb)
self.exc = exc
- self.tb = '\n"""\n%s"""' % tb
+ self.tb = f'\n"""\n{tb}"""'
def __reduce__(self):
return _rebuild_exc, (self.exc, self.tb)
@@ -140,9 +139,3 @@ class _RemoteTraceback(Exception):
def _rebuild_exc(exc, tb):
exc.__cause__ = _RemoteTraceback(tb)
return exc
-
-
-if sys.version_info < (3,):
- # Python 2 does not support exception chaining, so
- # don't bother to preserve the traceback.
- _ExceptionWithTraceback = lambda exc: exc
diff --git a/lib/portage/util/futures/executor/meson.build b/lib/portage/util/futures/executor/meson.build
new file mode 100644
index 000000000..fdd7c06f9
--- /dev/null
+++ b/lib/portage/util/futures/executor/meson.build
@@ -0,0 +1,8 @@
+py.install_sources(
+ [
+ 'fork.py',
+ '__init__.py',
+ ],
+ subdir : 'portage/util/futures/executor',
+ pure : not native_extensions
+)
diff --git a/lib/portage/util/futures/extendedfutures.py b/lib/portage/util/futures/extendedfutures.py
index c23feafb5..b772698b9 100644
--- a/lib/portage/util/futures/extendedfutures.py
+++ b/lib/portage/util/futures/extendedfutures.py
@@ -37,7 +37,7 @@ class ExtendedFuture(Future):
set.
"""
self.default_result = default_result
- super(ExtendedFuture, self).__init__()
+ super().__init__()
self.set = self.set_result
def set_result(self, data, ignore_InvalidState=False):
@@ -48,14 +48,14 @@ class ExtendedFuture(Future):
"""
if ignore_InvalidState:
try:
- super(ExtendedFuture, self).set_result(data)
+ super().set_result(data)
except InvalidStateError:
pass
else:
- super(ExtendedFuture, self).set_result(data)
+ super().set_result(data)
def get(self, default=UNSET_CONST.result()):
- """Convienience function to wrap result() but adds an optional
+ """Convenience function to wrap result() but adds an optional
default value to return rather than raise an InvalidStateError
@param default: Optional override for the classwide default_result
@@ -77,12 +77,12 @@ class ExtendedFuture(Future):
def exception(self):
try:
- return super(ExtendedFuture, self).exception(timeout=0)
+ return super().exception(timeout=0)
except concurrent.futures.TimeoutError:
raise InvalidStateError
def result(self):
try:
- return super(ExtendedFuture, self).result(timeout=0)
+ return super().result(timeout=0)
except concurrent.futures.TimeoutError:
raise InvalidStateError
diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py
index f4b4e5e0b..5ee0b48c7 100644
--- a/lib/portage/util/futures/iter_completed.py
+++ b/lib/portage/util/futures/iter_completed.py
@@ -34,8 +34,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
for future_done_set in async_iter_completed(
futures, max_jobs=max_jobs, max_load=max_load, loop=loop
):
- for future in loop.run_until_complete(future_done_set):
- yield future
+ yield from loop.run_until_complete(future_done_set)
def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
diff --git a/lib/portage/util/futures/meson.build b/lib/portage/util/futures/meson.build
new file mode 100644
index 000000000..d561fa312
--- /dev/null
+++ b/lib/portage/util/futures/meson.build
@@ -0,0 +1,16 @@
+py.install_sources(
+ [
+ 'extendedfutures.py',
+ 'futures.py',
+ 'iter_completed.py',
+ 'retry.py',
+ 'unix_events.py',
+ '_sync_decorator.py',
+ '__init__.py',
+ ],
+ subdir : 'portage/util/futures',
+ pure : not native_extensions
+)
+
+subdir('executor')
+subdir('_asyncio')
diff --git a/lib/portage/util/futures/retry.py b/lib/portage/util/futures/retry.py
index 496bfb562..a8a3ad4fb 100644
--- a/lib/portage/util/futures/retry.py
+++ b/lib/portage/util/futures/retry.py
@@ -84,7 +84,7 @@ def _retry(
reraise,
func,
*args,
- **kwargs
+ **kwargs,
):
"""
Retry coroutine, used to implement retry decorator.
diff --git a/lib/portage/util/futures/unix_events.py b/lib/portage/util/futures/unix_events.py
index f2f01f0c6..374497010 100644
--- a/lib/portage/util/futures/unix_events.py
+++ b/lib/portage/util/futures/unix_events.py
@@ -23,7 +23,6 @@ if hasattr(os, "set_blocking"):
def _set_nonblocking(fd):
os.set_blocking(fd, False)
-
else:
def _set_nonblocking(fd):
@@ -71,11 +70,11 @@ class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy):
def get_event_loop(self):
self._check_recursion()
- return super(_AsyncioEventLoopPolicy, self).get_event_loop()
+ return super().get_event_loop()
def get_child_watcher(self):
self._check_recursion()
- return super(_AsyncioEventLoopPolicy, self).get_child_watcher()
+ return super().get_child_watcher()
DefaultEventLoopPolicy = _AsyncioEventLoopPolicy