aboutsummaryrefslogtreecommitdiff
blob: d9ab2b47aab2d45fd3e6dbd96dcc7288e5fe145e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# Copyright 2010-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import functools
import io
import multiprocessing
import platform

import fcntl
import portage
from portage import os, _unicode_decode
from portage.package.ebuild._ipc.QueryCommand import QueryCommand
from portage.util._ctypes import load_libc
import portage.elog.messages
from portage.util._async.ForkProcess import ForkProcess
from portage.util import no_color


class MergeProcess(ForkProcess):
    """
    Merge packages in a subprocess, so the Scheduler can run in the main
    thread while files are moved or copied asynchronously.

    The parent process takes care of the work that must not happen in the
    child: metadata access, vdb locking, COUNTER allocation, and routing
    of elog/mtime messages arriving over pipes. The forked child performs
    the actual merge or unmerge via portage.dblink (see _target).
    """

    __slots__ = (
        # Attributes supplied by the caller before _start() is invoked.
        "mycat",
        "mypkg",
        "settings",
        "treetype",
        "vartree",
        "blockers",
        "pkgloc",
        "infloc",
        "myebuild",
        "mydbapi",
        # Output flag: set by _proc_join_done when the child exits with
        # RETURNCODE_POSTINST_FAILURE.
        "postinst_failure",
        "prev_mtimes",
        "unmerge",
        # Internal state managed by this class.
        "_elog_reader_fd",
        "_buf",
        "_counter",
        "_dblink",
        "_elog_keys",
        "_locked_vdb",
        "_mtime_reader",
    )

    def _start(self):
        """
        Perform parent-process-only preparation (settings/metadata setup,
        libc lookup caching, fd_pipes defaults), then delegate to
        ForkProcess._start to spawn the child.
        """
        # Portage should always call setcpv prior to this
        # point, but here we have a fallback as a convenience
        # for external API consumers. It's important that
        # this metadata access happens in the parent process,
        # since closing of file descriptors in the subprocess
        # can prevent access to open database connections such
        # as that used by the sqlite metadata cache module.
        cpv = f"{self.mycat}/{self.mypkg}"
        settings = self.settings
        if cpv != settings.mycpv or "EAPI" not in settings.configdict["pkg"]:
            settings.reload()
            settings.reset()
            settings.setcpv(cpv, mydb=self.mydbapi)

        # This caches the libc library lookup in the current
        # process, so that it's only done once rather than
        # for each child process.
        if platform.system() == "Linux" and "merge-sync" in settings.features:
            load_libc()

        # Inherit stdin by default, so that the pdb SIGUSR1
        # handler is usable for the subprocess.
        if self.fd_pipes is None:
            self.fd_pipes = {}
        else:
            # Copy to avoid mutating a dict owned by the caller.
            self.fd_pipes = self.fd_pipes.copy()
        self.fd_pipes.setdefault(0, portage._get_stdin().fileno())

        self.log_filter_file = self.settings.get("PORTAGE_LOG_FILTER_FILE_CMD")
        super()._start()

    def _lock_vdb(self):
        """
        Lock the vdb if FEATURES=parallel-install is NOT enabled,
        otherwise do nothing. This is implemented with
        vardbapi.lock(), which supports reentrance by the
        subprocess that we spawn.
        """
        if "parallel-install" not in self.settings.features:
            self.vartree.dbapi.lock()
            # Remember that we hold the lock so _unlock_vdb can pair it.
            self._locked_vdb = True

    def _unlock_vdb(self):
        """
        Unlock the vdb if we hold a lock, otherwise do nothing.
        """
        if self._locked_vdb:
            self.vartree.dbapi.unlock()
            self._locked_vdb = False

    def _elog_output_handler(self):
        """
        Scheduler read callback for the nonblocking elog pipe.

        Each complete line sent by the child has the form
        "<funcname> <phase> <key> <msg>". The named reporter from
        portage.elog.messages is invoked with output directed into a
        local StringIO that is then discarded — presumably to suppress
        direct console output here, since the collected keys are
        processed later in _unregister (confirm against elog internals).
        Incomplete trailing data is buffered in self._buf until the
        rest arrives.

        Returns False after EOF, once the reader has been removed and
        the pipe closed.
        """
        # NOTE(review): _read_buf is inherited from ForkProcess (not
        # visible here); from usage it appears to return None when no
        # data is available and an empty buffer on EOF — confirm.
        output = self._read_buf(self._elog_reader_fd.fileno())
        if output:
            lines = _unicode_decode(output).split("\n")
            if len(lines) == 1:
                # No newline yet: accumulate the partial line.
                self._buf += lines[0]
            else:
                # Prepend any previously buffered partial line, and keep
                # the trailing (possibly empty) fragment for next time.
                lines[0] = self._buf + lines[0]
                self._buf = lines.pop()
                out = io.StringIO()
                for line in lines:
                    funcname, phase, key, msg = line.split(" ", 3)
                    self._elog_keys.add(key)
                    reporter = getattr(portage.elog.messages, funcname)
                    reporter(msg, phase=phase, key=key, out=out)

        elif output is not None:  # EIO/POLLHUP
            self.scheduler.remove_reader(self._elog_reader_fd.fileno())
            self._elog_reader_fd.close()
            self._elog_reader_fd = None
            return False

    def _mtime_handler(self):
        """
        Scheduler read callback for the mtime pipe: receive a mapping of
        mtimes from the child (used as ldpath_mtimes, see _target) and
        mirror it into self.prev_mtimes in place. EOF means the child
        closed its end, so stop watching and close the reader.
        """
        if self._mtime_reader is not None:
            try:
                mtimes = self._mtime_reader.recv()
            except EOFError:
                self.scheduler.remove_reader(self._mtime_reader.fileno())
                self._mtime_reader.close()
                self._mtime_reader = None
            else:
                if self.prev_mtimes is not None:
                    # Update in place so existing references stay valid.
                    self.prev_mtimes.clear()
                    self.prev_mtimes.update(mtimes)

    def _spawn(self, args, fd_pipes, **kwargs):
        """
        Extend the superclass _spawn method to perform some pre-fork and
        post-fork actions.

        Pre-fork: create the elog and mtime pipes, resolve blockers,
        construct the dblink, lock the vdb, and allocate a COUNTER.
        Post-fork: close the child's write ends, reset elog bookkeeping,
        and invalidate vardbapi caches made stale by the merge.
        """

        elog_reader_fd, elog_writer_fd = multiprocessing.Pipe(duplex=False)

        # The parent reads elog lines opportunistically from the event
        # loop, so the read end must be nonblocking.
        fcntl.fcntl(
            elog_reader_fd.fileno(),
            fcntl.F_SETFL,
            fcntl.fcntl(elog_reader_fd.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK,
        )

        mtime_reader, mtime_writer = multiprocessing.Pipe(duplex=False)
        self.scheduler.add_reader(mtime_reader.fileno(), self._mtime_handler)
        self._mtime_reader = mtime_reader

        blockers = None
        if self.blockers is not None:
            # Query blockers in the main process, since closing
            # of file descriptors in the subprocess can prevent
            # access to open database connections such as that
            # used by the sqlite metadata cache module.
            blockers = self.blockers()
        mylink = portage.dblink(
            self.mycat,
            self.mypkg,
            settings=self.settings,
            treetype=self.treetype,
            vartree=self.vartree,
            blockers=blockers,
            pipe=elog_writer_fd,
            mtime_pipe=mtime_writer,
        )
        self.scheduler.add_reader(elog_reader_fd.fileno(), self._elog_output_handler)

        # If a concurrent emerge process tries to install a package
        # in the same SLOT as this one at the same time, there is an
        # extremely unlikely chance that the COUNTER values will not be
        # ordered correctly unless we lock the vdb here.
        # FEATURES=parallel-install skips this lock in order to
        # improve performance, and the risk is practically negligible.
        self._lock_vdb()
        if not self.unmerge:
            # Allocate the COUNTER while the vdb lock is held (when not
            # skipped via parallel-install).
            self._counter = self.vartree.dbapi.counter_tick()

        self._dblink = mylink
        self._elog_reader_fd = elog_reader_fd

        # Since the entire QueryCommand._db is not required, only pass
        # in tree types that QueryCommand specifically requires.
        child_db = {}
        parent_db = portage.db if QueryCommand._db is None else QueryCommand._db
        for root in parent_db:
            child_db[root] = {}
            for tree_type in ("vartree", "porttree"):
                child_db[root][tree_type] = parent_db[root][tree_type]

        # Bind everything the child needs into the target callable; the
        # static _target avoids capturing self across the fork boundary.
        self.target = functools.partial(
            self._target,
            self._counter,
            self._dblink,
            self.infloc,
            self.mydbapi,
            self.myebuild,
            self.pkgloc,
            self.prev_mtimes,
            self.settings,
            self.unmerge,
            self.vartree.dbapi,
            child_db,
        )

        pids = super()._spawn(args, fd_pipes, **kwargs)
        # Close the parent's copies of the write ends so EOF is seen
        # when the child exits.
        elog_writer_fd.close()
        mtime_writer.close()
        self._buf = ""
        self._elog_keys = set()
        # Discard messages which will be collected by the subprocess,
        # in order to avoid duplicates (bug #446136).
        portage.elog.messages.collect_messages(key=mylink.mycpv)

        # invalidate relevant vardbapi caches
        if self.vartree.dbapi._categories is not None:
            self.vartree.dbapi._categories = None
        self.vartree.dbapi._pkgs_changed = True
        self.vartree.dbapi._clear_pkg_cache(mylink)

        return pids

    @staticmethod
    def _target(
        counter,
        mylink,
        infloc,
        mydbapi,
        myebuild,
        pkgloc,
        prev_mtimes,
        settings,
        unmerge,
        vardb,
        db,
    ):
        """
        Entry point executed in the child process.

        Performs the unmerge (dblink.unmerge followed by delete under the
        db lock) or the merge (dblink.merge), and returns the resulting
        exit status (os.EX_OK on success, nonzero otherwise).
        """
        if QueryCommand._db is None:
            # Initialize QueryCommand._db for AbstractEbuildProcess/EbuildIpcDaemon
            # when not using the multiprocessing fork start method.
            QueryCommand._db = db
        portage.output.havecolor = not no_color(settings)
        # Avoid wasteful updates of the vdb cache.
        vardb._flush_cache_enabled = False

        # In this subprocess we don't want PORTAGE_BACKGROUND to
        # suppress stdout/stderr output since they are pipes. We
        # also don't want to open PORTAGE_LOG_FILE, since it will
        # already be opened by the parent process, so we set the
        # "subprocess" value for use in conditional logging code
        # involving PORTAGE_LOG_FILE.
        if not unmerge:
            # unmerge phases have separate logs
            if settings.get("PORTAGE_BACKGROUND") == "1":
                settings["PORTAGE_BACKGROUND_UNMERGE"] = "1"
            else:
                settings["PORTAGE_BACKGROUND_UNMERGE"] = "0"
            settings.backup_changes("PORTAGE_BACKGROUND_UNMERGE")
        settings["PORTAGE_BACKGROUND"] = "subprocess"
        settings.backup_changes("PORTAGE_BACKGROUND")

        rval = 1
        if unmerge:
            if not mylink.exists():
                # Nothing installed: treat as success.
                rval = os.EX_OK
            elif mylink.unmerge(ldpath_mtimes=prev_mtimes) == os.EX_OK:
                mylink.lockdb()
                try:
                    mylink.delete()
                finally:
                    mylink.unlockdb()
                rval = os.EX_OK
        else:
            rval = mylink.merge(
                pkgloc,
                infloc,
                myebuild=myebuild,
                mydbapi=mydbapi,
                prev_mtimes=prev_mtimes,
                counter=counter,
            )
        return rval

    def _proc_join_done(self, proc, future):
        """
        Extend _proc_join_done to react to RETURNCODE_POSTINST_FAILURE:
        the merge itself succeeded, so record the pkg_postinst failure
        on self.postinst_failure and report os.EX_OK to callers.
        """
        if (
            not future.cancelled()
            and proc.exitcode == portage.const.RETURNCODE_POSTINST_FAILURE
        ):
            self.postinst_failure = True
            self.returncode = os.EX_OK
        super()._proc_join_done(proc, future)

    def _unregister(self):
        """
        Unregister from the scheduler and close open files.
        """

        if not self.unmerge:
            # Populate the vardbapi cache for the new package
            # while its inodes are still hot.
            try:
                self.vartree.dbapi.aux_get(self.settings.mycpv, ["EAPI"])
            except KeyError:
                pass

        self._unlock_vdb()
        if self._elog_reader_fd is not None:
            self.scheduler.remove_reader(self._elog_reader_fd.fileno())
            self._elog_reader_fd.close()
            self._elog_reader_fd = None
        if self._elog_keys is not None:
            # Flush prerm/postrm elog messages accumulated for each key
            # seen by _elog_output_handler.
            for key in self._elog_keys:
                portage.elog.elog_process(
                    key, self.settings, phasefilter=("prerm", "postrm")
                )
            self._elog_keys = None

        super()._unregister()