aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2020-09-19 17:32:57 -0700
committerZac Medico <zmedico@gentoo.org>2020-09-20 22:17:24 -0700
commitc7fa3f1eb1ce1ebc0d1219dacba555e1a29d5f22 (patch)
tree883362f7f34d3782f9a30b9818dd4c18b0da8ab2 /lib/_emerge
parent_slot_confict_backtrack: group similar missed updates (bug 743115) (diff)
downloadportage-c7fa3f1eb1ce1ebc0d1219dacba555e1a29d5f22.tar.gz
portage-c7fa3f1eb1ce1ebc0d1219dacba555e1a29d5f22.tar.bz2
portage-c7fa3f1eb1ce1ebc0d1219dacba555e1a29d5f22.zip
emerge: enable parallel-fetch during pkg_pretend (bug 710432)
Execute pkg_pretend phases in a coroutine while parallel-fetch is running concurrently. When it's time to execute the pkg_pretend phase for a remote binary package, use a Scheduler _get_prefetcher method to get a running prefetcher if available, and otherwise start a new fetcher. Since pkg_pretend phases now run inside of the --keep-going retry loop, --keep-going is now able to recover from pkg_pretend failures, which fixes bug 404157. Bug: https://bugs.gentoo.org/404157 Bug: https://bugs.gentoo.org/710432 Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib/_emerge')
-rw-r--r--lib/_emerge/Scheduler.py142
1 files changed, 99 insertions, 43 deletions
diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index a69421288..465f928a0 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -25,6 +25,7 @@ from portage._sets import SETPREFIX
from portage._sets.base import InternalPackageSet
from portage.util import ensure_dirs, writemsg, writemsg_level
from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from portage.util.SlotObject import SlotObject
from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.package.ebuild.digestcheck import digestcheck
@@ -766,7 +767,8 @@ class Scheduler(PollScheduler):
return prefetcher
- def _run_pkg_pretend(self):
+ @coroutine
+ def _run_pkg_pretend(self, loop=None):
"""
Since pkg_pretend output may be important, this method sends all
output directly to stdout (regardless of options like --quiet or
@@ -774,7 +776,7 @@ class Scheduler(PollScheduler):
"""
failures = 0
- sched_iface = self._sched_iface
+ sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface)
for x in self._mergelist:
if not isinstance(x, Package):
@@ -789,18 +791,28 @@ class Scheduler(PollScheduler):
if "pretend" not in x.defined_phases:
continue
- out_str =">>> Running pre-merge checks for " + colorize("INFORM", x.cpv) + "\n"
- portage.util.writemsg_stdout(out_str, noiselevel=-1)
+ out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv)
+ self._status_msg(out_str)
root_config = x.root_config
- settings = self.pkgsettings[root_config.root]
+ settings = self._allocate_config(root_config.root)
settings.setcpv(x)
+ if not x.built:
+ # Get required SRC_URI metadata (it's not cached in x.metadata
+ # because some packages have an extremely large SRC_URI value).
+ portdb = root_config.trees["porttree"].dbapi
+ (settings.configdict["pkg"]["SRC_URI"],) = yield portdb.async_aux_get(
+ x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop
+ )
# setcpv/package.env allows for per-package PORTAGE_TMPDIR so we
# have to validate it for each package
rval = _check_temp_dir(settings)
if rval != os.EX_OK:
- return rval
+ failures += 1
+ self._record_pkg_failure(x, settings, FAILURE)
+ self._deallocate_config(settings)
+ continue
build_dir_path = os.path.join(
os.path.realpath(settings["PORTAGE_TMPDIR"]),
@@ -809,7 +821,7 @@ class Scheduler(PollScheduler):
settings["PORTAGE_BUILDDIR"] = build_dir_path
build_dir = EbuildBuildDir(scheduler=sched_iface,
settings=settings)
- sched_iface.run_until_complete(build_dir.async_lock())
+ yield build_dir.async_lock()
current_task = None
try:
@@ -835,7 +847,7 @@ class Scheduler(PollScheduler):
phase='clean', scheduler=sched_iface, settings=settings)
current_task = clean_phase
clean_phase.start()
- clean_phase.wait()
+ yield clean_phase.async_wait()
if x.built:
tree = "bintree"
@@ -845,13 +857,19 @@ class Scheduler(PollScheduler):
# Display fetch on stdout, so that it's always clear what
# is consuming time here.
if bintree.isremote(x.cpv):
- fetcher = BinpkgFetcher(pkg=x,
- scheduler=sched_iface)
- fetcher.start()
- if fetcher.wait() != os.EX_OK:
+ fetcher = self._get_prefetcher(x)
+ if fetcher is None:
+ fetcher = BinpkgFetcher(pkg=x, scheduler=loop)
+ fetcher.start()
+ # We only set the fetched value when fetcher
+ # is a BinpkgFetcher, since BinpkgPrefetcher
+ # handles fetch, verification, and the
+ # bintree.inject call which moves the file.
+ fetched = fetcher.pkg_path
+ if (yield fetcher.async_wait()) != os.EX_OK:
failures += 1
+ self._record_pkg_failure(x, settings, fetcher.returncode)
continue
- fetched = fetcher.pkg_path
if fetched is False:
filename = bintree.getname(x.cpv)
@@ -861,8 +879,9 @@ class Scheduler(PollScheduler):
scheduler=sched_iface, _pkg_path=filename)
current_task = verifier
verifier.start()
- if verifier.wait() != os.EX_OK:
+ if (yield verifier.async_wait()) != os.EX_OK:
failures += 1
+ self._record_pkg_failure(x, settings, verifier.returncode)
continue
if fetched:
@@ -870,8 +889,7 @@ class Scheduler(PollScheduler):
infloc = os.path.join(build_dir_path, "build-info")
ensure_dirs(infloc)
- self._sched_iface.run_until_complete(
- bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
+ yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
@@ -905,28 +923,45 @@ class Scheduler(PollScheduler):
current_task = pretend_phase
pretend_phase.start()
- ret = pretend_phase.wait()
+ ret = yield pretend_phase.async_wait()
if ret != os.EX_OK:
failures += 1
+ self._record_pkg_failure(x, settings, ret)
portage.elog.elog_process(x.cpv, settings)
finally:
if current_task is not None:
if current_task.isAlive():
current_task.cancel()
- current_task.wait()
if current_task.returncode == os.EX_OK:
clean_phase = EbuildPhase(background=False,
phase='clean', scheduler=sched_iface,
settings=settings)
clean_phase.start()
- clean_phase.wait()
+ yield clean_phase.async_wait()
- sched_iface.run_until_complete(build_dir.async_unlock())
+ yield build_dir.async_unlock()
+ self._deallocate_config(settings)
if failures:
- return FAILURE
- return os.EX_OK
+ return coroutine_return(FAILURE)
+ coroutine_return(os.EX_OK)
+
+ def _record_pkg_failure(self, pkg, settings, ret):
+ """Record a package failure. This eliminates the package
+ from the --keep-going merge list, and immediately calls
+ _failed_pkg_msg if we have not been terminated."""
+ self._failed_pkgs.append(
+ self._failed_pkg(
+ build_dir=settings.get("PORTAGE_BUILDDIR"),
+ build_log=settings.get("PORTAGE_LOG_FILE"),
+ pkg=pkg,
+ returncode=ret,
+ )
+ )
+ if not self._terminated_tasks:
+ self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for")
+ self._status_display.failed = len(self._failed_pkgs)
def merge(self):
if "--resume" in self.myopts:
@@ -988,11 +1023,6 @@ class Scheduler(PollScheduler):
if rval != os.EX_OK and not keep_going:
return rval
- if not fetchonly:
- rval = self._run_pkg_pretend()
- if rval != os.EX_OK:
- return rval
-
while True:
received_signal = []
@@ -1389,8 +1419,6 @@ class Scheduler(PollScheduler):
if self._opts_no_background.intersection(self.myopts):
self._set_max_jobs(1)
- self._add_prefetchers()
- self._add_packages()
failed_pkgs = self._failed_pkgs
portage.locks._quiet = self._background
portage.elog.add_listener(self._elog_listener)
@@ -1406,6 +1434,30 @@ class Scheduler(PollScheduler):
rval = os.EX_OK
try:
+ self._add_prefetchers()
+ if not self._build_opts.fetchonly:
+ # Run pkg_pretend concurrently with parallel-fetch, and be careful
+ # to respond appropriately to termination, so that we don't start
+ # any new tasks after we've been terminated. Temporarily make the
+ # status display quiet so that its output is not interleaved with
+ # pkg_pretend output.
+ status_quiet = self._status_display.quiet
+ self._status_display.quiet = True
+ try:
+ rval = self._sched_iface.run_until_complete(
+ self._run_pkg_pretend(loop=self._sched_iface)
+ )
+ except asyncio.CancelledError:
+ self.terminate()
+ finally:
+ self._status_display.quiet = status_quiet
+ self._termination_check()
+ if self._terminated_tasks:
+ rval = 128 + signal.SIGINT
+ if rval != os.EX_OK:
+ return rval
+
+ self._add_packages()
self._main_loop()
finally:
self._main_loop_cleanup()
@@ -1742,6 +1794,23 @@ class Scheduler(PollScheduler):
return bool(state_change)
+ def _get_prefetcher(self, pkg):
+ try:
+ prefetcher = self._prefetchers.pop(pkg, None)
+ except KeyError:
+ # KeyError observed with PyPy 1.8, despite None given as default.
+ # Note that PyPy 1.8 has the same WeakValueDictionary code as
+ # CPython 2.7, so it may be possible for CPython to raise KeyError
+ # here as well.
+ prefetcher = None
+ if prefetcher is not None and not prefetcher.isAlive():
+ try:
+ self._task_queues.fetch._task_queue.remove(prefetcher)
+ except ValueError:
+ pass
+ prefetcher = None
+ return prefetcher
+
def _task(self, pkg):
pkg_to_replace = None
@@ -1758,20 +1827,7 @@ class Scheduler(PollScheduler):
"installed", pkg.root_config, installed=True,
operation="uninstall")
- try:
- prefetcher = self._prefetchers.pop(pkg, None)
- except KeyError:
- # KeyError observed with PyPy 1.8, despite None given as default.
- # Note that PyPy 1.8 has the same WeakValueDictionary code as
- # CPython 2.7, so it may be possible for CPython to raise KeyError
- # here as well.
- prefetcher = None
- if prefetcher is not None and not prefetcher.isAlive():
- try:
- self._task_queues.fetch._task_queue.remove(prefetcher)
- except ValueError:
- pass
- prefetcher = None
+ prefetcher = self._get_prefetcher(pkg)
task = MergeListItem(args_set=self._args_set,
background=self._background, binpkg_opts=self._binpkg_opts,