author     Zac Medico <zmedico@gentoo.org>    2015-08-10 00:42:51 -0700
committer  Zac Medico <zmedico@gentoo.org>    2015-08-13 12:49:39 -0700
commit     496ff326dc18890889d1ea5d2aec590394635960 (patch)
tree       e3b57dc9f9805656c2db3751ea9b4c683e23df1a
parent     Add golang-vcs to the list of live eclasses (diff)
sync repositories in parallel (bug 557426)
Repos will now be synced in parallel (including their post-sync hooks), but a
given repo will only be synced after its master(s) have synced (in case that
matters for hooks). Output of concurrent processes will be mixed (irrelevant
with --quiet). Support for FEATURES=metadata-transfer will be handled in the
main process, which may be required for some backends (such as sqlite).

X-Gentoo-Bug: 557426
X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426
Acked-by: Brian Dolbec <dolsen@gentoo.org>
-rw-r--r--  pym/portage/emaint/modules/sync/sync.py    | 129
-rw-r--r--  pym/portage/sync/controller.py             |  31
-rw-r--r--  pym/portage/tests/sync/test_sync_local.py  |   6
-rw-r--r--  pym/portage/util/_async/AsyncFunction.py   |  67
4 files changed, 219 insertions(+), 14 deletions(-)
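To make the ordering constraint from the commit message concrete before reading the diff, here is a minimal, hedged sketch in plain Python (not portage code) of syncing repos in parallel waves while holding each repo back until its masters have finished. The repo names and the sync_one() helper are hypothetical, and the real SyncScheduler below is event-loop driven and starts each repo as soon as its masters finish rather than in discrete waves.

    from concurrent.futures import ThreadPoolExecutor

    def sync_one(repo):
        # Hypothetical stand-in for the real per-repo sync (rsync, git, ...).
        print("synced %s" % repo)

    def sync_in_order(masters, max_jobs=2):
        """masters maps each repo name to the set of its masters."""
        remaining = {repo: set(deps) for repo, deps in masters.items()}
        with ThreadPoolExecutor(max_workers=max_jobs) as pool:
            while remaining:
                # Repos whose masters have all synced already.
                ready = [r for r, deps in remaining.items() if not deps]
                if not ready:
                    # Circular master relationship: pick an arbitrary repo to
                    # break the cycle, as SyncScheduler._update_leaf_nodes does.
                    ready = [next(iter(remaining))]
                list(pool.map(sync_one, ready))  # sync this wave in parallel
                for repo in ready:
                    del remaining[repo]
                for deps in remaining.values():
                    deps.difference_update(ready)

    sync_in_order({"gentoo": set(),
        "overlay-a": {"gentoo"}, "overlay-b": {"gentoo"}})

Here "gentoo" syncs first, then "overlay-a" and "overlay-b" run concurrently, bounded by max_jobs.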
diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py
index b4630731d..879d0f096 100644
--- a/pym/portage/emaint/modules/sync/sync.py
+++ b/pym/portage/emaint/modules/sync/sync.py
@@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func
from portage._global_updates import _global_updates
from portage.sync.controller import SyncManager
from portage.util import writemsg_level
+from portage.util.digraph import digraph
+from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util._eventloop.EventLoop import EventLoop
import _emerge
from _emerge.emergelog import emergelog
@@ -201,6 +205,7 @@ class SyncRepos(object):
                    k = "--" + k.replace("_", "-")
                    self.emerge_config.opts[k] = v

+        selected_repos = [repo for repo in selected_repos if repo.sync_type is not None]
        msgs = []
        if not selected_repos:
            msgs.append("Emaint sync, nothing to sync... returning")
@@ -213,13 +218,20 @@
        sync_manager = SyncManager(
            self.emerge_config.target_config.settings, emergelog)
-        retvals = []
-        for repo in selected_repos:
-            if repo.sync_type is not None:
-                returncode, message = sync_manager.sync(self.emerge_config, repo)
-                retvals.append((repo.name, returncode))
-                if message:
-                    msgs.append(message)
+
+        max_jobs = (self.emerge_config.opts.get('--jobs', 1)
+            if 'parallel-fetch' in self.emerge_config.
+            target_config.settings.features else 1)
+        sync_scheduler = SyncScheduler(emerge_config=self.emerge_config,
+            selected_repos=selected_repos, sync_manager=sync_manager,
+            max_jobs=max_jobs,
+            event_loop=global_event_loop() if portage._internal_caller else
+                EventLoop(main=False))
+
+        sync_scheduler.start()
+        sync_scheduler.wait()
+        retvals = sync_scheduler.retvals
+        msgs.extend(sync_scheduler.msgs)

        # Reload the whole config.
        portage._sync_mode = False
@@ -287,3 +299,106 @@
            messages.append("Action: %s for repo: %s, returned code = %s"
                % (action, rval[0], rval[1]))
        return messages
+
+
+class SyncScheduler(AsyncScheduler):
+    '''
+    Sync repos in parallel, but don't sync a given repo until all
+    of its masters have synced.
+    '''
+    def __init__(self, **kwargs):
+        '''
+        @param emerge_config: an emerge_config instance
+        @param selected_repos: list of RepoConfig instances
+        @param sync_manager: a SyncManger instance
+        '''
+        self._emerge_config = kwargs.pop('emerge_config')
+        self._selected_repos = kwargs.pop('selected_repos')
+        self._sync_manager = kwargs.pop('sync_manager')
+        AsyncScheduler.__init__(self, **kwargs)
+        self._init_graph()
+        self.retvals = []
+        self.msgs = []
+
+    def _init_graph(self):
+        '''
+        Graph relationships between repos and their masters.
+        '''
+        self._sync_graph = digraph()
+        self._leaf_nodes = []
+        self._repo_map = {}
+        self._running_repos = set()
+        for repo in self._selected_repos:
+            self._repo_map[repo.name] = repo
+            self._sync_graph.add(repo.name, None)
+            for master in repo.masters:
+                self._repo_map[master.name] = master
+                self._sync_graph.add(master.name, repo.name)
+        self._update_leaf_nodes()
+
+    def _task_exit(self, task):
+        '''
+        Remove the task from the graph, in order to expose
+        more leaf nodes.
+        '''
+        self._running_tasks.discard(task)
+        returncode = task.returncode
+        if task.returncode == os.EX_OK:
+            returncode, message, updatecache_flg = task.result
+            if message:
+                self.msgs.append(message)
+        repo = task.kwargs['repo'].name
+        self._running_repos.remove(repo)
+        self.retvals.append((repo, returncode))
+        self._sync_graph.remove(repo)
+        self._update_leaf_nodes()
+        super(SyncScheduler, self)._task_exit(self)
+
+    def _update_leaf_nodes(self):
+        '''
+        Populate self._leaf_nodes with current leaves from
+        self._sync_graph. If a circular master relationship
+        is discovered, choose a random node to break the cycle.
+        '''
+        if self._sync_graph and not self._leaf_nodes:
+            self._leaf_nodes = [obj for obj in
+                self._sync_graph.leaf_nodes()
+                if obj not in self._running_repos]
+
+            if not (self._leaf_nodes or self._running_repos):
+                # If there is a circular master relationship,
+                # choose a random node to break the cycle.
+                self._leaf_nodes = [next(iter(self._sync_graph))]
+
+    def _next_task(self):
+        '''
+        Return a task for the next available leaf node.
+        '''
+        if not self._sync_graph:
+            raise StopIteration()
+        # If self._sync_graph is non-empty, then self._leaf_nodes
+        # is guaranteed to be non-empty, since otherwise
+        # _can_add_job would have returned False and prevented
+        # _next_task from being immediately called.
+        node = self._leaf_nodes.pop()
+        self._running_repos.add(node)
+        self._update_leaf_nodes()

+        task = self._sync_manager.async(
+            self._emerge_config, self._repo_map[node])
+        return task
+
+    def _can_add_job(self):
+        '''
+        Returns False if there are no leaf nodes available.
+        '''
+        if not AsyncScheduler._can_add_job(self):
+            return False
+        return bool(self._leaf_nodes) and not self._terminated.is_set()
+
+    def _keep_scheduling(self):
+        '''
+        Schedule as long as the graph is non-empty, and we haven't
+        been terminated.
+        '''
+        return bool(self._sync_graph) and not self._terminated.is_set()
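The class above drives everything from portage.util.digraph: masters are added as children of the repos that list them, so leaf_nodes() always yields repos whose masters have already been synced, and remove() exposes the next candidates. As a hedged, standalone illustration (assuming a host where portage is importable; the repo names are made up), the same add()/leaf_nodes()/remove() calls can be exercised directly:

    from portage.util.digraph import digraph

    graph = digraph()
    graph.add("gentoo", None)            # repo with no masters
    graph.add("my-overlay", None)
    graph.add("gentoo", "my-overlay")    # "gentoo" is a master of "my-overlay"

    while graph:
        ready = graph.leaf_nodes()       # repos whose masters have all synced
        if not ready:
            # Circular master relationship: break the cycle arbitrarily.
            ready = [next(iter(graph))]
        for name in ready:
            print("sync", name)          # "gentoo" is printed before "my-overlay"
            graph.remove(name)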
diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py
index 307487f96..e992cc4c4 100644
--- a/pym/portage/sync/controller.py
+++ b/pym/portage/sync/controller.py
@@ -21,6 +21,7 @@ bad = create_color_func("BAD")
warn = create_color_func("WARN")
from portage.package.ebuild.doebuild import _check_temp_dir
from portage.metadata import action_metadata
+from portage.util._async.AsyncFunction import AsyncFunction
from portage import OrderedDict
from portage import _unicode_decode
from portage import util
@@ -113,12 +114,18 @@ class SyncManager(object):
            return desc
        return []

+    def async(self, emerge_config=None, repo=None):
+        proc = AsyncFunction(target=self.sync,
+            kwargs=dict(emerge_config=emerge_config, repo=repo))
+        proc.addExitListener(self._sync_callback)
+        return proc
+
-    def sync(self, emerge_config=None, repo=None, callback=None):
+    def sync(self, emerge_config=None, repo=None):
        self.emerge_config = emerge_config
-        self.callback = callback or self._sync_callback
+        self.callback = None
        self.repo = repo
        self.exitcode = 1
+        self.updatecache_flg = False
        if repo.sync_type in self.module_names:
            tasks = [self.module_controller.get_class(repo.sync_type)]
        else:
@@ -149,13 +156,14 @@ class SyncManager(object):
        self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location)

-        return self.exitcode, None
+        return self.exitcode, None, self.updatecache_flg

    def do_callback(self, result):
        #print("result:", result, "callback()", self.callback)
        exitcode, updatecache_flg = result
        self.exitcode = exitcode
+        self.updatecache_flg = updatecache_flg
        if exitcode == 0:
            msg = "=== Sync completed for %s" % self.repo.name
            self.logger(self.xterm_titles, msg)
@@ -310,17 +318,28 @@ class SyncManager(object):
        os.umask(0o022)
        return os.EX_OK

+    def _sync_callback(self, proc):
+        """
+        This is called in the parent process, serially, for each of the
+        sync jobs when they complete. Some cache backends such as sqlite
+        may require that cache access be performed serially in the
+        parent process like this.
+        """
+        repo = proc.kwargs['repo']
+        exitcode = proc.returncode
+        updatecache_flg = False
+        if proc.returncode == os.EX_OK:
+            exitcode, message, updatecache_flg = proc.result
-    def _sync_callback(self, exitcode, updatecache_flg):
        if updatecache_flg and "metadata-transfer" not in self.settings.features:
            updatecache_flg = False

        if updatecache_flg and \
            os.path.exists(os.path.join(
-            self.repo.location, 'metadata', 'md5-cache')):
+            repo.location, 'metadata', 'md5-cache')):

            # Only update cache for repo.location since that's
            # the only one that's been synced here.
            action_metadata(self.settings, self.portdb, self.emerge_config.opts,
-                porttrees=[self.repo.location])
+                porttrees=[repo.location])
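The controller change splits the work: each repo syncs in its own forked process, while _sync_callback runs in the parent, serially, so metadata-transfer cache regeneration never happens concurrently. A rough stand-in for that division of labour, using only the standard library rather than portage's ForkProcess machinery (do_sync() and update_cache() are hypothetical placeholders, not portage APIs):

    import multiprocessing

    def do_sync(repo):
        # Child process: perform the sync, report (repo, exitcode, updatecache_flg).
        return repo, 0, True

    def update_cache(repo):
        # Parent process only: serial cache regeneration, one repo at a time,
        # which is what backends such as sqlite may require.
        print("regenerating md5-cache for %s" % repo)

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=2)
        try:
            for repo, exitcode, updatecache_flg in pool.imap_unordered(
                    do_sync, ["gentoo", "my-overlay"]):
                if exitcode == 0 and updatecache_flg:
                    update_cache(repo)  # runs in the parent as results arrive
        finally:
            pool.close()
            pool.join()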
diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py
index f50caba19..7753a261c 100644
--- a/pym/portage/tests/sync/test_sync_local.py
+++ b/pym/portage/tests/sync/test_sync_local.py
@@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase):
            "dev-libs/A-0": {}
        }

+        user_config = {
+            'make.conf': ('FEATURES="metadata-transfer"',)
+        }
+
        playground = ResolverPlayground(ebuilds=ebuilds,
-            profile=profile, user_config={}, debug=debug)
+            profile=profile, user_config=user_config, debug=debug)
        settings = playground.settings
        eprefix = settings["EPREFIX"]
        eroot = settings["EROOT"]
diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py
new file mode 100644
index 000000000..b6142a214
--- /dev/null
+++ b/pym/portage/util/_async/AsyncFunction.py
@@ -0,0 +1,67 @@
+# Copyright 2015 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import pickle
+import traceback
+
+from portage import os
+from portage.util._async.ForkProcess import ForkProcess
+from _emerge.PipeReader import PipeReader
+
+class AsyncFunction(ForkProcess):
+    """
+    Execute a function call in a fork, and retrieve the function
+    return value via pickling/unpickling, accessible as the
+    "result" attribute after the forked process has exited.
+    """
+
+    __slots__ = ('args', 'kwargs', 'result', 'target',
+        '_async_func_reader', '_async_func_reader_pw')
+
+    def _start(self):
+        pr, pw = os.pipe()
+        self.fd_pipes = {}
+        self.fd_pipes[pw] = pw
+        self._async_func_reader_pw = pw
+        self._async_func_reader = PipeReader(
+            input_files={"input":pr},
+            scheduler=self.scheduler)
+        self._async_func_reader.addExitListener(self._async_func_reader_exit)
+        self._async_func_reader.start()
+        ForkProcess._start(self)
+        os.close(pw)
+
+    def _run(self):
+        try:
+            result = self.target(*(self.args or []), **(self.kwargs or {}))
+            os.write(self._async_func_reader_pw, pickle.dumps(result))
+        except Exception:
+            traceback.print_exc()
+            return 1
+
+        return os.EX_OK
+
+    def _pipe_logger_exit(self, pipe_logger):
+        # Ignore this event, since we want to ensure that we exit
+        # only after _async_func_reader_exit has reached EOF.
+        self._pipe_logger = None
+
+    def _async_func_reader_exit(self, pipe_reader):
+        try:
+            self.result = pickle.loads(pipe_reader.getvalue())
+        except Exception:
+            # The child process will have printed a traceback in this case,
+            # and returned an unsuccessful returncode.
+            pass
+        self._async_func_reader = None
+        self._unregister()
+        self.wait()
+
+    def _unregister(self):
+        ForkProcess._unregister(self)
+
+        pipe_reader = self._async_func_reader
+        if pipe_reader is not None:
+            self._async_func_reader = None
+            pipe_reader.removeExitListener(self._async_func_reader_exit)
+            pipe_reader.cancel()
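AsyncFunction boils down to: fork, run the target, pickle its return value into a pipe, and unpickle it in the parent once the pipe hits EOF. A self-contained sketch of that mechanism (not the portage class itself, which integrates with ForkProcess and the event loop) might look like this on a POSIX system; a robust version would also loop on os.write for results larger than the pipe buffer:

    import os
    import pickle

    def run_in_fork(func, *args, **kwargs):
        """Call func in a forked child and return its unpickled result in the parent."""
        pr, pw = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: run the function, pickle the result to the write end, exit.
            os.close(pr)
            try:
                os.write(pw, pickle.dumps(func(*args, **kwargs)))
                os._exit(0)
            except Exception:
                os._exit(1)
        # Parent: read until EOF, reap the child, then unpickle the payload.
        os.close(pw)
        chunks = []
        while True:
            chunk = os.read(pr, 4096)
            if not chunk:
                break
            chunks.append(chunk)
        os.close(pr)
        pid, status = os.waitpid(pid, 0)
        if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
            raise RuntimeError("forked function failed")
        return pickle.loads(b"".join(chunks))

    print(run_in_fork(sum, [1, 2, 3]))  # prints 6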