aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/portage/util/futures/_asyncio')
-rw-r--r--lib/portage/util/futures/_asyncio/__init__.py73
-rw-r--r--lib/portage/util/futures/_asyncio/meson.build8
-rw-r--r--lib/portage/util/futures/_asyncio/streams.py2
3 files changed, 64 insertions, 19 deletions
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index ccf800c66..e377a9cdd 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2021 Gentoo Authors
+# Copyright 2018-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
__all__ = (
@@ -9,18 +9,22 @@ __all__ = (
"CancelledError",
"Future",
"InvalidStateError",
+ "Lock",
"TimeoutError",
"get_child_watcher",
"get_event_loop",
"set_child_watcher",
"get_event_loop_policy",
"set_event_loop_policy",
+ "run",
+ "shield",
"sleep",
"Task",
"wait",
+ "wait_for",
)
-import subprocess
+import sys
import types
import weakref
@@ -34,20 +38,20 @@ from asyncio import (
FIRST_EXCEPTION,
Future,
InvalidStateError,
+ iscoroutinefunction,
+ Lock as _Lock,
+ shield,
TimeoutError,
+ wait_for,
)
-try:
- import threading
-except ImportError:
- import dummy_threading as threading
+import threading
import portage
portage.proxy.lazyimport.lazyimport(
globals(),
"portage.util.futures.unix_events:_PortageEventLoopPolicy",
- "portage.util.futures:compat_coroutine@_compat_coroutine",
)
from portage.util._eventloop.asyncio_event_loop import (
AsyncioEventLoop as _AsyncioEventLoop,
@@ -106,6 +110,14 @@ def set_child_watcher(watcher):
return get_event_loop_policy().set_child_watcher(watcher)
+# Emulate run since it's the preferred python API.
+def run(coro):
+ return _safe_loop().run_until_complete(coro)
+
+
+run.__doc__ = _real_asyncio.run.__doc__
+
+
def create_subprocess_exec(*args, **kwargs):
"""
Create a subprocess.
@@ -146,17 +158,18 @@ def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED):
return _real_asyncio.wait(futures, timeout=timeout, return_when=return_when)
-def iscoroutinefunction(func):
+class Lock(_Lock):
"""
- Return True if func is a decorated coroutine function,
- supporting both asyncio.coroutine and compat_coroutine since
- their behavior is identical for all practical purposes.
+ Inject loop parameter for python3.9 or less in order to avoid
+ "got Future <Future pending> attached to a different loop" errors.
"""
- if _compat_coroutine._iscoroutinefunction(func):
- return True
- if _real_asyncio.iscoroutinefunction(func):
- return True
- return False
+
+ def __init__(self, **kwargs):
+ if sys.version_info >= (3, 10):
+ kwargs.pop("loop", None)
+ elif "loop" not in kwargs:
+ kwargs["loop"] = _safe_loop()._loop
+ super().__init__(**kwargs)
class Task(Future):
@@ -299,13 +312,37 @@ def _safe_loop():
def _get_running_loop():
+ """
+ This calls the real asyncio get_running_loop() and wraps that with
+ portage's internal AsyncioEventLoop wrapper. If there is no running
+ asyncio event loop but portage has a reference to another running
+ loop in this thread, then use that instead.
+
+ This behavior enables portage internals to use the real asyncio.run
+ while remaining compatible with internal code that does not use the
+ real asyncio.run.
+ """
+ try:
+ _loop = _real_asyncio.get_running_loop()
+ except RuntimeError:
+ _loop = None
+
with _thread_weakrefs.lock:
if _thread_weakrefs.pid == portage.getpid():
try:
loop = _thread_weakrefs.loops[threading.get_ident()]
except KeyError:
- return None
- return loop if loop.is_running() else None
+ pass
+ else:
+ if _loop is loop._loop:
+ return loop
+ elif _loop is None:
+ return loop if loop.is_running() else None
+
+ # If _loop it not None here it means it was probably a temporary
+ # loop created by asyncio.run, so we don't try to cache it, and
+ # just return a temporary wrapper.
+ return None if _loop is None else _AsyncioEventLoop(loop=_loop)
def _thread_weakrefs_atexit():
diff --git a/lib/portage/util/futures/_asyncio/meson.build b/lib/portage/util/futures/_asyncio/meson.build
new file mode 100644
index 000000000..fa0bc4a86
--- /dev/null
+++ b/lib/portage/util/futures/_asyncio/meson.build
@@ -0,0 +1,8 @@
+py.install_sources(
+ [
+ 'streams.py',
+ '__init__.py',
+ ],
+ subdir : 'portage/util/futures/_asyncio',
+ pure : not native_extensions
+)
diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py
index 95a4244a6..6b902975c 100644
--- a/lib/portage/util/futures/_asyncio/streams.py
+++ b/lib/portage/util/futures/_asyncio/streams.py
@@ -76,7 +76,7 @@ async def _writer(output_file, content, loop=DeprecationWarning):
while content:
try:
content = content[os.write(fd, content) :]
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EAGAIN:
raise
waiter = loop.create_future()