From 2383f21652384458d0fc7a05a0c70610f22719d3 Mon Sep 17 00:00:00 2001 From: André Erdmann Date: Tue, 26 Jun 2012 17:37:24 +0200 Subject: remote * can now be used to feed the overlay creation package queue modified: roverlay/remote/basicrepo.py modified: roverlay/remote/repo.py modified: roverlay/remote/repolist.py modified: roverlay/remote/repoloader.py modified: roverlay/remote/rsync.py --- roverlay/remote/basicrepo.py | 70 ++++++++++++++++++++++++++++-------- roverlay/remote/repo.py | 18 ++++++---- roverlay/remote/repolist.py | 78 ++++++++++++++++++++++++++++++---------- roverlay/remote/repoloader.py | 7 +++- roverlay/remote/rsync.py | 83 ++++++++++++++++++++++++++++++------------- 5 files changed, 190 insertions(+), 66 deletions(-) diff --git a/roverlay/remote/basicrepo.py b/roverlay/remote/basicrepo.py index 9ade3a2..1574850 100644 --- a/roverlay/remote/basicrepo.py +++ b/roverlay/remote/basicrepo.py @@ -8,6 +8,11 @@ DEFAULT_PROTOCOL = 'http' LOCALREPO_SRC_URI = 'http://localhost/R-Packages' +SYNC_SUCCESS = 1 +SYNC_FAIL = 2 +REPO_READY = 4 + + def normalize_uri ( uri, protocol, force_protocol=False ): if not protocol: @@ -53,8 +58,29 @@ class LocalRepo ( object ): else: self.src_uri = src_uri + self.sync_status = 0 + # --- end of __init__ (...) --- + def ready ( self ): + return bool ( self.sync_status & REPO_READY ) + + def fail ( self ): + return bool ( self.sync_status & SYNC_FAIL ) + + def offline ( self ): + return 0 == self.sync_status & SYNC_SUCCESS + + def _set_ready ( self, is_synced ): + """comment TODO""" + if is_synced: + self.sync_status = SYNC_SUCCESS | REPO_READY + else: + self.sync_status = REPO_READY + + def _set_fail ( self ): + self.sync_status = SYNC_FAIL + def __str__ ( self ): return "repo '%s': DISTDIR '%s', SRC_URI '%s'" % ( self.name, self.distdir, self.src_uri @@ -79,7 +105,7 @@ class LocalRepo ( object ): if package_file is None: return self.src_uri else: - return '/'.join ( self.src_uri, package_file ) + return '/'.join ( ( self.src_uri, package_file ) ) # --- end of get_src_uri (...) --- # get_src(...) -> get_src_uri(...) @@ -90,17 +116,26 @@ class LocalRepo ( object ): return os.path.isdir ( self.distdir ) # --- end of exists (...) --- - def nosync ( self ): - """Returns True if the repo is ready for overlay creation, else False. - Useful for basic local distfiles verification without downloading - anything. - """ - return self.exists() + def sync ( self, sync_enabled=True ): + """Syncs this repo.""" - # --- end of nosync (...) --- + status = False + if sync_enabled and hasattr ( self, '_dosync' ): + status = self._dosync() - # sync() -> nosync(), LocalRepos don't have anything to sync - sync = nosync + elif hasattr ( self, '_nosync'): + status = self._nosync() + + else: + status = self.exists() + + if status: + self._set_ready ( is_synced=sync_enabled ) + else: + self._set_fail() + + return status + # --- end of sync (...) --- def scan_distdir ( self, is_package=None ): """Generator that scans the local distfiles dir of this repo and @@ -111,19 +146,26 @@ class LocalRepo ( object ): or None which means that all files are packages. Defaults to None. """ + + kw = { 'origin' : self } + if is_package is None: # unfiltered variant for dirpath, dirnames, filenames in os.walk ( self.distdir ): + kw ['distdir'] = dirpath if dirpath != self.distdir else None + for pkg in filenames: - yield PackageInfo ( filename=pkg, origin=self ) + yield PackageInfo ( filename=pkg, **kw ) elif hasattr ( is_package, '__call__' ): # filtered variant (adds an if is_package... before yield) for dirpath, dirnames, filenames in os.walk ( self.distdir ): + kw ['distdir'] = dirpath if dirpath != self.distdir else None + for pkg in filenames: if is_package ( os.path.join ( dirpath, pkg ) ): - yield PackageInfo ( filename=pkg, origin=self ) + yield PackageInfo ( filename=pkg, **kw ) else: @@ -227,14 +269,14 @@ class RemoteRepo ( LocalRepo ): # get_remote(...) -> get_remote_uri(...) get_remote = get_remote_uri - def sync ( self ): + def _dosync ( self ): """Gets packages from remote(s) and returns True if the repo is ready for overlay creation, else False. Derived classes have to implement this method. """ raise Exception ( "RemoteRepo does not implement sync()." ) - # --- end of sync (...) --- + # --- end of _dosync (...) --- def __str__ ( self ): return "repo '%s': DISTDIR '%s', SRC_URI '%s', REMOTE_URI '%s'" % ( diff --git a/roverlay/remote/repo.py b/roverlay/remote/repo.py index f54f448..febfaae 100644 --- a/roverlay/remote/repo.py +++ b/roverlay/remote/repo.py @@ -11,7 +11,7 @@ class RsyncRepo ( RemoteRepo ): def __init__ ( self, name, directory=None, src_uri=None, rsync_uri=None, base_uri=None, - extra_rsync_opts=None + **rsync_kw ): # super's init: name, remote protocol, directory_kw, **uri_kw # using '' as remote protocol which leaves uris unchanged when @@ -20,19 +20,23 @@ class RsyncRepo ( RemoteRepo ): name, '', directory=directory, src_uri=src_uri, remote_uri=rsync_uri, base_uri=base_uri ) - self.extra_rsync_opts = extra_rsync_opts + self.rsync_extra = rsync_kw + + self.sync_protocol = 'rsync' # --- end of __init__ (...) --- - def sync ( self ): + def _dosync ( self ): retcode = None try: job = RsyncJob ( remote=self.remote_uri, distdir=self.distdir, run_now=True, - extra_opts=self.extra_rsync_opts + **self.rsync_extra ) - if job.returncode == 0: return True + if job.returncode == 0: + self._set_ready ( is_synced=True ) + return True retcode = job.returncode except Exception as e: @@ -41,13 +45,13 @@ class RsyncRepo ( RemoteRepo ): logging.exception ( e ) retcode = '' - logging.error ( 'Repo %s cannot be used for ebuild creation due to errors ' 'while running rsync (return code was %s).' % ( self.name, retcode ) ) + self._set_fail() return False - # --- end of sync (...) --- + # --- end of _dosync (...) --- def __str__ ( self ): return "rsync repo '%s': DISTDIR '%s', SRC_URI '%s', RSYNC_URI '%s'" \ diff --git a/roverlay/remote/repolist.py b/roverlay/remote/repolist.py index 4617057..ae740dd 100644 --- a/roverlay/remote/repolist.py +++ b/roverlay/remote/repolist.py @@ -1,13 +1,19 @@ +import logging from roverlay import config - from roverlay.remote.repoloader import read_repofile +LOGGER = logging.getLogger ( 'RepoList' ) + class RepoList ( object ): def __init__ ( self ): self.repos = list() + self.sync_enabled = True + + # if True: use all repos when looking for packages, even those that + # could not be synced self.use_broken_repos = False def sort ( self ): @@ -25,28 +31,62 @@ class RepoList ( object ): self.load_file ( f ) # --- end of load (...) --- - def sync_all ( self, package_queue=None ): - q = None - if package_queue is None: - q = list() - add = q.append - else: - # TODO: _nowait? raises Exception when queue is full which is - # good in non-threaded execution - # -> timeout,.. - add = q.put + def _queue_packages_from_repo ( self, repo, add_method ): + if not repo.ready(): + if self.use_broken_repos: + # warn and continue + pass + else: + # repo cannot be used + LOGGER.warning ( "!!" ) + return False + + for p in repo.scan_distdir(): + LOGGER.debug ( "adding package %s from repo %s" % ( p, repo ) ) + add_method ( p ) + # --- end of _queue_packages_from_repo (...) --- + + def add_packages ( self, add_method ): + for repo in self.repos: + self._queue_packages_from_repo ( repo, add_method ) + # --- end of add_packages (...) --- + + def _sync_all_repos_and_run ( + self, + when_repo_success=None, when_repo_fail=None, when_repo_done=None, + when_all_done=None + ): + try_call = lambda f, *x, **z : None if f is None else f ( *x, **z ) + + LOGGER.debug ( "Syncing repos ..." ) + for repo in self.repos: + if repo.sync ( sync_enabled=self.sync_enabled ): + # repo successfully synced + try_call ( when_repo_success, repo ) + else: + # else log fail <> + try_call ( when_repo_fail, repo ) + try_call ( when_repo_done, repo ) - # !! TODO resume here. + try_call ( when_all_done ) + # --- end of _sync_all_repos_and_run (...) --- + def sync ( self ): + LOGGER.debug ( "Syncing repos ..." ) for repo in self.repos: - if repo.sync() if self.sync_enabled else repo.nosync(): - # scan repo and create package infos - for p in repo.scan_distdir(): add ( p ) - elif self.use_broken_repos: - # warn and scan repo - ## .. - for p in repo.scan_distdir(): add ( p ) + repo.sync ( sync_enabled=self.sync_enabled ) + # --- end of sync_all (...) --- + + def sync_and_add ( self, add_method ): + """Syncs all repos and adds packages immediately to the package queue.""" + # TODO: _nowait? raises Exception when queue is full which is + # good in non-threaded execution + # -> timeout,.. + + qput = lambda r: self._queue_packages_from_repo ( r, add_method ) + + self._sync_all_repos_and_run ( when_repo_done=qput ) # --- end of sync_all (...) --- diff --git a/roverlay/remote/repoloader.py b/roverlay/remote/repoloader.py index eae35c5..94dd5b7 100644 --- a/roverlay/remote/repoloader.py +++ b/roverlay/remote/repoloader.py @@ -44,13 +44,18 @@ def read_repofile ( repo_file, lenient=False ): src_uri = get ( 'src_uri' ) ) elif repo_type == 'rsync': + extra_opts = get ( 'extra_rsync_opts' ) + if extra_opts: + extra_opts = extra_opts.split ( ' ' ) + repo = RsyncRepo ( name = get ( 'name', name ), directory = get ( 'directory' ), src_uri = get ( 'src_uri' ), rsync_uri = get ( 'rsync_uri' ), base_uri = get ( 'base_uri' ), - extra_rsync_opts = get ( 'extra_rsync_opts' ) + extra_opts = extra_opts, + recursive = get ( 'recursive', False ) == 'yes', ) else: LOGGER.error ( "Unknown repo type %s for %s" % ( repo_type, name ) ) diff --git a/roverlay/remote/rsync.py b/roverlay/remote/rsync.py index e46d1db..f99c8a7 100644 --- a/roverlay/remote/rsync.py +++ b/roverlay/remote/rsync.py @@ -1,4 +1,5 @@ import os +import sys import subprocess from roverlay import config @@ -13,16 +14,24 @@ RSYNC_ENV = keepenv ( 'RSYNC_PASSWORD', ) +# TODO: +# either reraise an KeyboardInterrupt while running rsync (which stops script +# execution unless the interrupt is catched elsewhere) or just set a +# non-zero return code (-> 'repo cannot be used') +RERAISE_INTERRUPT = False + # --recursive is not in the default opts, subdirs in CRAN/contrib are -# either R release (2.xx.x[-patches] or the package archive) +# either R releases (2.xx.x[-patches]) or the package archive DEFAULT_RSYNC_OPTS = ( '--links', # copy symlinks as symlinks, '--safe-links', # but ignore links outside of tree '--times', # '--compress', # FIXME: add lzo if necessary - '--delete', # + '--dirs', # + '--prune-empty-dirs', # '--force', # allow deletion of non-empty dirs + '--delete', # '--human-readable', # '--stats', # '--chmod=ugo=r,u+w,Dugo+x', # 0755 for transferred dirs, 0644 for files @@ -30,11 +39,25 @@ DEFAULT_RSYNC_OPTS = ( class RsyncJob ( object ): def __init__ ( - self, remote=None, distdir=None, run_now=True, extra_opts=None + self, remote=None, distdir=None, run_now=True, + extra_opts=None, recursive=False ): - self.remote = remote - self.distdir = distdir - self.extra_opts = None + self.distdir = distdir + + # syncing directories, not files - always appending a slash at the end + # of remote + if remote [-1] != '/': + self.remote = remote + '/' + else: + self.remote = remote + + if recursive: + self.extra_opts = [ '--recursive' ] + if extra_opts: + self.extra_opts.extend ( extra_opts ) + else: + self.extra_opts = extra_opts + if run_now: self.run() # --- end of __init__ (...) --- @@ -51,36 +74,46 @@ class RsyncJob ( object ): if max_bw is not None: argv.append ( '--bwlimit=%i' % max_bw ) - if self.extra_opts is not None: - if isinstance ( self.extra_opts, str ) or \ - not hasattr ( self.extra_opts, '__iter__' )\ - : - argv.append ( self.extra_opts ) - else: - argv.extend ( self.extra_opts ) + if self.extra_opts: + argv.extend ( self.extra_opts ) argv.extend ( ( self.remote, self.distdir ) ) - return argv + + # removing emty args from argv + return tuple ( filter ( None, argv ) ) # --- end of _rsync_argv (...) --- def run ( self ): rsync_cmd = self._rsync_argv() + print ( ' '.join ( rsync_cmd ) ) os.makedirs ( self.distdir, exist_ok=True ) # TODO pipe/log/.., running this in blocking mode until implemented + try: + proc = subprocess.Popen ( + rsync_cmd, + stdin=None, stdout=None, stderr=None, + env=RSYNC_ENV + ) + + if proc.communicate() != ( None, None ): + raise AssertionError ( "expected None,None from communicate!" ) + + self.returncode = proc.returncode + + except KeyboardInterrupt: + sys.stderr.write ( + "\nKeyboard interrupt - waiting for rsync to exit...\n" + ) + if 'proc' in locals(): + proc.communicate() + self.returncode = proc.returncode + else: + self.returncode = 130 - proc = subprocess.Popen ( - rsync_cmd, - stdin=None, stdout=None, stderr=None, - env=RSYNC_ENV - ) - - if proc.communicate() != ( None, None ): - raise AssertionError ( "expected None,None from communicate!" ) - - self.returncode = proc.returncode - + if RERAISE_INTERRUPT: + raise # --- end of start (...) --- -- cgit v1.2.3-65-gdbad