diff options
Diffstat (limited to 'lib/_emerge/BinpkgFetcher.py')
-rw-r--r-- | lib/_emerge/BinpkgFetcher.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/lib/_emerge/BinpkgFetcher.py b/lib/_emerge/BinpkgFetcher.py new file mode 100644 index 000000000..36d027de3 --- /dev/null +++ b/lib/_emerge/BinpkgFetcher.py @@ -0,0 +1,240 @@ +# Copyright 1999-2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import functools + +from _emerge.AsynchronousLock import AsynchronousLock +from _emerge.CompositeTask import CompositeTask +from _emerge.SpawnProcess import SpawnProcess +try: + from urllib.parse import urlparse as urllib_parse_urlparse +except ImportError: + from urlparse import urlparse as urllib_parse_urlparse +import stat +import sys +import portage +from portage import os +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture +from portage.util._pty import _create_pty_or_pipe + +if sys.hexversion >= 0x3000000: + long = int + + +class BinpkgFetcher(CompositeTask): + + __slots__ = ("pkg", "pretend", "logfile", "pkg_path") + + def __init__(self, **kwargs): + CompositeTask.__init__(self, **kwargs) + pkg = self.pkg + self.pkg_path = pkg.root_config.trees["bintree"].getname( + pkg.cpv) + ".partial" + + def _start(self): + fetcher = _BinpkgFetcherProcess(background=self.background, + logfile=self.logfile, pkg=self.pkg, pkg_path=self.pkg_path, + pretend=self.pretend, scheduler=self.scheduler) + + if not self.pretend: + portage.util.ensure_dirs(os.path.dirname(self.pkg_path)) + if "distlocks" in self.pkg.root_config.settings.features: + self._start_task( + AsyncTaskFuture(future=fetcher.async_lock()), + functools.partial(self._start_locked, fetcher)) + return + + self._start_task(fetcher, self._fetcher_exit) + + def _start_locked(self, fetcher, lock_task): + self._assert_current(lock_task) + if lock_task.cancelled: + self._default_final_exit(lock_task) + return + + lock_task.future.result() + self._start_task(fetcher, self._fetcher_exit) + + def _fetcher_exit(self, fetcher): + self._assert_current(fetcher) + if not self.pretend and fetcher.returncode == os.EX_OK: + fetcher.sync_timestamp() + if fetcher.locked: + self._start_task( + AsyncTaskFuture(future=fetcher.async_unlock()), + functools.partial(self._fetcher_exit_unlocked, fetcher)) + else: + self._fetcher_exit_unlocked(fetcher) + + def _fetcher_exit_unlocked(self, fetcher, unlock_task=None): + if unlock_task is not None: + self._assert_current(unlock_task) + if unlock_task.cancelled: + self._default_final_exit(unlock_task) + return + + unlock_task.future.result() + + self._current_task = None + self.returncode = fetcher.returncode + self._async_wait() + + +class _BinpkgFetcherProcess(SpawnProcess): + + __slots__ = ("pkg", "pretend", "locked", "pkg_path", "_lock_obj") + + def _start(self): + pkg = self.pkg + pretend = self.pretend + bintree = pkg.root_config.trees["bintree"] + settings = bintree.settings + pkg_path = self.pkg_path + + exists = os.path.exists(pkg_path) + resume = exists and os.path.basename(pkg_path) in bintree.invalids + if not (pretend or resume): + # Remove existing file or broken symlink. + try: + os.unlink(pkg_path) + except OSError: + pass + + # urljoin doesn't work correctly with + # unrecognized protocols like sftp + if bintree._remote_has_index: + instance_key = bintree.dbapi._instance_key(pkg.cpv) + rel_uri = bintree._remotepkgs[instance_key].get("PATH") + if not rel_uri: + rel_uri = pkg.cpv + ".tbz2" + remote_base_uri = bintree._remotepkgs[ + instance_key]["BASE_URI"] + uri = remote_base_uri.rstrip("/") + "/" + rel_uri.lstrip("/") + else: + uri = settings["PORTAGE_BINHOST"].rstrip("/") + \ + "/" + pkg.pf + ".tbz2" + + if pretend: + portage.writemsg_stdout("\n%s\n" % uri, noiselevel=-1) + self.returncode = os.EX_OK + self._async_wait() + return + + protocol = urllib_parse_urlparse(uri)[0] + fcmd_prefix = "FETCHCOMMAND" + if resume: + fcmd_prefix = "RESUMECOMMAND" + fcmd = settings.get(fcmd_prefix + "_" + protocol.upper()) + if not fcmd: + fcmd = settings.get(fcmd_prefix) + + fcmd_vars = { + "DISTDIR" : os.path.dirname(pkg_path), + "URI" : uri, + "FILE" : os.path.basename(pkg_path) + } + + for k in ("PORTAGE_SSH_OPTS",): + v = settings.get(k) + if v is not None: + fcmd_vars[k] = v + + fetch_env = dict(settings.items()) + fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \ + for x in portage.util.shlex_split(fcmd)] + + if self.fd_pipes is None: + self.fd_pipes = {} + fd_pipes = self.fd_pipes + + # Redirect all output to stdout since some fetchers like + # wget pollute stderr (if portage detects a problem then it + # can send it's own message to stderr). + fd_pipes.setdefault(0, portage._get_stdin().fileno()) + fd_pipes.setdefault(1, sys.__stdout__.fileno()) + fd_pipes.setdefault(2, sys.__stdout__.fileno()) + + self.args = fetch_args + self.env = fetch_env + if settings.selinux_enabled(): + self._selinux_type = settings["PORTAGE_FETCH_T"] + SpawnProcess._start(self) + + def _pipe(self, fd_pipes): + """When appropriate, use a pty so that fetcher progress bars, + like wget has, will work properly.""" + if self.background or not sys.__stdout__.isatty(): + # When the output only goes to a log file, + # there's no point in creating a pty. + return os.pipe() + stdout_pipe = None + if not self.background: + stdout_pipe = fd_pipes.get(1) + got_pty, master_fd, slave_fd = \ + _create_pty_or_pipe(copy_term_size=stdout_pipe) + return (master_fd, slave_fd) + + def sync_timestamp(self): + # If possible, update the mtime to match the remote package if + # the fetcher didn't already do it automatically. + bintree = self.pkg.root_config.trees["bintree"] + if bintree._remote_has_index: + remote_mtime = bintree._remotepkgs[ + bintree.dbapi._instance_key( + self.pkg.cpv)].get("_mtime_") + if remote_mtime is not None: + try: + remote_mtime = long(remote_mtime) + except ValueError: + pass + else: + try: + local_mtime = os.stat(self.pkg_path)[stat.ST_MTIME] + except OSError: + pass + else: + if remote_mtime != local_mtime: + try: + os.utime(self.pkg_path, + (remote_mtime, remote_mtime)) + except OSError: + pass + + def async_lock(self): + """ + This raises an AlreadyLocked exception if lock() is called + while a lock is already held. In order to avoid this, call + unlock() or check whether the "locked" attribute is True + or False before calling lock(). + """ + if self._lock_obj is not None: + raise self.AlreadyLocked((self._lock_obj,)) + + result = self.scheduler.create_future() + + def acquired_lock(async_lock): + if async_lock.wait() == os.EX_OK: + self.locked = True + result.set_result(None) + else: + result.set_exception(AssertionError( + "AsynchronousLock failed with returncode %s" + % (async_lock.returncode,))) + + self._lock_obj = AsynchronousLock(path=self.pkg_path, + scheduler=self.scheduler) + self._lock_obj.addExitListener(acquired_lock) + self._lock_obj.start() + return result + + class AlreadyLocked(portage.exception.PortageException): + pass + + def async_unlock(self): + if self._lock_obj is None: + raise AssertionError('already unlocked') + result = self._lock_obj.async_unlock() + self._lock_obj = None + self.locked = False + return result + |