# Copyright 2010-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import io
import platform

import fcntl
import portage
from portage import os, _unicode_decode
from portage.util._ctypes import find_library
import portage.elog.messages
from portage.util._async.ForkProcess import ForkProcess


class MergeProcess(ForkProcess):
    """
    Merge packages in a subprocess, so the Scheduler can run in the main
    thread while files are moved or copied asynchronously.
    """

    # NOTE(review): _unlock_vdb() and _unregister() read _locked_vdb,
    # _elog_reader_fd and _elog_keys without this class assigning them
    # first; presumably the base class initializes unset slots to None.
    # Confirm against the base class.
    __slots__ = (
        "mycat",
        "mypkg",
        "settings",
        "treetype",
        "vartree",
        "blockers",
        "pkgloc",
        "infloc",
        "myebuild",
        "mydbapi",
        "postinst_failure",
        "prev_mtimes",
        "unmerge",
        "_elog_reader_fd",
        "_buf",
        "_counter",
        "_dblink",
        "_elog_keys",
        "_locked_vdb",
    )

    def _start(self):
        """
        Prepare settings and fd_pipes in the parent process, then
        delegate to the superclass _start method.
        """
        # Portage should always call setcpv prior to this
        # point, but here we have a fallback as a convenience
        # for external API consumers. It's important that
        # this metadata access happens in the parent process,
        # since closing of file descriptors in the subprocess
        # can prevent access to open database connections such
        # as that used by the sqlite metadata cache module.
        cpv = "%s/%s" % (self.mycat, self.mypkg)
        settings = self.settings
        if cpv != settings.mycpv or "EAPI" not in settings.configdict["pkg"]:
            settings.reload()
            settings.reset()
            settings.setcpv(cpv, mydb=self.mydbapi)

        # This caches the libc library lookup in the current
        # process, so that it's only done once rather than
        # for each child process.
        if platform.system() == "Linux" and "merge-sync" in settings.features:
            find_library("c")

        # Inherit stdin by default, so that the pdb SIGUSR1
        # handler is usable for the subprocess.
        if self.fd_pipes is None:
            self.fd_pipes = {}
        else:
            # Work on a copy so that the mapping that was assigned to
            # this instance is not mutated by setdefault() below.
            self.fd_pipes = self.fd_pipes.copy()
        self.fd_pipes.setdefault(0, portage._get_stdin().fileno())

        self.log_filter_file = self.settings.get("PORTAGE_LOG_FILTER_FILE_CMD")
        super(MergeProcess, self)._start()

    def _lock_vdb(self):
        """
        Lock the vdb if FEATURES=parallel-install is NOT enabled,
        otherwise do nothing. This is implemented with
        vardbapi.lock(), which supports reentrance by the
        subprocess that we spawn.
        """
        if "parallel-install" not in self.settings.features:
            self.vartree.dbapi.lock()
            self._locked_vdb = True

    def _unlock_vdb(self):
        """
        Unlock the vdb if we hold a lock, otherwise do nothing.
        """
        if self._locked_vdb:
            self.vartree.dbapi.unlock()
            self._locked_vdb = False

    def _elog_output_handler(self):
        """
        Reader callback for the elog pipe (registered by _spawn).

        Each complete line has the form "funcname phase key msg" and is
        dispatched to the function named funcname in
        portage.elog.messages. A trailing partial line is kept in
        self._buf until the rest of it arrives.

        @return: False after the reader has been removed and the pipe
            closed (empty, non-None read); otherwise None (implicit).
        """
        output = self._read_buf(self._elog_reader_fd)
        if output:
            lines = _unicode_decode(output).split("\n")
            if len(lines) == 1:
                # No newline in this chunk, so the line is still
                # incomplete: just accumulate it.
                self._buf += lines[0]
            else:
                # Complete the previously buffered fragment, and hold
                # back the (possibly empty) fragment after the last
                # newline for the next read.
                lines[0] = self._buf + lines[0]
                self._buf = lines.pop()
                # Throwaway sink handed to the reporters as "out".
                # NOTE(review): presumably this keeps them from
                # printing in the parent -- confirm.
                out = io.StringIO()
                for line in lines:
                    funcname, phase, key, msg = line.split(" ", 3)
                    self._elog_keys.add(key)
                    reporter = getattr(portage.elog.messages, funcname)
                    reporter(msg, phase=phase, key=key, out=out)

        elif output is not None:  # EIO/POLLHUP
            self.scheduler.remove_reader(self._elog_reader_fd)
            os.close(self._elog_reader_fd)
            self._elog_reader_fd = None
            return False

    def _spawn(self, args, fd_pipes, **kwargs):
        """
        Extend the superclass _spawn method to perform some pre-fork and
        post-fork actions.

        Creates the elog pipe, builds the dblink in the parent, locks
        the vdb (see _lock_vdb), reserves a COUNTER value for merges,
        and invalidates vardbapi caches after the subprocess has been
        spawned.

        @param args: passed through to the superclass _spawn method
        @param fd_pipes: fd mapping for the subprocess; the elog pipe
            write end is added to it
        @return: whatever the superclass _spawn method returns
        """
        elog_reader_fd, elog_writer_fd = os.pipe()

        # The parent reads from the event loop, so the read end must
        # not block.
        fcntl.fcntl(
            elog_reader_fd,
            fcntl.F_SETFL,
            fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK,
        )

        blockers = None
        if self.blockers is not None:
            # Query blockers in the main process, since closing
            # of file descriptors in the subprocess can prevent
            # access to open database connections such as that
            # used by the sqlite metadata cache module.
            blockers = self.blockers()
        mylink = portage.dblink(
            self.mycat,
            self.mypkg,
            settings=self.settings,
            treetype=self.treetype,
            vartree=self.vartree,
            blockers=blockers,
            pipe=elog_writer_fd,
        )
        fd_pipes[elog_writer_fd] = elog_writer_fd
        self.scheduler.add_reader(elog_reader_fd, self._elog_output_handler)

        # If a concurrent emerge process tries to install a package
        # in the same SLOT as this one at the same time, there is an
        # extremely unlikely chance that the COUNTER values will not be
        # ordered correctly unless we lock the vdb here.
        # FEATURES=parallel-install skips this lock in order to
        # improve performance, and the risk is practically negligible.
        self._lock_vdb()
        if not self.unmerge:
            self._counter = self.vartree.dbapi.counter_tick()

        self._dblink = mylink
        self._elog_reader_fd = elog_reader_fd
        try:
            pids = super(MergeProcess, self)._spawn(args, fd_pipes, **kwargs)
        finally:
            # Close this process's copy of the write end even when
            # the superclass _spawn raises: it is only referenced by a
            # local variable here, so it would otherwise be leaked.
            os.close(elog_writer_fd)
        self._buf = ""
        self._elog_keys = set()
        # Discard messages which will be collected by the subprocess,
        # in order to avoid duplicates (bug #446136).
        portage.elog.messages.collect_messages(key=mylink.mycpv)

        # invalidate relevant vardbapi caches
        if self.vartree.dbapi._categories is not None:
            self.vartree.dbapi._categories = None
        self.vartree.dbapi._pkgs_changed = True
        self.vartree.dbapi._clear_pkg_cache(mylink)

        return pids

    def _run(self):
        """
        Perform the merge or unmerge using the dblink prepared by
        _spawn. The comments below indicate that this runs in the
        subprocess.

        @return: os.EX_OK when an unmerge succeeds or the package does
            not exist, 1 when an unmerge fails, or the return value of
            dblink.merge() for a merge.
        """
        # The read end belongs to the parent's _elog_output_handler.
        os.close(self._elog_reader_fd)
        counter = self._counter
        mylink = self._dblink

        portage.output.havecolor = self.settings.get("NOCOLOR") not in ("yes", "true")

        # Avoid wasteful updates of the vdb cache.
        self.vartree.dbapi._flush_cache_enabled = False

        # In this subprocess we don't want PORTAGE_BACKGROUND to
        # suppress stdout/stderr output since they are pipes. We
        # also don't want to open PORTAGE_LOG_FILE, since it will
        # already be opened by the parent process, so we set the
        # "subprocess" value for use in conditional logging code
        # involving PORTAGE_LOG_FILE.
        if not self.unmerge:
            # unmerge phases have separate logs
            if self.settings.get("PORTAGE_BACKGROUND") == "1":
                self.settings["PORTAGE_BACKGROUND_UNMERGE"] = "1"
            else:
                self.settings["PORTAGE_BACKGROUND_UNMERGE"] = "0"
            self.settings.backup_changes("PORTAGE_BACKGROUND_UNMERGE")
        self.settings["PORTAGE_BACKGROUND"] = "subprocess"
        self.settings.backup_changes("PORTAGE_BACKGROUND")

        # Default to failure; only the branches below report success.
        rval = 1
        if self.unmerge:
            if not mylink.exists():
                rval = os.EX_OK
            elif mylink.unmerge(ldpath_mtimes=self.prev_mtimes) == os.EX_OK:
                mylink.lockdb()
                try:
                    mylink.delete()
                finally:
                    mylink.unlockdb()
                rval = os.EX_OK
        else:
            rval = mylink.merge(
                self.pkgloc,
                self.infloc,
                myebuild=self.myebuild,
                mydbapi=self.mydbapi,
                prev_mtimes=self.prev_mtimes,
                counter=counter,
            )
        return rval

    def _proc_join_done(self, proc, future):
        """
        Extend _proc_join_done to react to RETURNCODE_POSTINST_FAILURE.

        When the process exited with that code (and the future was not
        cancelled), record it in self.postinst_failure and set
        self.returncode to os.EX_OK before delegating to the superclass.
        """
        if (
            not future.cancelled()
            and proc.exitcode == portage.const.RETURNCODE_POSTINST_FAILURE
        ):
            self.postinst_failure = True
            self.returncode = os.EX_OK
        super(MergeProcess, self)._proc_join_done(proc, future)

    def _unregister(self):
        """
        Unregister from the scheduler and close open files.

        Also releases the vdb lock taken by _spawn (if any) and runs
        elog_process for every key collected by _elog_output_handler.
        """
        if not self.unmerge:
            # Populate the vardbapi cache for the new package
            # while its inodes are still hot.
            try:
                self.vartree.dbapi.aux_get(self.settings.mycpv, ["EAPI"])
            except KeyError:
                # Best effort: a KeyError from aux_get is deliberately
                # ignored, this call only warms the cache.
                pass

        self._unlock_vdb()
        if self._elog_reader_fd is not None:
            self.scheduler.remove_reader(self._elog_reader_fd)
            os.close(self._elog_reader_fd)
            self._elog_reader_fd = None
        if self._elog_keys is not None:
            for key in self._elog_keys:
                portage.elog.elog_process(
                    key, self.settings, phasefilter=("prerm", "postrm")
                )
            # Reset so that a repeated call does not process the keys
            # again.
            self._elog_keys = None

        super(MergeProcess, self)._unregister()