diff options
Diffstat (limited to 'lib/_emerge/Scheduler.py')
-rw-r--r-- | lib/_emerge/Scheduler.py | 276 |
1 files changed, 167 insertions, 109 deletions
diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py index 15d646c4f..614df9e78 100644 --- a/lib/_emerge/Scheduler.py +++ b/lib/_emerge/Scheduler.py @@ -1,4 +1,4 @@ -# Copyright 1999-2021 Gentoo Authors +# Copyright 1999-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from collections import deque @@ -64,7 +64,6 @@ FAILURE = 1 class Scheduler(PollScheduler): - # max time between loadavg checks (seconds) _loadavg_latency = 30 @@ -326,7 +325,7 @@ class Scheduler(PollScheduler): # clear out existing fetch log if it exists try: open(self._fetch_log, "w").close() - except EnvironmentError: + except OSError: pass self._running_portage = None @@ -340,7 +339,6 @@ class Scheduler(PollScheduler): ) def _handle_self_update(self): - if self._opts_no_self_update.intersection(self.myopts): return os.EX_OK @@ -455,7 +453,7 @@ class Scheduler(PollScheduler): msg.append(pkg_str) msg.append("") writemsg_level( - "".join("%s\n" % (l,) for l in msg), + "".join(f"{l}\n" for l in msg), level=logging.INFO, noiselevel=-1, ) @@ -494,7 +492,6 @@ class Scheduler(PollScheduler): return interactive_tasks def _set_graph_config(self, graph_config): - if graph_config is None: self._graph_config = None self._pkg_cache = {} @@ -728,11 +725,11 @@ class Scheduler(PollScheduler): portdb = x.root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: - raise AssertionError("ebuild not found for '%s'" % x.cpv) + raise AssertionError(f"ebuild not found for '{x.cpv}'") pkgsettings["O"] = os.path.dirname(ebuild_path) if not digestgen(mysettings=pkgsettings, myportdb=portdb): writemsg_level( - "!!! Unable to generate manifest for '%s'.\n" % x.cpv, + f"!!! Unable to generate manifest for '{x.cpv}'.\n", level=logging.ERROR, noiselevel=-1, ) @@ -777,7 +774,7 @@ class Scheduler(PollScheduler): quiet_config = quiet_settings[root_config.root] ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: - raise AssertionError("ebuild not found for '%s'" % x.cpv) + raise AssertionError(f"ebuild not found for '{x.cpv}'") quiet_config["O"] = os.path.dirname(ebuild_path) if not digestcheck([], quiet_config, strict=True): failures |= 1 @@ -787,12 +784,10 @@ class Scheduler(PollScheduler): return os.EX_OK def _add_prefetchers(self): - if not self._parallel_fetch: return if self._parallel_fetch: - prefetchers = self._prefetchers for pkg in self._mergelist: @@ -819,7 +814,6 @@ class Scheduler(PollScheduler): pass elif pkg.type_name == "ebuild": - prefetcher = EbuildFetcher( background=True, config_pool=self._ConfigPool( @@ -838,7 +832,6 @@ class Scheduler(PollScheduler): and "--getbinpkg" in self.myopts and pkg.root_config.trees["bintree"].isremote(pkg.cpv) ): - prefetcher = BinpkgPrefetcher( background=True, pkg=pkg, scheduler=self._sched_iface ) @@ -872,12 +865,13 @@ class Scheduler(PollScheduler): if self._terminated_tasks: raise asyncio.CancelledError - out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv) - self._status_msg(out_str) - root_config = x.root_config settings = self._allocate_config(root_config.root) settings.setcpv(x) + + color = "PKG_BINARY_MERGE" if x.built else "INFORM" + self._status_msg(f"Running pre-merge checks for {colorize(color, x.cpv)}") + if not x.built: # Get required SRC_URI metadata (it's not cached in x.metadata # because some packages have an extremely large SRC_URI value). @@ -908,7 +902,6 @@ class Scheduler(PollScheduler): current_task = None try: - # Clean up the existing build dir, in case pkg_pretend # checks for available space (bug #390711). if existing_builddir: @@ -921,7 +914,7 @@ class Scheduler(PollScheduler): portdb = root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: - raise AssertionError("ebuild not found for '%s'" % x.cpv) + raise AssertionError(f"ebuild not found for '{x.cpv}'") portage.package.ebuild.doebuild.doebuild_environment( ebuild_path, "clean", @@ -937,6 +930,7 @@ class Scheduler(PollScheduler): current_task = clean_phase clean_phase.start() await clean_phase.async_wait() + current_task = None if x.built: tree = "bintree" @@ -947,6 +941,10 @@ class Scheduler(PollScheduler): # is consuming time here. if bintree.isremote(x.cpv): fetcher = self._get_prefetcher(x) + if fetcher is not None and not fetcher.isAlive(): + # Cancel it because it hasn't started yet. + fetcher.cancel() + fetcher = None if fetcher is None: fetcher = BinpkgFetcher(pkg=x, scheduler=loop) fetcher.start() @@ -955,6 +953,16 @@ class Scheduler(PollScheduler): # handles fetch, verification, and the # bintree.inject call which moves the file. fetched = fetcher.pkg_path + else: + msg = ( + "Fetching in the background:", + fetcher.pkg_path, + "To view fetch progress, run in another terminal:", + f"tail -f {self._fetch_log}", + ) + out = portage.output.EOutput() + for l in msg: + out.einfo(l) if await fetcher.async_wait() != os.EX_OK: failures += 1 self._record_pkg_failure(x, settings, fetcher.returncode) @@ -974,12 +982,34 @@ class Scheduler(PollScheduler): self._record_pkg_failure(x, settings, verifier.returncode) continue + current_task = None if fetched: - bintree.inject(x.cpv, filename=fetched) + if not bintree.inject( + x.cpv, + current_pkg_path=fetched, + allocated_pkg_path=fetcher.pkg_allocated_path, + ): + eerror( + "Binary package is not usable", + phase="pretend", + key=x.cpv, + ) + failures += 1 + self._record_pkg_failure(x, settings, 1) + continue infloc = os.path.join(build_dir_path, "build-info") ensure_dirs(infloc) - await bintree.dbapi.unpack_metadata(settings, infloc, loop=loop) + try: + await bintree.dbapi.unpack_metadata(settings, infloc, loop=loop) + except portage.exception.SignatureException as e: + writemsg( + f"!!! Invalid binary package: '{bintree.getname(x.cpv)}', {e}\n", + noiselevel=-1, + ) + failures += 1 + self._record_pkg_failure(x, settings, 1) + continue ebuild_path = os.path.join(infloc, x.pf + ".ebuild") settings.configdict["pkg"]["EMERGE_FROM"] = "binary" settings.configdict["pkg"]["MERGE_TYPE"] = "binary" @@ -989,7 +1019,7 @@ class Scheduler(PollScheduler): portdb = root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: - raise AssertionError("ebuild not found for '%s'" % x.cpv) + raise AssertionError(f"ebuild not found for '{x.cpv}'") settings.configdict["pkg"]["EMERGE_FROM"] = "ebuild" if self._build_opts.buildpkgonly: settings.configdict["pkg"]["MERGE_TYPE"] = "buildonly" @@ -1007,10 +1037,10 @@ class Scheduler(PollScheduler): vardb = root_config.trees["vartree"].dbapi settings["REPLACING_VERSIONS"] = " ".join( - set( + { portage.versions.cpv_getversion(match) for match in vardb.match(x.slot_atom) + vardb.match("=" + x.cpv) - ) + } ) pretend_phase = EbuildPhase( phase="pretend", scheduler=sched_iface, settings=settings @@ -1019,24 +1049,27 @@ class Scheduler(PollScheduler): current_task = pretend_phase pretend_phase.start() ret = await pretend_phase.async_wait() + # Leave current_task assigned in order to trigger clean + # on success in the below finally block. if ret != os.EX_OK: failures += 1 self._record_pkg_failure(x, settings, ret) - portage.elog.elog_process(x.cpv, settings) finally: - if current_task is not None: if current_task.isAlive(): current_task.cancel() - if current_task.returncode == os.EX_OK: - clean_phase = EbuildPhase( - background=False, - phase="clean", - scheduler=sched_iface, - settings=settings, - ) - clean_phase.start() - await clean_phase.async_wait() + + portage.elog.elog_process(x.cpv, settings) + + if current_task is not None and current_task.returncode == os.EX_OK: + clean_phase = EbuildPhase( + background=False, + phase="clean", + scheduler=sched_iface, + settings=settings, + ) + clean_phase.start() + await clean_phase.async_wait() await build_dir.async_unlock() self._deallocate_config(settings) @@ -1122,15 +1155,12 @@ class Scheduler(PollScheduler): return rval while True: - received_signal = [] def sighandler(signum, frame): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) - portage.util.writemsg( - "\n\nExiting on signal %(signal)s\n" % {"signal": signum} - ) + portage.util.writemsg(f"\n\nExiting on signal {signum}\n") self.terminate() received_signal.append(128 + signum) @@ -1233,7 +1263,7 @@ class Scheduler(PollScheduler): ), mode="rb", ) - except IOError: + except OSError: pass else: if log_path.endswith(".gz"): @@ -1247,7 +1277,7 @@ class Scheduler(PollScheduler): for line in log_file: writemsg_level(line, noiselevel=-1) except zlib.error as e: - writemsg_level("%s\n" % (e,), level=logging.ERROR, noiselevel=-1) + writemsg_level(f"{e}\n", level=logging.ERROR, noiselevel=-1) finally: log_file.close() if log_file_real is not None: @@ -1266,15 +1296,21 @@ class Scheduler(PollScheduler): and self._failed_pkgs_die_msgs and not mod_echo_output ): + failed_pkg_map = {} + for failed_pkg in self._failed_pkgs_all: + pkg = failed_pkg.pkg + failed_pkg_map[(pkg.cpv, pkg.root)] = pkg for mysettings, key, logentries in self._failed_pkgs_die_msgs: + pkg = failed_pkg_map.get((key, mysettings["EROOT"])) + color = "PKG_BINARY_MERGE" if pkg and pkg.built else "INFORM" + root_msg = "" if mysettings["ROOT"] != "/": - root_msg = " merged to %s" % mysettings["ROOT"] + root_msg = f" merged to {mysettings['ROOT']}" print() printer.einfo( - "Error messages for package %s%s:" - % (colorize("INFORM", key), root_msg) + f"Error messages for package {colorize(color, key)}{root_msg}:" ) print() for phase in portage.const.EBUILD_PHASES: @@ -1293,7 +1329,7 @@ class Scheduler(PollScheduler): if len(self._failed_pkgs_all) > 1 or (self._failed_pkgs_all and keep_going): if len(self._failed_pkgs_all) > 1: msg = ( - "The following %d packages have " % len(self._failed_pkgs_all) + f"The following {len(self._failed_pkgs_all)} packages have " + "failed to build, install, or execute postinst:" ) else: @@ -1307,7 +1343,7 @@ class Scheduler(PollScheduler): printer.eerror(line) printer.eerror("") for failed_pkg in self._failed_pkgs_all: - msg = " %s" % (failed_pkg.pkg,) + msg = f" {failed_pkg.pkg}" if failed_pkg.postinst_failure: msg += " (postinst failed)" log_path = self._locate_failure_log(failed_pkg) @@ -1315,7 +1351,7 @@ class Scheduler(PollScheduler): msg += ", Log file:" printer.eerror(msg) if log_path is not None: - printer.eerror(" '%s'" % colorize("INFORM", log_path)) + printer.eerror(f" '{colorize('INFORM', log_path)}'") printer.eerror("") if self._failed_pkgs_all: @@ -1328,7 +1364,6 @@ class Scheduler(PollScheduler): self._failed_pkgs_die_msgs.append((mysettings, key, errors)) def _locate_failure_log(self, failed_pkg): - log_paths = [failed_pkg.build_log] for log_path in log_paths: @@ -1484,16 +1519,21 @@ class Scheduler(PollScheduler): self._deallocate_config(build.settings) elif build.returncode == os.EX_OK: self.curval += 1 - merge = PackageMerge(merge=build, scheduler=self._sched_iface) + merge = PackageMerge( + is_system_pkg=(build.pkg in self._deep_system_deps), + merge=build, + scheduler=self._sched_iface, + ) self._running_tasks[id(merge)] = merge - if ( - not build.build_opts.buildpkgonly - and build.pkg in self._deep_system_deps + # By default, merge-wait only allows merge when no builds are executing. + # As a special exception, dependencies on system packages are frequently + # unspecified and will therefore force merge-wait. + if not build.build_opts.buildpkgonly and ( + "merge-wait" in build.settings.features or merge.is_system_pkg ): - # Since dependencies on system packages are frequently - # unspecified, merge them only when no builds are executing. self._merge_wait_queue.append(merge) - merge.addStartListener(self._system_merge_started) + if merge.is_system_pkg: + merge.addStartListener(self._system_merge_started) else: self._task_queues.merge.add(merge) merge.addExitListener(self._merge_exit) @@ -1547,7 +1587,6 @@ class Scheduler(PollScheduler): self._event_loop.run_until_complete(self._main_exit) def _merge(self): - if self._opts_no_background.intersection(self.myopts): self._set_max_jobs(1) @@ -1696,7 +1735,7 @@ class Scheduler(PollScheduler): completed_tasks = self._completed_tasks dependent = False - traversed_nodes = set([pkg]) + traversed_nodes = {pkg} direct_deps = graph.child_nodes(pkg) node_stack = direct_deps direct_deps = frozenset(direct_deps) @@ -1754,9 +1793,7 @@ class Scheduler(PollScheduler): return self._jobs def _schedule_tasks(self): - while True: - state_change = 0 # When the number of jobs and merges drops to zero, @@ -1770,13 +1807,32 @@ class Scheduler(PollScheduler): and not self._jobs and not self._task_queues.merge ): - task = self._merge_wait_queue.popleft() - task.scheduler = self._sched_iface - self._merge_wait_scheduled.append(task) - self._task_queues.merge.add(task) - task.addExitListener(self._merge_wait_exit_handler) - self._status_display.merges = len(self._task_queues.merge) - state_change += 1 + while self._merge_wait_queue: + # If we added non-system packages to the merge queue in a + # previous iteration of this loop, then for system packages we + # need to come back later when the merge queue is empty. + # TODO: Maybe promote non-system packages to the front of the + # queue and process them within the current loop, though that + # causes merge order to differ from the order builds finish. + if ( + self._task_queues.merge + and self._merge_wait_queue[0].is_system_pkg + ): + break + task = self._merge_wait_queue.popleft() + task.scheduler = self._sched_iface + self._merge_wait_scheduled.append(task) + self._task_queues.merge.add(task) + task.addExitListener(self._merge_wait_exit_handler) + self._status_display.merges = len(self._task_queues.merge) + state_change += 1 + # For system packages, always serialize install regardless of + # parallel-install, in order to mitigate failures triggered + # by fragile states as in bug 256616. For other packages, + # continue to populate self._task_queues.merge, which will + # serialize install unless parallel-install is enabled. + if task.is_system_pkg: + break if self._schedule_tasks_imp(): state_change += 1 @@ -1851,15 +1907,12 @@ class Scheduler(PollScheduler): """ if self._jobs and self._max_load is not None: - current_time = time.time() if self._sigcont_time is not None: - elapsed_seconds = current_time - self._sigcont_time # elapsed_seconds < 0 means the system clock has been adjusted if elapsed_seconds > 0 and elapsed_seconds < self._sigcont_delay: - if self._job_delay_timeout_id is not None: self._job_delay_timeout_id.cancel() @@ -1884,7 +1937,6 @@ class Scheduler(PollScheduler): elapsed_seconds = current_time - self._previous_job_start_time # elapsed_seconds < 0 means the system clock has been adjusted if elapsed_seconds > 0 and elapsed_seconds < delay: - if self._job_delay_timeout_id is not None: self._job_delay_timeout_id.cancel() @@ -1901,12 +1953,11 @@ class Scheduler(PollScheduler): @return: True if state changed, False otherwise. """ - state_change = 0 + state_change = False while True: - if not self._keep_scheduling(): - return bool(state_change) + return state_change if ( self._choose_pkg_return_early @@ -1915,13 +1966,13 @@ class Scheduler(PollScheduler): or not self._can_add_job() or self._job_delay() ): - return bool(state_change) + return state_change pkg = self._choose_pkg() if pkg is None: - return bool(state_change) + return state_change - state_change += 1 + state_change = True if not pkg.installed: self._pkg_count.curval += 1 @@ -1934,15 +1985,6 @@ class Scheduler(PollScheduler): self._task_queues.merge.addFront(merge) merge.addExitListener(self._merge_exit) - elif pkg.built: - self._jobs += 1 - self._previous_job_start_time = time.time() - self._status_display.running = self._jobs - self._running_tasks[id(task)] = task - task.scheduler = self._sched_iface - self._task_queues.jobs.add(task) - task.addExitListener(self._extract_exit) - else: self._jobs += 1 self._previous_job_start_time = time.time() @@ -1950,9 +1992,11 @@ class Scheduler(PollScheduler): self._running_tasks[id(task)] = task task.scheduler = self._sched_iface self._task_queues.jobs.add(task) - task.addExitListener(self._build_exit) - return bool(state_change) + if pkg.built: + task.addExitListener(self._extract_exit) + else: + task.addExitListener(self._build_exit) def _get_prefetcher(self, pkg): try: @@ -1963,7 +2007,9 @@ class Scheduler(PollScheduler): # CPython 2.7, so it may be possible for CPython to raise KeyError # here as well. prefetcher = None - if prefetcher is not None and not prefetcher.isAlive(): + if prefetcher is not None and ( + prefetcher.cancelled or not prefetcher.isAlive() + ): try: self._task_queues.fetch._task_queue.remove(prefetcher) except ValueError: @@ -1972,7 +2018,6 @@ class Scheduler(PollScheduler): return prefetcher def _task(self, pkg): - pkg_to_replace = None if pkg.operation != "uninstall": vardb = pkg.root_config.trees["vartree"].dbapi @@ -2020,9 +2065,12 @@ class Scheduler(PollScheduler): def _failed_pkg_msg(self, failed_pkg, action, preposition): pkg = failed_pkg.pkg - msg = "%s to %s %s" % (bad("Failed"), action, colorize("INFORM", pkg.cpv)) + + color = "PKG_BINARY_MERGE" if failed_pkg.pkg.built else "INFORM" + + msg = f"{bad('Failed')} to {action} {colorize(color, pkg.cpv)}" if pkg.root_config.settings["ROOT"] != "/": - msg += " %s %s" % (preposition, pkg.root) + msg += f" {preposition} {pkg.root}" log_path = self._locate_failure_log(failed_pkg) if log_path is not None: @@ -2030,7 +2078,7 @@ class Scheduler(PollScheduler): self._status_msg(msg) if log_path is not None: - self._status_msg(" '%s'" % (colorize("INFORM", log_path),)) + self._status_msg(f" '{colorize('INFORM', log_path)}'") def _status_msg(self, msg): """ @@ -2068,6 +2116,19 @@ class Scheduler(PollScheduler): for x in self._mergelist if isinstance(x, Package) and x.operation == "merge" ] + # Store binpkgs using the same keys as $PKGDIR/Packages plus EROOT. + mtimedb["resume"]["binpkgs"] = [ + { + "CPV": str(x.cpv), + "BUILD_ID": x.cpv.build_id, + "BUILD_TIME": x.cpv.build_time, + "MTIME": x.cpv.mtime, + "SIZE": x.cpv.file_size, + "EROOT": x.root, + } + for x in self._mergelist + if isinstance(x, Package) and x.type_name == "binary" + ] mtimedb.commit() @@ -2158,13 +2219,13 @@ class Scheduler(PollScheduler): if not (isinstance(task, Package) and task.operation == "merge"): continue pkg = task - msg = "emerge --keep-going:" + " %s" % (pkg.cpv,) + msg = "emerge --keep-going:" + f" {pkg.cpv}" if pkg.root_config.settings["ROOT"] != "/": - msg += " for %s" % (pkg.root,) + msg += f" for {pkg.root}" if not atoms: msg += " dropped because it is masked or unavailable" else: - msg += " dropped because it requires %s" % ", ".join(atoms) + msg += f" dropped because it requires {', '.join(set(atoms))}" for line in textwrap.wrap(msg, msg_width): eerror(line, phase="other", key=pkg.cpv) settings = self.pkgsettings[pkg.root] @@ -2191,16 +2252,14 @@ class Scheduler(PollScheduler): it's supposed to be added or removed. Otherwise, do nothing. """ - if set( - ( - "--buildpkgonly", - "--fetchonly", - "--fetch-all-uri", - "--oneshot", - "--onlydeps", - "--pretend", - ) - ).intersection(self.myopts): + if { + "--buildpkgonly", + "--fetchonly", + "--fetch-all-uri", + "--oneshot", + "--onlydeps", + "--pretend", + }.intersection(self.myopts): return if pkg.root != self.target_root: @@ -2221,7 +2280,6 @@ class Scheduler(PollScheduler): atom = self._world_atoms.get(pkg) try: - if hasattr(world_set, "lock"): world_set.lock() world_locked = True @@ -2241,16 +2299,16 @@ class Scheduler(PollScheduler): if atom is not None: if hasattr(world_set, "add"): self._status_msg( - ('Recording %s in "world" ' + "favorites file...") % atom + f'Recording {atom} in "world" favorites file...' ) logger.log( - " === (%s of %s) Updating world file (%s)" - % (pkg_count.curval, pkg_count.maxval, pkg.cpv) + f" === ({pkg_count.curval} of {pkg_count.maxval}) " + f"Updating world file ({pkg.cpv})" ) world_set.add(atom) else: writemsg_level( - '\n!!! Unable to record %s in "world"\n' % (atom,), + f'\n!!! Unable to record {atom} in "world"\n', level=logging.WARN, noiselevel=-1, ) |