# Copyright 2012-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import fcntl
import functools
import multiprocessing
import signal
import sys

import portage
from portage import os
from portage.util.futures import asyncio
from _emerge.SpawnProcess import SpawnProcess


class ForkProcess(SpawnProcess):

    __slots__ = ("_proc", "_proc_join_task")

    # Number of seconds between poll attempts for process exit status
    # (after the sentinel has become ready).
    _proc_join_interval = 0.1

    def _spawn(self, args, fd_pipes=None, **kwargs):
        """
        Override SpawnProcess._spawn to fork a subprocess that calls
        self._run(). This uses multiprocessing.Process in order to leverage
        any pre-fork and post-fork interpreter housekeeping that it provides,
        promoting a healthy state for the forked interpreter.
        """
        # Since multiprocessing.Process closes sys.__stdin__, create a
        # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
        # be restored in the subprocess, in case this is needed for
        # things like PROPERTIES=interactive support.
        stdin_dup = None
        try:
            stdin_fd = fd_pipes.get(0)
            if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
                stdin_dup = os.dup(stdin_fd)
                fcntl.fcntl(
                    stdin_dup, fcntl.F_SETFD, fcntl.fcntl(stdin_fd, fcntl.F_GETFD)
                )
                fd_pipes[0] = stdin_dup
            self._proc = multiprocessing.Process(
                target=self._bootstrap, args=(fd_pipes,)
            )
            self._proc.start()
        finally:
            if stdin_dup is not None:
                os.close(stdin_dup)

        self._proc_join_task = asyncio.ensure_future(
            self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler
        )
        self._proc_join_task.add_done_callback(
            functools.partial(self._proc_join_done, self._proc)
        )

        return [self._proc.pid]

    def _cancel(self):
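        """
        Terminate the forked multiprocessing.Process directly when it
        exists; otherwise fall back to the pid-based cancellation of
        the SpawnProcess base class.
        """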
        if self._proc is None:
            super()._cancel()
        else:
            self._proc.terminate()

    def _async_wait(self):
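        # Defer to _proc_join_done, which calls _async_wait again once
        # the exit status of the forked process is available.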
        if self._proc_join_task is None:
            super()._async_wait()

    def _async_waitpid(self):
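        # The exit status is collected by _proc_join instead of waitpid
        # whenever a _proc_join_task exists.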
        if self._proc_join_task is None:
            super()._async_waitpid()

    async def _proc_join(self, proc, loop=None):
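        # Wait for the process sentinel to become readable, then poll
        # join() until the exit status becomes available.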
        sentinel_reader = self.scheduler.create_future()
        self.scheduler.add_reader(
            proc.sentinel,
            lambda: sentinel_reader.done() or sentinel_reader.set_result(None),
        )
        try:
            await sentinel_reader
        finally:
            # If multiprocessing.Process supports the close method, then
            # access to proc.sentinel will raise ValueError if the
            # sentinel has been closed. In this case it's not safe to call
            # remove_reader, since the file descriptor may have been closed
            # and then reallocated to a concurrent coroutine. When the
            # close method is not supported, proc.sentinel remains open
            # until proc's finalizer is called.
            try:
                self.scheduler.remove_reader(proc.sentinel)
            except ValueError:
                pass

        # Now that proc.sentinel is ready, poll until process exit
        # status has become available.
        while True:
            proc.join(0)
            if proc.exitcode is not None:
                break
            await asyncio.sleep(self._proc_join_interval, loop=loop)

    def _proc_join_done(self, proc, future):
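        # Re-raise any unexpected exception from _proc_join (result()
        # raises it here), unless the task was cancelled.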
        future.cancelled() or future.result()
        self._was_cancelled()
        if self.returncode is None:
            self.returncode = proc.exitcode

        self._proc = None
        if hasattr(proc, "close"):
            proc.close()
        self._proc_join_task = None
        self._async_wait()

    def _unregister(self):
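        # In addition to the base class cleanup, terminate the forked
        # process if it is still alive and cancel the join task.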
        super()._unregister()
        if self._proc is not None:
            if self._proc.is_alive():
                self._proc.terminate()
            self._proc = None
        if self._proc_join_task is not None:
            self._proc_join_task.cancel()
            self._proc_join_task = None

    def _bootstrap(self, fd_pipes):
        # Use default signal handlers in order to avoid problems
        # killing subprocesses as reported in bug #353239.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

        # Unregister SIGCHLD handler and wakeup_fd for the parent
        # process's event loop (bug 655656).
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        try:
            wakeup_fd = signal.set_wakeup_fd(-1)
            if wakeup_fd > 0:
                os.close(wakeup_fd)
        except (ValueError, OSError):
            pass

        portage.locks._close_fds()
        # We don't exec, so use close_fds=False
        # (see _setup_pipes docstring).
        portage.process._setup_pipes(fd_pipes, close_fds=False)

        # Since multiprocessing.Process closes sys.__stdin__ and
        # makes sys.stdin refer to os.devnull, restore it when
        # appropriate.
        if 0 in fd_pipes:
            # It's possible that sys.stdin.fileno() is already 0,
            # and in that case the above _setup_pipes call will
            # have already updated its identity via dup2. Otherwise,
            # perform the dup2 call now, and also copy the file
            # descriptor flags.
            if sys.stdin.fileno() != 0:
                os.dup2(0, sys.stdin.fileno())
                fcntl.fcntl(
                    sys.stdin.fileno(), fcntl.F_SETFD, fcntl.fcntl(0, fcntl.F_GETFD)
                )
            sys.__stdin__ = sys.stdin

        sys.exit(self._run())

    def _run(self):
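        """
        Execute in the forked child process. Subclasses must override
        this method; its return value becomes the child's exit status
        via the sys.exit call in _bootstrap.
        """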
        raise NotImplementedError(self)
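

# A minimal usage sketch (illustrative only, not part of this module):
# a subclass overrides _run, which executes in the forked child after
# _bootstrap has restored default signal handlers and set up fd_pipes,
# and whose return value becomes the child's exit status.
#
#     class HelloProcess(ForkProcess):
#         def _run(self):
#             print("hello from pid {}".format(os.getpid()))
#             return os.EX_OK
#
# The instance is then started with an event loop, e.g.
# HelloProcess(scheduler=loop).start(), like other SpawnProcess
# subclasses.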