summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2018-07-29 23:21:30 -0700
committerZac Medico <zmedico@gentoo.org>2018-09-23 21:24:10 -0700
commit884ad951d700d1871cab2e321e4d8635b1a0f698 (patch)
treeebb092065e67f98244c599a7e613072ee1484316
parentAdd _sync_decorator module (diff)
downloadportage-884ad951d700d1871cab2e321e4d8635b1a0f698.tar.gz
portage-884ad951d700d1871cab2e321e4d8635b1a0f698.tar.bz2
portage-884ad951d700d1871cab2e321e4d8635b1a0f698.zip
rsync: split out repo storage framework
Since there are many ways to manage repository storage, split out a repo storage framework. The HardlinkQuarantineRepoStorage class implements the existing default behavior, and the InplaceRepoStorage class implements the legacy behavior (when sync-allow-hardlinks is disabled in repos.conf). Each class implements RepoStorageInterface, which uses coroutine methods since coroutines are well-suited to the I/O bound tasks that these methods perform. The _sync_decorator is used to convert coroutine methods to synchronous methods, for smooth integration into the surrounding synchronous code. Bug: https://bugs.gentoo.org/662070 Reviewed-by: Brian Dolbec <dolsen@gentoo.org> Signed-off-by: Zac Medico <zmedico@gentoo.org>
-rw-r--r--lib/portage/repository/storage/__init__.py2
-rw-r--r--lib/portage/repository/storage/hardlink_quarantine.py95
-rw-r--r--lib/portage/repository/storage/inplace.py49
-rw-r--r--lib/portage/repository/storage/interface.py87
-rw-r--r--lib/portage/sync/controller.py1
-rw-r--r--lib/portage/sync/modules/rsync/rsync.py85
-rw-r--r--lib/portage/sync/syncbase.py53
7 files changed, 306 insertions, 66 deletions
diff --git a/lib/portage/repository/storage/__init__.py b/lib/portage/repository/storage/__init__.py
new file mode 100644
index 000000000..58496758f
--- /dev/null
+++ b/lib/portage/repository/storage/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py
new file mode 100644
index 000000000..7e9cf4493
--- /dev/null
+++ b/lib/portage/repository/storage/hardlink_quarantine.py
@@ -0,0 +1,95 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.repository.storage.interface import (
+ RepoStorageException,
+ RepoStorageInterface,
+)
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+ coroutine,
+ coroutine_return,
+)
+
+from _emerge.SpawnProcess import SpawnProcess
+
+
+class HardlinkQuarantineRepoStorage(RepoStorageInterface):
+ """
+ This is the default storage module, since its quite compatible with
+ most configurations.
+
+ It's desirable to be able to create shared hardlinks between the
+ download directory and the normal repository, and this is facilitated
+ by making the download directory be a subdirectory of the normal
+ repository location (ensuring that no mountpoints are crossed).
+ Shared hardlinks are created by using the rsync --link-dest option.
+
+ Since the download is initially unverified, it is safest to save
+ it in a quarantine directory. The quarantine directory is also
+ useful for making the repository update more atomic, so that it
+ less likely that normal repository location will be observed in
+ a partially synced state.
+ """
+ def __init__(self, repo, spawn_kwargs):
+ self._user_location = repo.location
+ self._update_location = None
+ self._spawn_kwargs = spawn_kwargs
+ self._current_update = None
+
+ @coroutine
+ def _check_call(self, cmd):
+ """
+ Run cmd and raise RepoStorageException on failure.
+
+ @param cmd: command to executre
+ @type cmd: list
+ """
+ p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs)
+ p.start()
+ if (yield p.async_wait()) != os.EX_OK:
+ raise RepoStorageException('command exited with status {}: {}'.\
+ format(p.returncode, ' '.join(cmd)))
+
+ @coroutine
+ def init_update(self):
+ update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine')
+ yield self._check_call(['rm', '-rf', update_location])
+
+ # Use rsync --link-dest to hardlink a files into self._update_location,
+ # since cp -l is not portable.
+ yield self._check_call(['rsync', '-a', '--link-dest', self._user_location,
+ '--exclude', '/{}'.format(os.path.basename(update_location)),
+ self._user_location + '/', update_location + '/'])
+
+ self._update_location = update_location
+
+ coroutine_return(self._update_location)
+
+ @property
+ def current_update(self):
+ if self._update_location is None:
+ raise RepoStorageException('current update does not exist')
+ return self._update_location
+
+ @coroutine
+ def commit_update(self):
+ update_location = self.current_update
+ self._update_location = None
+ yield self._check_call(['rsync', '-a', '--delete',
+ '--exclude', '/{}'.format(os.path.basename(update_location)),
+ update_location + '/', self._user_location + '/'])
+
+ yield self._check_call(['rm', '-rf', update_location])
+
+ @coroutine
+ def abort_update(self):
+ if self._update_location is not None:
+ update_location = self._update_location
+ self._update_location = None
+ yield self._check_call(['rm', '-rf', update_location])
+
+ @coroutine
+ def garbage_collection(self):
+ yield self.abort_update()
diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py
new file mode 100644
index 000000000..f1117ad03
--- /dev/null
+++ b/lib/portage/repository/storage/inplace.py
@@ -0,0 +1,49 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.repository.storage.interface import (
+ RepoStorageException,
+ RepoStorageInterface,
+)
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+
+
+class InplaceRepoStorage(RepoStorageInterface):
+ """
+ Legacy repo storage behavior, where updates are applied in-place.
+ This module is not recommended, since the repository is left in an
+ unspecified (possibly malicious) state if the update fails.
+ """
+ def __init__(self, repo, spawn_kwargs):
+ self._user_location = repo.location
+ self._update_location = None
+
+ @coroutine
+ def init_update(self):
+ self._update_location = self._user_location
+ coroutine_return(self._update_location)
+ yield None
+
+ @property
+ def current_update(self):
+ if self._update_location is None:
+ raise RepoStorageException('current update does not exist')
+ return self._update_location
+
+ @coroutine
+ def commit_update(self):
+ self.current_update
+ self._update_location = None
+ coroutine_return()
+ yield None
+
+ @coroutine
+ def abort_update(self):
+ self._update_location = None
+ coroutine_return()
+ yield None
+
+ @coroutine
+ def garbage_collection(self):
+ coroutine_return()
+ yield None
diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py
new file mode 100644
index 000000000..f83c42b84
--- /dev/null
+++ b/lib/portage/repository/storage/interface.py
@@ -0,0 +1,87 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.exception import PortageException
+from portage.util.futures.compat_coroutine import coroutine
+
+
+class RepoStorageException(PortageException):
+ """
+ Base class for exceptions raise by RepoStorageInterface.
+ """
+
+
+class RepoStorageInterface(object):
+ """
+ Abstract repository storage interface.
+
+ Implementations can assume that the repo.location directory already
+ exists with appropriate permissions (SyncManager handles this).
+
+ TODO: Add a method to check of a previous uncommitted update, which
+ typically indicates a verification failure:
+ https://bugs.gentoo.org/662386
+ """
+ def __init__(self, repo, spawn_kwargs):
+ """
+ @param repo: repository configuration
+ @type repo: portage.repository.config.RepoConfig
+ @param spawn_kwargs: keyword arguments supported by the
+ portage.process.spawn function
+ @type spawn_kwargs: dict
+ """
+ raise NotImplementedError
+
+ @coroutine
+ def init_update(self):
+ """
+ Create an update directory as a destination to sync updates to.
+ The directory will be populated with files from the previous
+ immutable snapshot, if available. Note that this directory
+ may contain hardlinks that reference files in the previous
+ immutable snapshot, so these files should not be modified
+ (tools like rsync and git normally break hardlinks when
+ files need to be modified).
+
+ @rtype: str
+ @return: path of directory to update, populated with files from
+ the previous snapshot if available
+ """
+ raise NotImplementedError
+
+ @property
+ def current_update(self):
+ """
+ Get the current update directory which would have been returned
+ from the most recent call to the init_update method. This raises
+ RepoStorageException if the init_update method has not been
+ called.
+
+ @rtype: str
+ @return: path of directory to update
+ """
+ raise NotImplementedError
+
+ @coroutine
+ def commit_update(self):
+ """
+ Commit the current update directory, so that is becomes the
+ latest immutable snapshot.
+ """
+ raise NotImplementedError
+
+ @coroutine
+ def abort_update(self):
+ """
+ Delete the current update directory. If there was not an update
+ in progress, or it has already been committed, then this has
+ no effect.
+ """
+ raise NotImplementedError
+
+ @coroutine
+ def garbage_collection(self):
+ """
+ Remove expired snapshots.
+ """
+ raise NotImplementedError
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
index 3bccf6f74..bf5750f7f 100644
--- a/lib/portage/sync/controller.py
+++ b/lib/portage/sync/controller.py
@@ -327,6 +327,7 @@ class SyncManager(object):
# override the defaults when sync_umask is set
if repo.sync_umask is not None:
spawn_kwargs["umask"] = int(repo.sync_umask, 8)
+ spawn_kwargs.setdefault("umask", 0o022)
self.spawn_kwargs = spawn_kwargs
if self.usersync_uid is not None:
diff --git a/lib/portage/sync/modules/rsync/rsync.py b/lib/portage/sync/modules/rsync/rsync.py
index e0748794a..0f8221776 100644
--- a/lib/portage/sync/modules/rsync/rsync.py
+++ b/lib/portage/sync/modules/rsync/rsync.py
@@ -59,55 +59,6 @@ class RsyncSync(NewBase):
def __init__(self):
NewBase.__init__(self, "rsync", RSYNC_PACKAGE_ATOM)
- def _select_download_dir(self):
- '''
- Select and return the download directory. It's desirable to be able
- to create shared hardlinks between the download directory to the
- normal repository, and this is facilitated by making the download
- directory be a subdirectory of the normal repository location
- (ensuring that no mountpoints are crossed). Shared hardlinks are
- created by using the rsync --link-dest option.
-
- Since the download is initially unverified, it is safest to save
- it in a quarantine directory. The quarantine directory is also
- useful for making the repository update more atomic, so that it
- less likely that normal repository location will be observed in
- a partially synced state.
-
- This method returns a quarantine directory if sync-allow-hardlinks
- is enabled in repos.conf, and otherwise it returne the normal
- repository location.
- '''
- if self.repo.sync_allow_hardlinks:
- return os.path.join(self.repo.location, '.tmp-unverified-download-quarantine')
- else:
- return self.repo.location
-
- def _commit_download(self, download_dir):
- '''
- Commit changes from download_dir if it does not refer to the
- normal repository location.
- '''
- exitcode = 0
- if self.repo.location != download_dir:
- rsynccommand = [self.bin_command] + self.rsync_opts + self.extra_rsync_opts
- rsynccommand.append('--exclude=/%s' % os.path.basename(download_dir))
- rsynccommand.append('%s/' % download_dir.rstrip('/'))
- rsynccommand.append('%s/' % self.repo.location)
- exitcode = portage.process.spawn(rsynccommand, **self.spawn_kwargs)
-
- return exitcode
-
- def _remove_download(self, download_dir):
- """
- Remove download_dir if it does not refer to the normal repository
- location.
- """
- exitcode = 0
- if self.repo.location != download_dir:
- exitcode = subprocess.call(['rm', '-rf', download_dir])
- return exitcode
-
def update(self):
'''Internal update function which performs the transfer'''
opts = self.options.get('emerge_config').opts
@@ -143,8 +94,8 @@ class RsyncSync(NewBase):
self.extra_rsync_opts.extend(portage.util.shlex_split(
self.repo.module_specific_options['sync-rsync-extra-opts']))
- download_dir = self._select_download_dir()
exitcode = 0
+ verify_failure = False
# Process GLEP74 verification options.
# Default verification to 'no'; it's enabled for ::gentoo
@@ -240,10 +191,14 @@ class RsyncSync(NewBase):
self.proto = "file"
dosyncuri = syncuri[7:]
unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync(
- dosyncuri, timestamp, opts, download_dir)
+ dosyncuri, timestamp, opts)
self._process_exitcode(exitcode, dosyncuri, out, 1)
- if exitcode == 0 and not unchanged:
- self._commit_download(download_dir)
+ if exitcode == 0:
+ if unchanged:
+ self.repo_storage.abort_update()
+ else:
+ self.repo_storage.commit_update()
+ self.repo_storage.garbage_collection()
return (exitcode, updatecache_flg)
retries=0
@@ -375,7 +330,7 @@ class RsyncSync(NewBase):
dosyncuri = dosyncuri[6:].replace('/', ':/', 1)
unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync(
- dosyncuri, timestamp, opts, download_dir)
+ dosyncuri, timestamp, opts)
if not unchanged:
local_state_unchanged = False
if is_synced:
@@ -390,6 +345,7 @@ class RsyncSync(NewBase):
# exit loop
exitcode = EXCEEDED_MAX_RETRIES
break
+
self._process_exitcode(exitcode, dosyncuri, out, maxretries)
if local_state_unchanged:
@@ -397,6 +353,8 @@ class RsyncSync(NewBase):
# in this case, so refer gemato to the normal repository
# location.
download_dir = self.repo.location
+ else:
+ download_dir = self.download_dir
# if synced successfully, verify now
if exitcode == 0 and self.verify_metamanifest:
@@ -448,14 +406,18 @@ class RsyncSync(NewBase):
% (e,),
level=logging.ERROR, noiselevel=-1)
exitcode = 1
+ verify_failure = True
if exitcode == 0 and not local_state_unchanged:
- exitcode = self._commit_download(download_dir)
+ self.repo_storage.commit_update()
+ self.repo_storage.garbage_collection()
return (exitcode, updatecache_flg)
finally:
- if exitcode == 0:
- self._remove_download(download_dir)
+ # Don't delete the update if verification failed, in case
+ # the cause needs to be investigated.
+ if not verify_failure:
+ self.repo_storage.abort_update()
if openpgp_env is not None:
openpgp_env.close()
@@ -594,7 +556,7 @@ class RsyncSync(NewBase):
return rsync_opts
- def _do_rsync(self, syncuri, timestamp, opts, download_dir):
+ def _do_rsync(self, syncuri, timestamp, opts):
updatecache_flg = False
is_synced = False
if timestamp != 0 and "--quiet" not in opts:
@@ -720,11 +682,6 @@ class RsyncSync(NewBase):
# actual sync
command = rsynccommand[:]
- if self.repo.location != download_dir:
- # Use shared hardlinks for files that are identical
- # in the previous snapshot of the repository.
- command.append('--link-dest=%s' % self.repo.location)
-
submodule_paths = self._get_submodule_paths()
if submodule_paths:
# The only way to select multiple directories to
@@ -738,7 +695,7 @@ class RsyncSync(NewBase):
else:
command.append(syncuri + "/")
- command.append(download_dir)
+ command.append(self.download_dir)
exitcode = None
try:
diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py
index ce69a4fc0..e9b6ede4e 100644
--- a/lib/portage/sync/syncbase.py
+++ b/lib/portage/sync/syncbase.py
@@ -12,9 +12,11 @@ import logging
import os
import portage
+from portage.repository.storage.interface import RepoStorageException
from portage.util import writemsg_level
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.backoff import RandomExponentialBackoff
+from portage.util.futures._sync_decorator import _sync_methods
from portage.util.futures.retry import retry
from portage.util.futures.executor.fork import ForkExecutor
from . import _SUBMODULE_PATH_MAP
@@ -40,6 +42,8 @@ class SyncBase(object):
self.repo = None
self.xterm_titles = None
self.spawn_kwargs = None
+ self._repo_storage = None
+ self._download_dir = None
self.bin_command = None
self._bin_command = bin_command
self.bin_pkg = bin_pkg
@@ -49,7 +53,8 @@ class SyncBase(object):
@property
def has_bin(self):
- '''Checks for existance of the external binary.
+ '''Checks for existance of the external binary, and also
+ checks for storage driver configuration problems.
MUST only be called after _kwargs() has set the logger
'''
@@ -61,8 +66,15 @@ class SyncBase(object):
writemsg_level("!!! %s\n" % l,
level=logging.ERROR, noiselevel=-1)
return False
- return True
+ try:
+ self.repo_storage
+ except RepoStorageException as e:
+ writemsg_level("!!! %s\n" % (e,),
+ level=logging.ERROR, noiselevel=-1)
+ return False
+
+ return True
def _kwargs(self, kwargs):
'''Sets internal variables from kwargs'''
@@ -73,6 +85,43 @@ class SyncBase(object):
self.xterm_titles = self.options.get('xterm_titles', False)
self.spawn_kwargs = self.options.get('spawn_kwargs', None)
+ def _select_storage_module(self):
+ '''
+ Select an appropriate implementation of RepoStorageInterface, based
+ on repos.conf settings.
+
+ @rtype: str
+ @return: name of the selected repo storage constructor
+ '''
+ if self.repo.sync_allow_hardlinks:
+ mod_name = 'portage.repository.storage.hardlink_quarantine.HardlinkQuarantineRepoStorage'
+ else:
+ mod_name = 'portage.repository.storage.inplace.InplaceRepoStorage'
+ return mod_name
+
+ @property
+ def repo_storage(self):
+ """
+ Get the repo storage driver instance. Raise RepoStorageException
+ if there is a configuration problem
+ """
+ if self._repo_storage is None:
+ storage_cls = portage.load_mod(self._select_storage_module())
+ self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs))
+ return self._repo_storage
+
+ @property
+ def download_dir(self):
+ """
+ Get the path of the download directory, where the repository
+ update is staged. The directory is initialized lazily, since
+ the repository might already be at the latest revision, and
+ there may be some cost associated with the directory
+ initialization.
+ """
+ if self._download_dir is None:
+ self._download_dir = self.repo_storage.init_update()
+ return self._download_dir
def exists(self, **kwargs):
'''Tests whether the repo actually exists'''