# Copyright 2014-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 ''' Base class for performing sync operations. This class contains common initialization code and functions. ''' from __future__ import unicode_literals import functools import logging import os import portage from portage.repository.storage.interface import RepoStorageException from portage.util import writemsg_level from portage.util._eventloop.global_event_loop import global_event_loop from portage.util.backoff import RandomExponentialBackoff from portage.util.futures._sync_decorator import _sync_methods from portage.util.futures.retry import retry from portage.util.futures.executor.fork import ForkExecutor from . import _SUBMODULE_PATH_MAP class SyncBase(object): '''Base Sync class for subclassing''' short_desc = "Perform sync operations on repositories" @staticmethod def name(): return "BlankSync" def can_progressbar(self, func): return False def __init__(self, bin_command, bin_pkg): self.options = None self.settings = None self.logger = None self.repo = None self.xterm_titles = None self.spawn_kwargs = None self._repo_storage = None self._download_dir = None self.bin_command = None self._bin_command = bin_command self.bin_pkg = bin_pkg if bin_command: self.bin_command = portage.process.find_binary(bin_command) @property def has_bin(self): '''Checks for existance of the external binary, and also checks for storage driver configuration problems. MUST only be called after _kwargs() has set the logger ''' if self.bin_command is None: msg = ["Command not found: %s" % self._bin_command, "Type \"emerge %s\" to enable %s support." % (self.bin_pkg, self._bin_command)] for l in msg: writemsg_level("!!! %s\n" % l, level=logging.ERROR, noiselevel=-1) return False try: self.repo_storage except RepoStorageException as e: writemsg_level("!!! %s\n" % (e,), level=logging.ERROR, noiselevel=-1) return False return True def _kwargs(self, kwargs): '''Sets internal variables from kwargs''' self.options = kwargs.get('options', {}) self.settings = self.options.get('settings', None) self.logger = self.options.get('logger', None) self.repo = self.options.get('repo', None) self.xterm_titles = self.options.get('xterm_titles', False) self.spawn_kwargs = self.options.get('spawn_kwargs', None) def _select_storage_module(self): ''' Select an appropriate implementation of RepoStorageInterface, based on repos.conf settings. @rtype: str @return: name of the selected repo storage constructor ''' if self.repo.sync_rcu: mod_name = 'portage.repository.storage.hardlink_rcu.HardlinkRcuRepoStorage' elif self.repo.sync_allow_hardlinks: mod_name = 'portage.repository.storage.hardlink_quarantine.HardlinkQuarantineRepoStorage' else: mod_name = 'portage.repository.storage.inplace.InplaceRepoStorage' return mod_name @property def repo_storage(self): """ Get the repo storage driver instance. Raise RepoStorageException if there is a configuration problem """ if self._repo_storage is None: storage_cls = portage.load_mod(self._select_storage_module()) self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs)) return self._repo_storage @property def download_dir(self): """ Get the path of the download directory, where the repository update is staged. The directory is initialized lazily, since the repository might already be at the latest revision, and there may be some cost associated with the directory initialization. """ if self._download_dir is None: self._download_dir = self.repo_storage.init_update() return self._download_dir def exists(self, **kwargs): '''Tests whether the repo actually exists''' if kwargs: self._kwargs(kwargs) elif not self.repo: return False if not os.path.exists(self.repo.location): return False return True def sync(self, **kwargs): '''Sync the repository''' raise NotImplementedError def post_sync(self, portdb, location, emerge_config): '''repo.sync_type == "Blank": # NOTE: Do this after reloading the config, in case # it did not exist prior to sync, so that the config # and portdb properly account for its existence. ''' pass def _get_submodule_paths(self): paths = [] emerge_config = self.options.get('emerge_config') if emerge_config is not None: for name in emerge_config.opts.get('--sync-submodule', []): paths.extend(_SUBMODULE_PATH_MAP[name]) return tuple(paths) def retrieve_head(self, **kwargs): '''Get information about the head commit''' raise NotImplementedError def _key_refresh_retry_decorator(self): ''' Return a retry decorator, or None if retry is disabled. If retry fails, the function reraises the exception raised by the decorated function. If retry times out and no exception is available to reraise, the function raises TimeoutError. ''' errors = [] if self.repo.sync_openpgp_key_refresh_retry_count is None: return None try: retry_count = int(self.repo.sync_openpgp_key_refresh_retry_count) except Exception as e: errors.append('sync-openpgp-key-refresh-retry-count: {}'.format(e)) else: if retry_count <= 0: return None if self.repo.sync_openpgp_key_refresh_retry_overall_timeout is None: retry_overall_timeout = None else: try: retry_overall_timeout = float(self.repo.sync_openpgp_key_refresh_retry_overall_timeout) except Exception as e: errors.append('sync-openpgp-key-refresh-retry-overall-timeout: {}'.format(e)) else: if retry_overall_timeout < 0: errors.append('sync-openpgp-key-refresh-retry-overall-timeout: ' 'value must be greater than or equal to zero: {}'.format(retry_overall_timeout)) elif retry_overall_timeout == 0: retry_overall_timeout = None if self.repo.sync_openpgp_key_refresh_retry_delay_mult is None: retry_delay_mult = None else: try: retry_delay_mult = float(self.repo.sync_openpgp_key_refresh_retry_delay_mult) except Exception as e: errors.append('sync-openpgp-key-refresh-retry-delay-mult: {}'.format(e)) else: if retry_delay_mult <= 0: errors.append('sync-openpgp-key-refresh-retry-mult: ' 'value must be greater than zero: {}'.format(retry_delay_mult)) if self.repo.sync_openpgp_key_refresh_retry_delay_exp_base is None: retry_delay_exp_base = None else: try: retry_delay_exp_base = float(self.repo.sync_openpgp_key_refresh_retry_delay_exp_base) except Exception as e: errors.append('sync-openpgp-key-refresh-retry-delay-exp: {}'.format(e)) else: if retry_delay_exp_base <= 0: errors.append('sync-openpgp-key-refresh-retry-delay-exp: ' 'value must be greater than zero: {}'.format(retry_delay_mult)) if errors: lines = [] lines.append('') lines.append('!!! Retry disabled for openpgp key refresh:') lines.append('') for msg in errors: lines.append(' {}'.format(msg)) lines.append('') for line in lines: writemsg_level("{}\n".format(line), level=logging.ERROR, noiselevel=-1) return None return retry( reraise=True, try_max=retry_count, overall_timeout=(retry_overall_timeout if retry_overall_timeout > 0 else None), delay_func=RandomExponentialBackoff( multiplier=(1 if retry_delay_mult is None else retry_delay_mult), base=(2 if retry_delay_exp_base is None else retry_delay_exp_base))) def _refresh_keys(self, openpgp_env): """ Refresh keys stored in openpgp_env. Raises gemato.exceptions.GematoException or asyncio.TimeoutError on failure. @param openpgp_env: openpgp environment @type openpgp_env: gemato.openpgp.OpenPGPEnvironment """ out = portage.output.EOutput(quiet=('--quiet' in self.options['emerge_config'].opts)) out.ebegin('Refreshing keys via WKD') if openpgp_env.refresh_keys_wkd(): out.eend(0) return out.eend(1) out.ebegin('Refreshing keys from keyserver{}'.format( ('' if self.repo.sync_openpgp_keyserver is None else ' ' + self.repo.sync_openpgp_keyserver))) retry_decorator = self._key_refresh_retry_decorator() if retry_decorator is None: openpgp_env.refresh_keys_keyserver(keyserver=self.repo.sync_openpgp_keyserver) else: def noisy_refresh_keys(): """ Since retry does not help for some types of errors, display errors as soon as they occur. """ try: openpgp_env.refresh_keys_keyserver(keyserver=self.repo.sync_openpgp_keyserver) except Exception as e: writemsg_level("%s\n" % (e,), level=logging.ERROR, noiselevel=-1) raise # retry # The ThreadPoolExecutor that asyncio uses by default # does not support cancellation of tasks, therefore # use ForkExecutor for task cancellation support, in # order to enforce timeouts. loop = global_event_loop() with ForkExecutor(loop=loop) as executor: func_coroutine = functools.partial(loop.run_in_executor, executor, noisy_refresh_keys) decorated_func = retry_decorator(func_coroutine, loop=loop) loop.run_until_complete(decorated_func()) out.eend(0) class NewBase(SyncBase): '''Subclasses Syncbase adding a new() and runs it instead of update() if the repository does not exist()''' def __init__(self, bin_command, bin_pkg): SyncBase.__init__(self, bin_command, bin_pkg) def sync(self, **kwargs): '''Sync the repository''' if kwargs: self._kwargs(kwargs) if not self.has_bin: return (1, False) if not self.exists(): return self.new() return self.update() def new(self, **kwargs): '''Do the initial download and install of the repository''' raise NotImplementedError def update(self): '''Update existing repository ''' raise NotImplementedError