author     Zac Medico <zmedico@gentoo.org>    2023-10-19 21:11:48 -0700
committer  Zac Medico <zmedico@gentoo.org>    2023-10-21 21:17:48 -0700
commit     6abc969109754ab086db2bac5be1029de1a015c3 (patch)
tree       0476f7c5faf1f8052001ca750c872b2ca5824a42
parent     NEWS: Update for ban of non-PMS helpers (diff)
ForkProcess: Implement fd_pipes via send_handle
This new fd_pipes implementation is only enabled when the multiprocessing
start method is not fork, ensuring backward compatibility with existing
ForkProcess callers that rely on the fork start method. Note that the new
fd_pipes implementation uses a thread via run_in_executor, and threads are
not recommended for mixing with the fork start method due to cpython
issue 84559.

Bug: https://bugs.gentoo.org/915896
Signed-off-by: Zac Medico <zmedico@gentoo.org>
-rw-r--r--  lib/portage/tests/process/test_ForkProcess.py |   7
-rw-r--r--  lib/portage/util/_async/ForkProcess.py        | 142
2 files changed, 124 insertions(+), 25 deletions(-)
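The core primitive this commit builds on is descriptor passing over a
multiprocessing.Pipe via multiprocessing.reduction.send_handle and
recv_handle, which works with the spawn and forkserver start methods. The
following standalone sketch is not the Portage code itself; child_main and
the temporary log file are illustrative. It shows a parent handing an open
file descriptor to a child that was started without fork:

import multiprocessing
import os
import tempfile
from multiprocessing import reduction


def child_main(conn):
    # Receive a duplicate of the parent's descriptor and write through it.
    fd = reduction.recv_handle(conn)
    conn.close()
    with os.fdopen(fd, "w") as out:
        out.write("hello from the child\n")


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")  # i.e. not "fork"
    assert reduction.HAVE_SEND_HANDLE

    with tempfile.NamedTemporaryFile(mode="r") as logfile:
        # send_handle needs a duplex Pipe (a socketpair on Unix).
        parent_conn, child_conn = multiprocessing.Pipe(duplex=True)
        proc = multiprocessing.Process(target=child_main, args=(child_conn,))
        proc.start()
        child_conn.close()

        log_fd = os.open(logfile.name, os.O_WRONLY)
        reduction.send_handle(parent_conn, log_fd, proc.pid)
        os.close(log_fd)
        parent_conn.close()

        proc.join()
        print(logfile.read())  # "hello from the child"

ForkProcess wires this primitive into its existing _start/_spawn/_bootstrap
flow: the parent keeps the connection plus a pty slave_fd in self._files,
and the actual send happens in _send_fd_pipes from an executor thread so
the event loop is not blocked.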
diff --git a/lib/portage/tests/process/test_ForkProcess.py b/lib/portage/tests/process/test_ForkProcess.py
index c07c60e9c..bc0b836f1 100644
--- a/lib/portage/tests/process/test_ForkProcess.py
+++ b/lib/portage/tests/process/test_ForkProcess.py
@@ -4,6 +4,7 @@
import functools
import multiprocessing
import tempfile
+from unittest.mock import patch
from portage import os
from portage.tests import TestCase
@@ -37,3 +38,9 @@ class ForkProcessTestCase(TestCase):
with open(logfile.name, "rb") as output:
self.assertEqual(output.read(), test_string.encode("utf-8"))
+
+ def test_spawn_logfile_no_send_handle(self):
+ with patch(
+ "portage.util._async.ForkProcess.ForkProcess._HAVE_SEND_HANDLE", new=False
+ ):
+ self.test_spawn_logfile()
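The new test exercises the fallback path by patching the class attribute so
send_handle appears unavailable. The same pattern works outside the test
suite, for example (illustrative only; assumes portage is importable):

from unittest.mock import patch

from portage.util._async.ForkProcess import ForkProcess

# Force the duplex=False Pipe logging path, as if HAVE_SEND_HANDLE were False.
with patch.object(ForkProcess, "_HAVE_SEND_HANDLE", new=False):
    assert not ForkProcess._HAVE_SEND_HANDLE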
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 09e40a2d3..6d216a5c4 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -10,6 +10,7 @@ import sys
import portage
from portage import os
+from portage.cache.mappings import slot_dict_class
from portage.util.futures import asyncio
from _emerge.SpawnProcess import SpawnProcess
@@ -26,29 +27,36 @@ class ForkProcess(SpawnProcess):
"_proc_join_task",
)
+ _file_names = ("connection", "slave_fd")
+ _files_dict = slot_dict_class(_file_names, prefix="")
+
# Number of seconds between poll attempts for process exit status
# (after the sentinel has become ready).
_proc_join_interval = 0.1
- def _start(self):
- if self.fd_pipes or self.logfile:
- if self.fd_pipes:
- if multiprocessing.get_start_method() != "fork":
- raise NotImplementedError(
- 'fd_pipes only supported with multiprocessing start method "fork"'
- )
- super()._start()
- return
+ _HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", False)
- if self.logfile:
- if multiprocessing.get_start_method() == "fork":
- # Use superclass pty support.
- super()._start()
- return
+ def _start(self):
+ if multiprocessing.get_start_method() == "fork":
+ # Backward compatibility mode.
+ super()._start()
+ return
+
+ # This mode supports multiprocessing start methods
+ # other than fork. Note that the fd_pipes implementation
+ # uses a thread via run_in_executor, and threads are not
+ # recommended for mixing with the fork start method due
+ # to cpython issue 84559.
+ if self.fd_pipes and not self._HAVE_SEND_HANDLE:
+ raise NotImplementedError(
+ 'fd_pipes only supported with HAVE_SEND_HANDLE or multiprocessing start method "fork"'
+ )
- # Log via multiprocessing.Pipe if necessary.
- pr, pw = multiprocessing.Pipe(duplex=False)
- self._child_connection = pw
+ if self.fd_pipes or self.logfile:
+ # Log via multiprocessing.Pipe if necessary.
+ connection, self._child_connection = multiprocessing.Pipe(
+ duplex=self._HAVE_SEND_HANDLE
+ )
retval = self._spawn(self.args, fd_pipes=self.fd_pipes)
@@ -59,11 +67,71 @@ class ForkProcess(SpawnProcess):
self._async_waitpid()
else:
self._child_connection.close()
+ self.fd_pipes = self.fd_pipes or {}
stdout_fd = None
if not self.background:
- stdout_fd = os.dup(sys.__stdout__.fileno())
+ self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
+ self.fd_pipes.setdefault(1, sys.__stdout__.fileno())
+ self.fd_pipes.setdefault(2, sys.__stderr__.fileno())
+ stdout_fd = os.dup(self.fd_pipes[1])
+
+ if self._HAVE_SEND_HANDLE:
+ master_fd, slave_fd = self._pipe(self.fd_pipes)
+ self.fd_pipes[1] = slave_fd
+ self.fd_pipes[2] = slave_fd
+ self._files = self._files_dict(connection=connection, slave_fd=slave_fd)
+ else:
+ master_fd = connection
+
+ self._start_main_task(
+ master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
+ )
- self._start_main_task(pr, log_file_path=self.logfile, stdout_fd=stdout_fd)
+ @property
+ def _fd_pipes_send_handle(self):
+ """Returns True if we have a connection to implement fd_pipes via send_handle."""
+ return bool(
+ self._HAVE_SEND_HANDLE
+ and self._files
+ and getattr(self._files, "connection", False)
+ )
+
+ def _send_fd_pipes(self):
+ """
+ Communicate with _bootstrap to send fd_pipes via send_handle.
+ This performs blocking IO, intended for invocation via run_in_executor.
+ """
+ fd_list = list(set(self.fd_pipes.values()))
+ self._files.connection.send(
+ (self.fd_pipes, fd_list),
+ )
+ for fd in fd_list:
+ multiprocessing.reduction.send_handle(
+ self._files.connection,
+ fd,
+ self.pid,
+ )
+
+ async def _main(self, build_logger, pipe_logger, loop=None):
+ try:
+ if self._fd_pipes_send_handle:
+ await self.scheduler.run_in_executor(
+ None,
+ self._send_fd_pipes,
+ )
+ except asyncio.CancelledError:
+ self._main_cancel(build_logger, pipe_logger)
+ raise
+ finally:
+ if self._files:
+ if hasattr(self._files, "connection"):
+ self._files.connection.close()
+ del self._files.connection
+ if hasattr(self._files, "slave_fd"):
+ os.close(self._files.slave_fd)
+ del self._files.slave_fd
+
+ await super()._main(build_logger, pipe_logger, loop=loop)
def _spawn(self, args, fd_pipes=None, **kwargs):
"""
@@ -102,9 +170,21 @@ class ForkProcess(SpawnProcess):
stdin_dup, fcntl.F_SETFD, fcntl.fcntl(stdin_fd, fcntl.F_GETFD)
)
fd_pipes[0] = stdin_dup
+
+ if self._fd_pipes_send_handle:
+ # Handle fd_pipes in _main instead.
+ fd_pipes = None
+
self._proc = multiprocessing.Process(
target=self._bootstrap,
- args=(self._child_connection, fd_pipes, target, args, kwargs),
+ args=(
+ self._child_connection,
+ self._HAVE_SEND_HANDLE,
+ fd_pipes,
+ target,
+ args,
+ kwargs,
+ ),
)
self._proc.start()
finally:
@@ -186,7 +266,7 @@ class ForkProcess(SpawnProcess):
self._proc_join_task = None
@staticmethod
- def _bootstrap(child_connection, fd_pipes, target, args, kwargs):
+ def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, kwargs):
# Use default signal handlers in order to avoid problems
# killing subprocesses as reported in bug #353239.
signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -205,10 +285,22 @@ class ForkProcess(SpawnProcess):
portage.locks._close_fds()
if child_connection is not None:
- fd_pipes = fd_pipes or {}
- fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
- fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
- fd_pipes[child_connection.fileno()] = child_connection.fileno()
+ if have_send_handle:
+ fd_pipes, fd_list = child_connection.recv()
+ fd_pipes_map = {}
+ for fd in fd_list:
+ fd_pipes_map[fd] = multiprocessing.reduction.recv_handle(
+ child_connection
+ )
+ child_connection.close()
+ for k, v in list(fd_pipes.items()):
+ fd_pipes[k] = fd_pipes_map[v]
+
+ else:
+ fd_pipes = fd_pipes or {}
+ fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
+ fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
+ fd_pipes[child_connection.fileno()] = child_connection.fileno()
if fd_pipes:
# We don't exec, so use close_fds=False
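The _send_fd_pipes and _bootstrap hunks above form a small two-step
handshake: the parent sends the fd_pipes mapping plus a deduplicated fd list
over the connection, then passes each descriptor with send_handle; the child
receives the mapping, collects the matching descriptors with recv_handle,
and rewrites fd_pipes to point at the fds it actually received. A minimal
standalone sketch of that handshake (illustrative names parent_side and
child_side; not the Portage code itself):

import multiprocessing
import os
import sys
from multiprocessing import reduction


def child_side(conn):
    # Mirror of _bootstrap's have_send_handle branch: receive the fd_pipes
    # mapping and the deduplicated fd list, then collect each descriptor.
    fd_pipes, fd_list = conn.recv()
    received = {parent_fd: reduction.recv_handle(conn) for parent_fd in fd_list}
    conn.close()
    # Rewrite fd_pipes so its values refer to descriptors valid here.
    fd_pipes = {k: received[v] for k, v in fd_pipes.items()}
    os.write(fd_pipes[1], b"child wrote via fd_pipes\n")


def parent_side(conn, fd_pipes, pid):
    # Mirror of _send_fd_pipes: send the mapping plus the unique fd list,
    # then each descriptor via send_handle. This is blocking IO, which is
    # why ForkProcess runs it through run_in_executor.
    fd_list = list(set(fd_pipes.values()))
    conn.send((fd_pipes, fd_list))
    for fd in fd_list:
        reduction.send_handle(conn, fd, pid)


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    parent_conn, child_conn = multiprocessing.Pipe(duplex=True)
    proc = multiprocessing.Process(target=child_side, args=(child_conn,))
    proc.start()
    child_conn.close()
    # Point the child's stdout/stderr at this process's real stdout/stderr.
    fd_pipes = {1: sys.__stdout__.fileno(), 2: sys.__stderr__.fileno()}
    parent_side(parent_conn, fd_pipes, proc.pid)
    parent_conn.close()
    proc.join()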