#!/usr/bin/env python2 # Thanks to Zac Medico for working example of using an api import portage from portage.dep import Atom # to not use own emerge option parser. Options may change but I hope # parse_opts function will always be there from _emerge.main import parse_opts # TODO: check if actions always here try: from _emerge import actions, Package from _emerge.create_depgraph_params import create_depgraph_params from _emerge.depgraph import backtrack_depgraph, resume_depgraph, depgraph except ImportError, Err: # non-critical, just print warning(TODO: strerr) print "Error while loading modules: %s" % Err.message class portage_api: """ class for accessing the portage api """ def __init__(self): """ test """ settings,trees,mtimedb=actions.load_emerge_config() #import pdb; pdb.set_trace() self.settings=settings self.trees=trees self.mtimedb=mtimedb self.vartree=trees[portage.root]['vartree'] self.vardb=self.vartree.dbapi self.portdb=portage.portdb self.metadata_keys = [ k for k in portage.auxdbkeys if not k.startswith("UNUSED_")] self.use=self.settings["USE"] # This function is from portage and almost unchanged # It is here because old versions of portage not have this function def expand_new_virt(self,atom): """ Iterate over the recursively expanded RDEPEND atoms of a new-style virtual. If atom is not a new-style virtual or it does not match an installed package then it is yielded without any expansion. """ vardb=self.vartree.dbapi if not isinstance(atom, Atom): atom = Atom(atom) if not atom.cp.startswith("virtual/"): yield atom return traversed = set() stack = [atom] while stack: atom = stack.pop() if atom.blocker or \ not atom.cp.startswith("virtual/"): yield atom continue matches = vardb.match(atom) if not (matches and matches[-1].startswith("virtual/")): yield atom continue virt_cpv = matches[-1] if virt_cpv in traversed: continue traversed.add(virt_cpv) eapi, iuse, rdepend, use = vardb.aux_get(virt_cpv, ["EAPI", "IUSE", "RDEPEND", "USE"]) if not portage.eapi_is_supported(eapi): yield atom continue iuse_defaults=[ u[1:] for u in iuse.split() if u.startswith("+")] use=use.split() for u in iuse_defaults: if u not in use: use.append(u) success, atoms = portage.dep_check(rdepend, None, vardb.settings, myuse=use, myroot=vardb.root, trees={vardb.root:{"porttree":vardb.vartree, "vartree":vardb.vartree}}) if success: stack.extend(atoms) else: yield atom def get_best_visible_pkg(self,pkg,db="portdb"): """ Gets best candidate on installing. Returns empty string if no found :param pkg: package name :param db: name of db to look. Can be "vardb" or "portdb" """ try: if db=="portdb": return self.portdb.xmatch("bestmatch-visible", pkg) elif db=="vardb": return self.vardb.match(pkg)[0] else: return '' except: return '' # trying to repeat action_build(...) logic in _emerge/actions.py def get_merge_list(self,emergeargs): """ Gets list of packages that emerge with emergeargs-arguments will merge This function uses very internal functions of portage so it may be unreliable in various portage versions :param emergeargs: list of raw args of emerge, for example, ['-1','bash'] """ try: action, opts, files = parse_opts(emergeargs, silent=True) resume = False if "--resume" in opts and \ ("resume" in self.mtimedb or "resume_backup" in self.mtimedb): resume = True if "resume" not in self.mtimedb: self.mtimedb["resume"] = self.mtimedb["resume_backup"] # "opts" is a list for backward compatibility. resume_opts = self.mtimedb["resume"].get("myopts", []) if isinstance(resume_opts, list): resume_opts = dict((k,True) for k in resume_opts) for opt in ("--ask", "--color", "--skipfirst", "--tree"): resume_opts.pop(opt, None) # Current options always override resume_opts. resume_opts.update(opts) opts.clear() opts.update(resume_opts) params=create_depgraph_params(opts,action) if resume: success, mydepgraph, dropped_tasks = resume_depgraph( self.settings, self.trees, self.mtimedb, opts, params, None) if not success: return [] else: success, mydepgraph, favorites = backtrack_depgraph( self.settings, self.trees, opts, params, action, files, None) if not success: return [] ret=[] for pkg in mydepgraph.altlist(): if type(pkg) is Package.Package: ret.append(pkg.cpv) return ret except: return [] # non-recursive dependency getter def get_dep(self,pkg,dep_type=["RDEPEND","DEPEND"],db="portdb"): """ Gets current dependencies of a package. Looks in portage db by default :param pkg: name of package :param dep_type: type of dependencies to recurse. Can be ["DEPEND"] or ["RDEPEND", "DEPEND"] :param db: name of db to look. Can be "vardb" or "portdb" :returns: **set** of packages names """ ret=set() pkg = self.get_best_visible_pkg(pkg,db) if not pkg: return ret # we found the best visible match in common tree #import pdb; pdb.set_trace() if db=="portdb": aux_get=self.portdb.aux_get elif db=="vardb": aux_get=self.vardb.aux_get else: return ret metadata = dict(zip(self.metadata_keys, aux_get(pkg, self.metadata_keys))) dep_str = " ".join(metadata[k] for k in dep_type) # the IUSE default are very important for us iuse_defaults=[ u[1:] for u in metadata.get("IUSE",'').split() if u.startswith("+")] use=self.use.split() for u in iuse_defaults: if u not in use: use.append(u) success, atoms = portage.dep_check(dep_str, None, self.settings, myuse=use, trees=portage.db, myroot=self.settings["ROOT"]) if not success: return ret for atom in atoms: atomname = self.vartree.dep_bestmatch(atom) if not atomname: continue for unvirt_pkg in self.expand_new_virt('='+atomname): for pkg in self.vartree.dep_match(unvirt_pkg): ret.add(pkg) return ret # recursive dependency getter def get_deps(self,pkg,dep_type=["RDEPEND","DEPEND"],db="portdb"): """ Gets current dependencies of a package on any depth All dependencies **must** be installed :param pkg: name of package :param dep_type: type of dependencies to recurse. Can be ["DEPEND"] or ["RDEPEND", "DEPEND"] :param db: name of db to look. Can be "vardb" or "portdb" :returns: **set** of packages names """ ret=set() # get porttree dependencies on the first package pkg = self.get_best_visible_pkg(pkg,db) if not pkg: return ret #print "doing",pkg known_packages=set() unknown_packages=self.get_dep(pkg,dep_type,db) ret=ret.union(unknown_packages) while unknown_packages: p=unknown_packages.pop() #print "proceeding "+p if p in known_packages: continue known_packages.add(p) current_deps=self.get_dep(p,dep_type,'vardb') unknown_packages=unknown_packages.union(current_deps) ret=ret.union(current_deps) return ret def get_deps_for_package_building(self, pkg): """ returns buildtime dependencies of current package and all runtime dependencies of that buildtime dependencies """ buildtime_deps=self.get_dep(pkg, ["DEPEND"],"portdb") runtime_deps=set() for dep in buildtime_deps: runtime_deps|=self.get_deps(dep,["RDEPEND","PDEPEND"],"vardb") ret = buildtime_deps | runtime_deps return ret def get_system_packages_list(self): """ returns all packages from system set. They are always implicit dependencies :returns: **list** of package names """ ret=[] for atom in self.settings.packages: for pre_pkg in self.vartree.dep_match(atom): for unvirt_pkg in self.expand_new_virt('='+pre_pkg): for pkg in self.vartree.dep_match(unvirt_pkg): ret.append(pkg) return ret def get_system_packages_rdeps(self): """ returns runtime dependencies of packages from system set :returns: **list** of package names """ ret=set() for pkg in self.get_system_packages_list(): ret=ret.union(self.get_deps(pkg,["RDEPEND"])) return list(ret) def parse_emerge_args(self,args): """ call emerge arguments parser :param args: arguments passed to emerge :returns: **tuple** (action string, options dictionary, files or atoms list) """ action, opts, files = parse_opts(args, silent=True) return action, opts, files