aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/_emerge/Scheduler.py')
-rw-r--r--lib/_emerge/Scheduler.py276
1 files changed, 167 insertions, 109 deletions
diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index 15d646c4f..614df9e78 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2021 Gentoo Authors
+# Copyright 1999-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
from collections import deque
@@ -64,7 +64,6 @@ FAILURE = 1
class Scheduler(PollScheduler):
-
# max time between loadavg checks (seconds)
_loadavg_latency = 30
@@ -326,7 +325,7 @@ class Scheduler(PollScheduler):
# clear out existing fetch log if it exists
try:
open(self._fetch_log, "w").close()
- except EnvironmentError:
+ except OSError:
pass
self._running_portage = None
@@ -340,7 +339,6 @@ class Scheduler(PollScheduler):
)
def _handle_self_update(self):
-
if self._opts_no_self_update.intersection(self.myopts):
return os.EX_OK
@@ -455,7 +453,7 @@ class Scheduler(PollScheduler):
msg.append(pkg_str)
msg.append("")
writemsg_level(
- "".join("%s\n" % (l,) for l in msg),
+ "".join(f"{l}\n" for l in msg),
level=logging.INFO,
noiselevel=-1,
)
@@ -494,7 +492,6 @@ class Scheduler(PollScheduler):
return interactive_tasks
def _set_graph_config(self, graph_config):
-
if graph_config is None:
self._graph_config = None
self._pkg_cache = {}
@@ -728,11 +725,11 @@ class Scheduler(PollScheduler):
portdb = x.root_config.trees["porttree"].dbapi
ebuild_path = portdb.findname(x.cpv, myrepo=x.repo)
if ebuild_path is None:
- raise AssertionError("ebuild not found for '%s'" % x.cpv)
+ raise AssertionError(f"ebuild not found for '{x.cpv}'")
pkgsettings["O"] = os.path.dirname(ebuild_path)
if not digestgen(mysettings=pkgsettings, myportdb=portdb):
writemsg_level(
- "!!! Unable to generate manifest for '%s'.\n" % x.cpv,
+ f"!!! Unable to generate manifest for '{x.cpv}'.\n",
level=logging.ERROR,
noiselevel=-1,
)
@@ -777,7 +774,7 @@ class Scheduler(PollScheduler):
quiet_config = quiet_settings[root_config.root]
ebuild_path = portdb.findname(x.cpv, myrepo=x.repo)
if ebuild_path is None:
- raise AssertionError("ebuild not found for '%s'" % x.cpv)
+ raise AssertionError(f"ebuild not found for '{x.cpv}'")
quiet_config["O"] = os.path.dirname(ebuild_path)
if not digestcheck([], quiet_config, strict=True):
failures |= 1
@@ -787,12 +784,10 @@ class Scheduler(PollScheduler):
return os.EX_OK
def _add_prefetchers(self):
-
if not self._parallel_fetch:
return
if self._parallel_fetch:
-
prefetchers = self._prefetchers
for pkg in self._mergelist:
@@ -819,7 +814,6 @@ class Scheduler(PollScheduler):
pass
elif pkg.type_name == "ebuild":
-
prefetcher = EbuildFetcher(
background=True,
config_pool=self._ConfigPool(
@@ -838,7 +832,6 @@ class Scheduler(PollScheduler):
and "--getbinpkg" in self.myopts
and pkg.root_config.trees["bintree"].isremote(pkg.cpv)
):
-
prefetcher = BinpkgPrefetcher(
background=True, pkg=pkg, scheduler=self._sched_iface
)
@@ -872,12 +865,13 @@ class Scheduler(PollScheduler):
if self._terminated_tasks:
raise asyncio.CancelledError
- out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv)
- self._status_msg(out_str)
-
root_config = x.root_config
settings = self._allocate_config(root_config.root)
settings.setcpv(x)
+
+ color = "PKG_BINARY_MERGE" if x.built else "INFORM"
+ self._status_msg(f"Running pre-merge checks for {colorize(color, x.cpv)}")
+
if not x.built:
# Get required SRC_URI metadata (it's not cached in x.metadata
# because some packages have an extremely large SRC_URI value).
@@ -908,7 +902,6 @@ class Scheduler(PollScheduler):
current_task = None
try:
-
# Clean up the existing build dir, in case pkg_pretend
# checks for available space (bug #390711).
if existing_builddir:
@@ -921,7 +914,7 @@ class Scheduler(PollScheduler):
portdb = root_config.trees["porttree"].dbapi
ebuild_path = portdb.findname(x.cpv, myrepo=x.repo)
if ebuild_path is None:
- raise AssertionError("ebuild not found for '%s'" % x.cpv)
+ raise AssertionError(f"ebuild not found for '{x.cpv}'")
portage.package.ebuild.doebuild.doebuild_environment(
ebuild_path,
"clean",
@@ -937,6 +930,7 @@ class Scheduler(PollScheduler):
current_task = clean_phase
clean_phase.start()
await clean_phase.async_wait()
+ current_task = None
if x.built:
tree = "bintree"
@@ -947,6 +941,10 @@ class Scheduler(PollScheduler):
# is consuming time here.
if bintree.isremote(x.cpv):
fetcher = self._get_prefetcher(x)
+ if fetcher is not None and not fetcher.isAlive():
+ # Cancel it because it hasn't started yet.
+ fetcher.cancel()
+ fetcher = None
if fetcher is None:
fetcher = BinpkgFetcher(pkg=x, scheduler=loop)
fetcher.start()
@@ -955,6 +953,16 @@ class Scheduler(PollScheduler):
# handles fetch, verification, and the
# bintree.inject call which moves the file.
fetched = fetcher.pkg_path
+ else:
+ msg = (
+ "Fetching in the background:",
+ fetcher.pkg_path,
+ "To view fetch progress, run in another terminal:",
+ f"tail -f {self._fetch_log}",
+ )
+ out = portage.output.EOutput()
+ for l in msg:
+ out.einfo(l)
if await fetcher.async_wait() != os.EX_OK:
failures += 1
self._record_pkg_failure(x, settings, fetcher.returncode)
@@ -974,12 +982,34 @@ class Scheduler(PollScheduler):
self._record_pkg_failure(x, settings, verifier.returncode)
continue
+ current_task = None
if fetched:
- bintree.inject(x.cpv, filename=fetched)
+ if not bintree.inject(
+ x.cpv,
+ current_pkg_path=fetched,
+ allocated_pkg_path=fetcher.pkg_allocated_path,
+ ):
+ eerror(
+ "Binary package is not usable",
+ phase="pretend",
+ key=x.cpv,
+ )
+ failures += 1
+ self._record_pkg_failure(x, settings, 1)
+ continue
infloc = os.path.join(build_dir_path, "build-info")
ensure_dirs(infloc)
- await bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
+ try:
+ await bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
+ except portage.exception.SignatureException as e:
+ writemsg(
+ f"!!! Invalid binary package: '{bintree.getname(x.cpv)}', {e}\n",
+ noiselevel=-1,
+ )
+ failures += 1
+ self._record_pkg_failure(x, settings, 1)
+ continue
ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
@@ -989,7 +1019,7 @@ class Scheduler(PollScheduler):
portdb = root_config.trees["porttree"].dbapi
ebuild_path = portdb.findname(x.cpv, myrepo=x.repo)
if ebuild_path is None:
- raise AssertionError("ebuild not found for '%s'" % x.cpv)
+ raise AssertionError(f"ebuild not found for '{x.cpv}'")
settings.configdict["pkg"]["EMERGE_FROM"] = "ebuild"
if self._build_opts.buildpkgonly:
settings.configdict["pkg"]["MERGE_TYPE"] = "buildonly"
@@ -1007,10 +1037,10 @@ class Scheduler(PollScheduler):
vardb = root_config.trees["vartree"].dbapi
settings["REPLACING_VERSIONS"] = " ".join(
- set(
+ {
portage.versions.cpv_getversion(match)
for match in vardb.match(x.slot_atom) + vardb.match("=" + x.cpv)
- )
+ }
)
pretend_phase = EbuildPhase(
phase="pretend", scheduler=sched_iface, settings=settings
@@ -1019,24 +1049,27 @@ class Scheduler(PollScheduler):
current_task = pretend_phase
pretend_phase.start()
ret = await pretend_phase.async_wait()
+ # Leave current_task assigned in order to trigger clean
+ # on success in the below finally block.
if ret != os.EX_OK:
failures += 1
self._record_pkg_failure(x, settings, ret)
- portage.elog.elog_process(x.cpv, settings)
finally:
-
if current_task is not None:
if current_task.isAlive():
current_task.cancel()
- if current_task.returncode == os.EX_OK:
- clean_phase = EbuildPhase(
- background=False,
- phase="clean",
- scheduler=sched_iface,
- settings=settings,
- )
- clean_phase.start()
- await clean_phase.async_wait()
+
+ portage.elog.elog_process(x.cpv, settings)
+
+ if current_task is not None and current_task.returncode == os.EX_OK:
+ clean_phase = EbuildPhase(
+ background=False,
+ phase="clean",
+ scheduler=sched_iface,
+ settings=settings,
+ )
+ clean_phase.start()
+ await clean_phase.async_wait()
await build_dir.async_unlock()
self._deallocate_config(settings)
@@ -1122,15 +1155,12 @@ class Scheduler(PollScheduler):
return rval
while True:
-
received_signal = []
def sighandler(signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
- portage.util.writemsg(
- "\n\nExiting on signal %(signal)s\n" % {"signal": signum}
- )
+ portage.util.writemsg(f"\n\nExiting on signal {signum}\n")
self.terminate()
received_signal.append(128 + signum)
@@ -1233,7 +1263,7 @@ class Scheduler(PollScheduler):
),
mode="rb",
)
- except IOError:
+ except OSError:
pass
else:
if log_path.endswith(".gz"):
@@ -1247,7 +1277,7 @@ class Scheduler(PollScheduler):
for line in log_file:
writemsg_level(line, noiselevel=-1)
except zlib.error as e:
- writemsg_level("%s\n" % (e,), level=logging.ERROR, noiselevel=-1)
+ writemsg_level(f"{e}\n", level=logging.ERROR, noiselevel=-1)
finally:
log_file.close()
if log_file_real is not None:
@@ -1266,15 +1296,21 @@ class Scheduler(PollScheduler):
and self._failed_pkgs_die_msgs
and not mod_echo_output
):
+ failed_pkg_map = {}
+ for failed_pkg in self._failed_pkgs_all:
+ pkg = failed_pkg.pkg
+ failed_pkg_map[(pkg.cpv, pkg.root)] = pkg
for mysettings, key, logentries in self._failed_pkgs_die_msgs:
+ pkg = failed_pkg_map.get((key, mysettings["EROOT"]))
+ color = "PKG_BINARY_MERGE" if pkg and pkg.built else "INFORM"
+
root_msg = ""
if mysettings["ROOT"] != "/":
- root_msg = " merged to %s" % mysettings["ROOT"]
+ root_msg = f" merged to {mysettings['ROOT']}"
print()
printer.einfo(
- "Error messages for package %s%s:"
- % (colorize("INFORM", key), root_msg)
+ f"Error messages for package {colorize(color, key)}{root_msg}:"
)
print()
for phase in portage.const.EBUILD_PHASES:
@@ -1293,7 +1329,7 @@ class Scheduler(PollScheduler):
if len(self._failed_pkgs_all) > 1 or (self._failed_pkgs_all and keep_going):
if len(self._failed_pkgs_all) > 1:
msg = (
- "The following %d packages have " % len(self._failed_pkgs_all)
+ f"The following {len(self._failed_pkgs_all)} packages have "
+ "failed to build, install, or execute postinst:"
)
else:
@@ -1307,7 +1343,7 @@ class Scheduler(PollScheduler):
printer.eerror(line)
printer.eerror("")
for failed_pkg in self._failed_pkgs_all:
- msg = " %s" % (failed_pkg.pkg,)
+ msg = f" {failed_pkg.pkg}"
if failed_pkg.postinst_failure:
msg += " (postinst failed)"
log_path = self._locate_failure_log(failed_pkg)
@@ -1315,7 +1351,7 @@ class Scheduler(PollScheduler):
msg += ", Log file:"
printer.eerror(msg)
if log_path is not None:
- printer.eerror(" '%s'" % colorize("INFORM", log_path))
+ printer.eerror(f" '{colorize('INFORM', log_path)}'")
printer.eerror("")
if self._failed_pkgs_all:
@@ -1328,7 +1364,6 @@ class Scheduler(PollScheduler):
self._failed_pkgs_die_msgs.append((mysettings, key, errors))
def _locate_failure_log(self, failed_pkg):
-
log_paths = [failed_pkg.build_log]
for log_path in log_paths:
@@ -1484,16 +1519,21 @@ class Scheduler(PollScheduler):
self._deallocate_config(build.settings)
elif build.returncode == os.EX_OK:
self.curval += 1
- merge = PackageMerge(merge=build, scheduler=self._sched_iface)
+ merge = PackageMerge(
+ is_system_pkg=(build.pkg in self._deep_system_deps),
+ merge=build,
+ scheduler=self._sched_iface,
+ )
self._running_tasks[id(merge)] = merge
- if (
- not build.build_opts.buildpkgonly
- and build.pkg in self._deep_system_deps
+ # By default, merge-wait only allows merge when no builds are executing.
+ # As a special exception, dependencies on system packages are frequently
+ # unspecified and will therefore force merge-wait.
+ if not build.build_opts.buildpkgonly and (
+ "merge-wait" in build.settings.features or merge.is_system_pkg
):
- # Since dependencies on system packages are frequently
- # unspecified, merge them only when no builds are executing.
self._merge_wait_queue.append(merge)
- merge.addStartListener(self._system_merge_started)
+ if merge.is_system_pkg:
+ merge.addStartListener(self._system_merge_started)
else:
self._task_queues.merge.add(merge)
merge.addExitListener(self._merge_exit)
@@ -1547,7 +1587,6 @@ class Scheduler(PollScheduler):
self._event_loop.run_until_complete(self._main_exit)
def _merge(self):
-
if self._opts_no_background.intersection(self.myopts):
self._set_max_jobs(1)
@@ -1696,7 +1735,7 @@ class Scheduler(PollScheduler):
completed_tasks = self._completed_tasks
dependent = False
- traversed_nodes = set([pkg])
+ traversed_nodes = {pkg}
direct_deps = graph.child_nodes(pkg)
node_stack = direct_deps
direct_deps = frozenset(direct_deps)
@@ -1754,9 +1793,7 @@ class Scheduler(PollScheduler):
return self._jobs
def _schedule_tasks(self):
-
while True:
-
state_change = 0
# When the number of jobs and merges drops to zero,
@@ -1770,13 +1807,32 @@ class Scheduler(PollScheduler):
and not self._jobs
and not self._task_queues.merge
):
- task = self._merge_wait_queue.popleft()
- task.scheduler = self._sched_iface
- self._merge_wait_scheduled.append(task)
- self._task_queues.merge.add(task)
- task.addExitListener(self._merge_wait_exit_handler)
- self._status_display.merges = len(self._task_queues.merge)
- state_change += 1
+ while self._merge_wait_queue:
+ # If we added non-system packages to the merge queue in a
+ # previous iteration of this loop, then for system packages we
+ # need to come back later when the merge queue is empty.
+ # TODO: Maybe promote non-system packages to the front of the
+ # queue and process them within the current loop, though that
+ # causes merge order to differ from the order builds finish.
+ if (
+ self._task_queues.merge
+ and self._merge_wait_queue[0].is_system_pkg
+ ):
+ break
+ task = self._merge_wait_queue.popleft()
+ task.scheduler = self._sched_iface
+ self._merge_wait_scheduled.append(task)
+ self._task_queues.merge.add(task)
+ task.addExitListener(self._merge_wait_exit_handler)
+ self._status_display.merges = len(self._task_queues.merge)
+ state_change += 1
+ # For system packages, always serialize install regardless of
+ # parallel-install, in order to mitigate failures triggered
+ # by fragile states as in bug 256616. For other packages,
+ # continue to populate self._task_queues.merge, which will
+ # serialize install unless parallel-install is enabled.
+ if task.is_system_pkg:
+ break
if self._schedule_tasks_imp():
state_change += 1
@@ -1851,15 +1907,12 @@ class Scheduler(PollScheduler):
"""
if self._jobs and self._max_load is not None:
-
current_time = time.time()
if self._sigcont_time is not None:
-
elapsed_seconds = current_time - self._sigcont_time
# elapsed_seconds < 0 means the system clock has been adjusted
if elapsed_seconds > 0 and elapsed_seconds < self._sigcont_delay:
-
if self._job_delay_timeout_id is not None:
self._job_delay_timeout_id.cancel()
@@ -1884,7 +1937,6 @@ class Scheduler(PollScheduler):
elapsed_seconds = current_time - self._previous_job_start_time
# elapsed_seconds < 0 means the system clock has been adjusted
if elapsed_seconds > 0 and elapsed_seconds < delay:
-
if self._job_delay_timeout_id is not None:
self._job_delay_timeout_id.cancel()
@@ -1901,12 +1953,11 @@ class Scheduler(PollScheduler):
@return: True if state changed, False otherwise.
"""
- state_change = 0
+ state_change = False
while True:
-
if not self._keep_scheduling():
- return bool(state_change)
+ return state_change
if (
self._choose_pkg_return_early
@@ -1915,13 +1966,13 @@ class Scheduler(PollScheduler):
or not self._can_add_job()
or self._job_delay()
):
- return bool(state_change)
+ return state_change
pkg = self._choose_pkg()
if pkg is None:
- return bool(state_change)
+ return state_change
- state_change += 1
+ state_change = True
if not pkg.installed:
self._pkg_count.curval += 1
@@ -1934,15 +1985,6 @@ class Scheduler(PollScheduler):
self._task_queues.merge.addFront(merge)
merge.addExitListener(self._merge_exit)
- elif pkg.built:
- self._jobs += 1
- self._previous_job_start_time = time.time()
- self._status_display.running = self._jobs
- self._running_tasks[id(task)] = task
- task.scheduler = self._sched_iface
- self._task_queues.jobs.add(task)
- task.addExitListener(self._extract_exit)
-
else:
self._jobs += 1
self._previous_job_start_time = time.time()
@@ -1950,9 +1992,11 @@ class Scheduler(PollScheduler):
self._running_tasks[id(task)] = task
task.scheduler = self._sched_iface
self._task_queues.jobs.add(task)
- task.addExitListener(self._build_exit)
- return bool(state_change)
+ if pkg.built:
+ task.addExitListener(self._extract_exit)
+ else:
+ task.addExitListener(self._build_exit)
def _get_prefetcher(self, pkg):
try:
@@ -1963,7 +2007,9 @@ class Scheduler(PollScheduler):
# CPython 2.7, so it may be possible for CPython to raise KeyError
# here as well.
prefetcher = None
- if prefetcher is not None and not prefetcher.isAlive():
+ if prefetcher is not None and (
+ prefetcher.cancelled or not prefetcher.isAlive()
+ ):
try:
self._task_queues.fetch._task_queue.remove(prefetcher)
except ValueError:
@@ -1972,7 +2018,6 @@ class Scheduler(PollScheduler):
return prefetcher
def _task(self, pkg):
-
pkg_to_replace = None
if pkg.operation != "uninstall":
vardb = pkg.root_config.trees["vartree"].dbapi
@@ -2020,9 +2065,12 @@ class Scheduler(PollScheduler):
def _failed_pkg_msg(self, failed_pkg, action, preposition):
pkg = failed_pkg.pkg
- msg = "%s to %s %s" % (bad("Failed"), action, colorize("INFORM", pkg.cpv))
+
+ color = "PKG_BINARY_MERGE" if failed_pkg.pkg.built else "INFORM"
+
+ msg = f"{bad('Failed')} to {action} {colorize(color, pkg.cpv)}"
if pkg.root_config.settings["ROOT"] != "/":
- msg += " %s %s" % (preposition, pkg.root)
+ msg += f" {preposition} {pkg.root}"
log_path = self._locate_failure_log(failed_pkg)
if log_path is not None:
@@ -2030,7 +2078,7 @@ class Scheduler(PollScheduler):
self._status_msg(msg)
if log_path is not None:
- self._status_msg(" '%s'" % (colorize("INFORM", log_path),))
+ self._status_msg(f" '{colorize('INFORM', log_path)}'")
def _status_msg(self, msg):
"""
@@ -2068,6 +2116,19 @@ class Scheduler(PollScheduler):
for x in self._mergelist
if isinstance(x, Package) and x.operation == "merge"
]
+ # Store binpkgs using the same keys as $PKGDIR/Packages plus EROOT.
+ mtimedb["resume"]["binpkgs"] = [
+ {
+ "CPV": str(x.cpv),
+ "BUILD_ID": x.cpv.build_id,
+ "BUILD_TIME": x.cpv.build_time,
+ "MTIME": x.cpv.mtime,
+ "SIZE": x.cpv.file_size,
+ "EROOT": x.root,
+ }
+ for x in self._mergelist
+ if isinstance(x, Package) and x.type_name == "binary"
+ ]
mtimedb.commit()
@@ -2158,13 +2219,13 @@ class Scheduler(PollScheduler):
if not (isinstance(task, Package) and task.operation == "merge"):
continue
pkg = task
- msg = "emerge --keep-going:" + " %s" % (pkg.cpv,)
+ msg = "emerge --keep-going:" + f" {pkg.cpv}"
if pkg.root_config.settings["ROOT"] != "/":
- msg += " for %s" % (pkg.root,)
+ msg += f" for {pkg.root}"
if not atoms:
msg += " dropped because it is masked or unavailable"
else:
- msg += " dropped because it requires %s" % ", ".join(atoms)
+ msg += f" dropped because it requires {', '.join(set(atoms))}"
for line in textwrap.wrap(msg, msg_width):
eerror(line, phase="other", key=pkg.cpv)
settings = self.pkgsettings[pkg.root]
@@ -2191,16 +2252,14 @@ class Scheduler(PollScheduler):
it's supposed to be added or removed. Otherwise, do nothing.
"""
- if set(
- (
- "--buildpkgonly",
- "--fetchonly",
- "--fetch-all-uri",
- "--oneshot",
- "--onlydeps",
- "--pretend",
- )
- ).intersection(self.myopts):
+ if {
+ "--buildpkgonly",
+ "--fetchonly",
+ "--fetch-all-uri",
+ "--oneshot",
+ "--onlydeps",
+ "--pretend",
+ }.intersection(self.myopts):
return
if pkg.root != self.target_root:
@@ -2221,7 +2280,6 @@ class Scheduler(PollScheduler):
atom = self._world_atoms.get(pkg)
try:
-
if hasattr(world_set, "lock"):
world_set.lock()
world_locked = True
@@ -2241,16 +2299,16 @@ class Scheduler(PollScheduler):
if atom is not None:
if hasattr(world_set, "add"):
self._status_msg(
- ('Recording %s in "world" ' + "favorites file...") % atom
+ f'Recording {atom} in "world" favorites file...'
)
logger.log(
- " === (%s of %s) Updating world file (%s)"
- % (pkg_count.curval, pkg_count.maxval, pkg.cpv)
+ f" === ({pkg_count.curval} of {pkg_count.maxval}) "
+ f"Updating world file ({pkg.cpv})"
)
world_set.add(atom)
else:
writemsg_level(
- '\n!!! Unable to record %s in "world"\n' % (atom,),
+ f'\n!!! Unable to record {atom} in "world"\n',
level=logging.WARN,
noiselevel=-1,
)