aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndré Erdmann <dywi@mailerd.de>2012-06-26 17:37:24 +0200
committerAndré Erdmann <dywi@mailerd.de>2012-06-26 17:37:24 +0200
commit2383f21652384458d0fc7a05a0c70610f22719d3 (patch)
treeac67c252d8dab535d06b2075bbf3dd3c8bfba695
parentthe remote module (used for repo syncing) (diff)
downloadR_overlay-2383f21652384458d0fc7a05a0c70610f22719d3.tar.gz
R_overlay-2383f21652384458d0fc7a05a0c70610f22719d3.tar.bz2
R_overlay-2383f21652384458d0fc7a05a0c70610f22719d3.zip
remote
* can now be used to feed the overlay creation package queue modified: roverlay/remote/basicrepo.py modified: roverlay/remote/repo.py modified: roverlay/remote/repolist.py modified: roverlay/remote/repoloader.py modified: roverlay/remote/rsync.py
-rw-r--r--roverlay/remote/basicrepo.py70
-rw-r--r--roverlay/remote/repo.py18
-rw-r--r--roverlay/remote/repolist.py78
-rw-r--r--roverlay/remote/repoloader.py7
-rw-r--r--roverlay/remote/rsync.py83
5 files changed, 190 insertions, 66 deletions
diff --git a/roverlay/remote/basicrepo.py b/roverlay/remote/basicrepo.py
index 9ade3a2..1574850 100644
--- a/roverlay/remote/basicrepo.py
+++ b/roverlay/remote/basicrepo.py
@@ -8,6 +8,11 @@ DEFAULT_PROTOCOL = 'http'
LOCALREPO_SRC_URI = 'http://localhost/R-Packages'
+SYNC_SUCCESS = 1
+SYNC_FAIL = 2
+REPO_READY = 4
+
+
def normalize_uri ( uri, protocol, force_protocol=False ):
if not protocol:
@@ -53,8 +58,29 @@ class LocalRepo ( object ):
else:
self.src_uri = src_uri
+ self.sync_status = 0
+
# --- end of __init__ (...) ---
+ def ready ( self ):
+ return bool ( self.sync_status & REPO_READY )
+
+ def fail ( self ):
+ return bool ( self.sync_status & SYNC_FAIL )
+
+ def offline ( self ):
+ return 0 == self.sync_status & SYNC_SUCCESS
+
+ def _set_ready ( self, is_synced ):
+ """comment TODO"""
+ if is_synced:
+ self.sync_status = SYNC_SUCCESS | REPO_READY
+ else:
+ self.sync_status = REPO_READY
+
+ def _set_fail ( self ):
+ self.sync_status = SYNC_FAIL
+
def __str__ ( self ):
return "repo '%s': DISTDIR '%s', SRC_URI '%s'" % (
self.name, self.distdir, self.src_uri
@@ -79,7 +105,7 @@ class LocalRepo ( object ):
if package_file is None:
return self.src_uri
else:
- return '/'.join ( self.src_uri, package_file )
+ return '/'.join ( ( self.src_uri, package_file ) )
# --- end of get_src_uri (...) ---
# get_src(...) -> get_src_uri(...)
@@ -90,17 +116,26 @@ class LocalRepo ( object ):
return os.path.isdir ( self.distdir )
# --- end of exists (...) ---
- def nosync ( self ):
- """Returns True if the repo is ready for overlay creation, else False.
- Useful for basic local distfiles verification without downloading
- anything.
- """
- return self.exists()
+ def sync ( self, sync_enabled=True ):
+ """Syncs this repo."""
- # --- end of nosync (...) ---
+ status = False
+ if sync_enabled and hasattr ( self, '_dosync' ):
+ status = self._dosync()
- # sync() -> nosync(), LocalRepos don't have anything to sync
- sync = nosync
+ elif hasattr ( self, '_nosync'):
+ status = self._nosync()
+
+ else:
+ status = self.exists()
+
+ if status:
+ self._set_ready ( is_synced=sync_enabled )
+ else:
+ self._set_fail()
+
+ return status
+ # --- end of sync (...) ---
def scan_distdir ( self, is_package=None ):
"""Generator that scans the local distfiles dir of this repo and
@@ -111,19 +146,26 @@ class LocalRepo ( object ):
or None which means that all files are packages.
Defaults to None.
"""
+
+ kw = { 'origin' : self }
+
if is_package is None:
# unfiltered variant
for dirpath, dirnames, filenames in os.walk ( self.distdir ):
+ kw ['distdir'] = dirpath if dirpath != self.distdir else None
+
for pkg in filenames:
- yield PackageInfo ( filename=pkg, origin=self )
+ yield PackageInfo ( filename=pkg, **kw )
elif hasattr ( is_package, '__call__' ):
# filtered variant (adds an if is_package... before yield)
for dirpath, dirnames, filenames in os.walk ( self.distdir ):
+ kw ['distdir'] = dirpath if dirpath != self.distdir else None
+
for pkg in filenames:
if is_package ( os.path.join ( dirpath, pkg ) ):
- yield PackageInfo ( filename=pkg, origin=self )
+ yield PackageInfo ( filename=pkg, **kw )
else:
@@ -227,14 +269,14 @@ class RemoteRepo ( LocalRepo ):
# get_remote(...) -> get_remote_uri(...)
get_remote = get_remote_uri
- def sync ( self ):
+ def _dosync ( self ):
"""Gets packages from remote(s) and returns True if the repo is ready
for overlay creation, else False.
Derived classes have to implement this method.
"""
raise Exception ( "RemoteRepo does not implement sync()." )
- # --- end of sync (...) ---
+ # --- end of _dosync (...) ---
def __str__ ( self ):
return "repo '%s': DISTDIR '%s', SRC_URI '%s', REMOTE_URI '%s'" % (
diff --git a/roverlay/remote/repo.py b/roverlay/remote/repo.py
index f54f448..febfaae 100644
--- a/roverlay/remote/repo.py
+++ b/roverlay/remote/repo.py
@@ -11,7 +11,7 @@ class RsyncRepo ( RemoteRepo ):
def __init__ (
self, name,
directory=None, src_uri=None, rsync_uri=None, base_uri=None,
- extra_rsync_opts=None
+ **rsync_kw
):
# super's init: name, remote protocol, directory_kw, **uri_kw
# using '' as remote protocol which leaves uris unchanged when
@@ -20,19 +20,23 @@ class RsyncRepo ( RemoteRepo ):
name, '', directory=directory,
src_uri=src_uri, remote_uri=rsync_uri, base_uri=base_uri
)
- self.extra_rsync_opts = extra_rsync_opts
+ self.rsync_extra = rsync_kw
+
+ self.sync_protocol = 'rsync'
# --- end of __init__ (...) ---
- def sync ( self ):
+ def _dosync ( self ):
retcode = None
try:
job = RsyncJob (
remote=self.remote_uri, distdir=self.distdir,
run_now=True,
- extra_opts=self.extra_rsync_opts
+ **self.rsync_extra
)
- if job.returncode == 0: return True
+ if job.returncode == 0:
+ self._set_ready ( is_synced=True )
+ return True
retcode = job.returncode
except Exception as e:
@@ -41,13 +45,13 @@ class RsyncRepo ( RemoteRepo ):
logging.exception ( e )
retcode = '<undef>'
-
logging.error (
'Repo %s cannot be used for ebuild creation due to errors '
'while running rsync (return code was %s).' % ( self.name, retcode )
)
+ self._set_fail()
return False
- # --- end of sync (...) ---
+ # --- end of _dosync (...) ---
def __str__ ( self ):
return "rsync repo '%s': DISTDIR '%s', SRC_URI '%s', RSYNC_URI '%s'" \
diff --git a/roverlay/remote/repolist.py b/roverlay/remote/repolist.py
index 4617057..ae740dd 100644
--- a/roverlay/remote/repolist.py
+++ b/roverlay/remote/repolist.py
@@ -1,13 +1,19 @@
+import logging
from roverlay import config
-
from roverlay.remote.repoloader import read_repofile
+LOGGER = logging.getLogger ( 'RepoList' )
+
class RepoList ( object ):
def __init__ ( self ):
self.repos = list()
+
self.sync_enabled = True
+
+ # if True: use all repos when looking for packages, even those that
+ # could not be synced
self.use_broken_repos = False
def sort ( self ):
@@ -25,28 +31,62 @@ class RepoList ( object ):
self.load_file ( f )
# --- end of load (...) ---
- def sync_all ( self, package_queue=None ):
- q = None
- if package_queue is None:
- q = list()
- add = q.append
- else:
- # TODO: _nowait? raises Exception when queue is full which is
- # good in non-threaded execution
- # -> timeout,..
- add = q.put
+ def _queue_packages_from_repo ( self, repo, add_method ):
+ if not repo.ready():
+ if self.use_broken_repos:
+ # warn and continue
+ pass
+ else:
+ # repo cannot be used
+ LOGGER.warning ( "!!" )
+ return False
+
+ for p in repo.scan_distdir():
+ LOGGER.debug ( "adding package %s from repo %s" % ( p, repo ) )
+ add_method ( p )
+ # --- end of _queue_packages_from_repo (...) ---
+
+ def add_packages ( self, add_method ):
+ for repo in self.repos:
+ self._queue_packages_from_repo ( repo, add_method )
+ # --- end of add_packages (...) ---
+
+ def _sync_all_repos_and_run (
+ self,
+ when_repo_success=None, when_repo_fail=None, when_repo_done=None,
+ when_all_done=None
+ ):
+ try_call = lambda f, *x, **z : None if f is None else f ( *x, **z )
+
+ LOGGER.debug ( "Syncing repos ..." )
+ for repo in self.repos:
+ if repo.sync ( sync_enabled=self.sync_enabled ):
+ # repo successfully synced
+ try_call ( when_repo_success, repo )
+ else:
+ # else log fail <>
+ try_call ( when_repo_fail, repo )
+ try_call ( when_repo_done, repo )
- # !! TODO resume here.
+ try_call ( when_all_done )
+ # --- end of _sync_all_repos_and_run (...) ---
+ def sync ( self ):
+ LOGGER.debug ( "Syncing repos ..." )
for repo in self.repos:
- if repo.sync() if self.sync_enabled else repo.nosync():
- # scan repo and create package infos
- for p in repo.scan_distdir(): add ( p )
- elif self.use_broken_repos:
- # warn and scan repo
- ## ..
- for p in repo.scan_distdir(): add ( p )
+ repo.sync ( sync_enabled=self.sync_enabled )
+ # --- end of sync_all (...) ---
+
+ def sync_and_add ( self, add_method ):
+ """Syncs all repos and adds packages immediately to the package queue."""
+ # TODO: _nowait? raises Exception when queue is full which is
+ # good in non-threaded execution
+ # -> timeout,..
+
+ qput = lambda r: self._queue_packages_from_repo ( r, add_method )
+
+ self._sync_all_repos_and_run ( when_repo_done=qput )
# --- end of sync_all (...) ---
diff --git a/roverlay/remote/repoloader.py b/roverlay/remote/repoloader.py
index eae35c5..94dd5b7 100644
--- a/roverlay/remote/repoloader.py
+++ b/roverlay/remote/repoloader.py
@@ -44,13 +44,18 @@ def read_repofile ( repo_file, lenient=False ):
src_uri = get ( 'src_uri' )
)
elif repo_type == 'rsync':
+ extra_opts = get ( 'extra_rsync_opts' )
+ if extra_opts:
+ extra_opts = extra_opts.split ( ' ' )
+
repo = RsyncRepo (
name = get ( 'name', name ),
directory = get ( 'directory' ),
src_uri = get ( 'src_uri' ),
rsync_uri = get ( 'rsync_uri' ),
base_uri = get ( 'base_uri' ),
- extra_rsync_opts = get ( 'extra_rsync_opts' )
+ extra_opts = extra_opts,
+ recursive = get ( 'recursive', False ) == 'yes',
)
else:
LOGGER.error ( "Unknown repo type %s for %s" % ( repo_type, name ) )
diff --git a/roverlay/remote/rsync.py b/roverlay/remote/rsync.py
index e46d1db..f99c8a7 100644
--- a/roverlay/remote/rsync.py
+++ b/roverlay/remote/rsync.py
@@ -1,4 +1,5 @@
import os
+import sys
import subprocess
from roverlay import config
@@ -13,16 +14,24 @@ RSYNC_ENV = keepenv (
'RSYNC_PASSWORD',
)
+# TODO:
+# either reraise an KeyboardInterrupt while running rsync (which stops script
+# execution unless the interrupt is catched elsewhere) or just set a
+# non-zero return code (-> 'repo cannot be used')
+RERAISE_INTERRUPT = False
+
# --recursive is not in the default opts, subdirs in CRAN/contrib are
-# either R release (2.xx.x[-patches] or the package archive)
+# either R releases (2.xx.x[-patches]) or the package archive
DEFAULT_RSYNC_OPTS = (
'--links', # copy symlinks as symlinks,
'--safe-links', # but ignore links outside of tree
'--times', #
'--compress', # FIXME: add lzo if necessary
- '--delete', #
+ '--dirs', #
+ '--prune-empty-dirs', #
'--force', # allow deletion of non-empty dirs
+ '--delete', #
'--human-readable', #
'--stats', #
'--chmod=ugo=r,u+w,Dugo+x', # 0755 for transferred dirs, 0644 for files
@@ -30,11 +39,25 @@ DEFAULT_RSYNC_OPTS = (
class RsyncJob ( object ):
def __init__ (
- self, remote=None, distdir=None, run_now=True, extra_opts=None
+ self, remote=None, distdir=None, run_now=True,
+ extra_opts=None, recursive=False
):
- self.remote = remote
- self.distdir = distdir
- self.extra_opts = None
+ self.distdir = distdir
+
+ # syncing directories, not files - always appending a slash at the end
+ # of remote
+ if remote [-1] != '/':
+ self.remote = remote + '/'
+ else:
+ self.remote = remote
+
+ if recursive:
+ self.extra_opts = [ '--recursive' ]
+ if extra_opts:
+ self.extra_opts.extend ( extra_opts )
+ else:
+ self.extra_opts = extra_opts
+
if run_now: self.run()
# --- end of __init__ (...) ---
@@ -51,36 +74,46 @@ class RsyncJob ( object ):
if max_bw is not None:
argv.append ( '--bwlimit=%i' % max_bw )
- if self.extra_opts is not None:
- if isinstance ( self.extra_opts, str ) or \
- not hasattr ( self.extra_opts, '__iter__' )\
- :
- argv.append ( self.extra_opts )
- else:
- argv.extend ( self.extra_opts )
+ if self.extra_opts:
+ argv.extend ( self.extra_opts )
argv.extend ( ( self.remote, self.distdir ) )
- return argv
+
+ # removing emty args from argv
+ return tuple ( filter ( None, argv ) )
# --- end of _rsync_argv (...) ---
def run ( self ):
rsync_cmd = self._rsync_argv()
+ print ( ' '.join ( rsync_cmd ) )
os.makedirs ( self.distdir, exist_ok=True )
# TODO pipe/log/.., running this in blocking mode until implemented
+ try:
+ proc = subprocess.Popen (
+ rsync_cmd,
+ stdin=None, stdout=None, stderr=None,
+ env=RSYNC_ENV
+ )
+
+ if proc.communicate() != ( None, None ):
+ raise AssertionError ( "expected None,None from communicate!" )
+
+ self.returncode = proc.returncode
+
+ except KeyboardInterrupt:
+ sys.stderr.write (
+ "\nKeyboard interrupt - waiting for rsync to exit...\n"
+ )
+ if 'proc' in locals():
+ proc.communicate()
+ self.returncode = proc.returncode
+ else:
+ self.returncode = 130
- proc = subprocess.Popen (
- rsync_cmd,
- stdin=None, stdout=None, stderr=None,
- env=RSYNC_ENV
- )
-
- if proc.communicate() != ( None, None ):
- raise AssertionError ( "expected None,None from communicate!" )
-
- self.returncode = proc.returncode
-
+ if RERAISE_INTERRUPT:
+ raise
# --- end of start (...) ---