aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'pym/gentoolkit/query.py')
-rw-r--r--pym/gentoolkit/query.py720
1 files changed, 364 insertions, 356 deletions
diff --git a/pym/gentoolkit/query.py b/pym/gentoolkit/query.py
index c2b8d56..4304670 100644
--- a/pym/gentoolkit/query.py
+++ b/pym/gentoolkit/query.py
@@ -6,9 +6,7 @@
"""Provides common methods on a package query."""
-__all__ = (
- 'Query',
-)
+__all__ = ("Query",)
# =======
# Imports
@@ -34,357 +32,367 @@ from gentoolkit.sets import get_set_atoms, SETPREFIX
# Classes
# =======
+
class Query(CPV):
- """Provides common methods on a package query."""
-
- def __init__(self, query, is_regex=False):
- """Create query object.
-
- @type is_regex: bool
- @param is_regex: query is a regular expression
- """
-
- # We need at least one of these chars for a valid query
- needed_chars = ascii_letters + digits + '*'
- if not set(query).intersection(needed_chars):
- raise errors.GentoolkitInvalidPackage(query)
-
- # Separate repository
- repository = None
- if query.count(':') == 2:
- query, repository = query.rsplit(':', 1)
- self.query = query.rstrip(':') # Don't leave dangling colon
- self.repo_filter = repository
- self.is_regex = is_regex
- self.query_type = self._get_query_type()
-
- # Name the rest of the chunks, if possible
- if self.query_type != "set":
- try:
- atom = Atom(self.query)
- self.__dict__.update(atom.__dict__)
- except errors.GentoolkitInvalidAtom:
- CPV.__init__(self, self.query)
- self.operator = ''
- self.atom = self.cpv
-
- def __repr__(self):
- rx = ''
- if self.is_regex:
- rx = ' regex'
- repo = ''
- if self.repo_filter:
- repo = ' in %s' % self.repo_filter
- return "<%s%s %r%s>" % (self.__class__.__name__, rx, self.query, repo)
-
- def __str__(self):
- return self.query
-
- def print_summary(self):
- """Print a summary of the query."""
-
- if self.query_type == "set":
- cat_str = ""
- pkg_str = pp.emph(self.query)
- else:
- try:
- cat, pkg = self.category, self.name + self.fullversion
- except errors.GentoolkitInvalidCPV:
- cat = ''
- pkg = self.atom
- if cat and not self.is_regex:
- cat_str = "in %s " % pp.emph(cat.lstrip('><=~!'))
- else:
- cat_str = ""
-
- if self.is_regex:
- pkg_str = pp.emph(self.query)
- else:
- pkg_str = pp.emph(pkg)
-
- repo = ''
- if self.repo_filter is not None:
- repo = ' %s' % pp.section(self.repo_filter)
-
- pp.uprint(" * Searching%s for %s %s..." % (repo, pkg_str, cat_str))
-
- def smart_find(
- self,
- in_installed=True,
- in_porttree=True,
- in_overlay=True,
- include_masked=True,
- show_progress=True,
- no_matches_fatal=True,
- **kwargs
- ):
- """A high-level wrapper around gentoolkit package-finder functions.
-
- @type in_installed: bool
- @param in_installed: search for query in VARDB
- @type in_porttree: bool
- @param in_porttree: search for query in PORTDB
- @type in_overlay: bool
- @param in_overlay: search for query in overlays
- @type show_progress: bool
- @param show_progress: output search progress
- @type no_matches_fatal: bool
- @param no_matches_fatal: raise errors.GentoolkitNoMatches
- @rtype: list
- @return: Package objects matching query
- """
-
- if in_installed:
- if in_porttree or in_overlay:
- simple_package_finder = partial(
- self.find,
- include_masked=include_masked
- )
- complex_package_finder = helpers.get_cpvs
- else:
- simple_package_finder = self.find_installed
- complex_package_finder = helpers.get_installed_cpvs
- elif in_porttree or in_overlay:
- simple_package_finder = partial(
- self.find,
- include_masked=include_masked,
- in_installed=False
- )
- complex_package_finder = helpers.get_uninstalled_cpvs
- else:
- raise errors.GentoolkitFatalError(
- "Not searching in installed, Portage tree, or overlay. "
- "Nothing to do."
- )
-
- if self.query_type == "set":
- self.package_finder = simple_package_finder
- matches = self._do_set_lookup(show_progress=show_progress)
- elif self.query_type == "simple":
- self.package_finder = simple_package_finder
- matches = self._do_simple_lookup(
- in_installed=in_installed,
- show_progress=show_progress
- )
- else:
- self.package_finder = complex_package_finder
- matches = self._do_complex_lookup(show_progress=show_progress)
-
- if self.repo_filter is not None:
- matches = self._filter_by_repository(matches)
-
- if no_matches_fatal and not matches:
- ii = in_installed and not (in_porttree or in_overlay)
- raise errors.GentoolkitNoMatches(self.query, in_installed=ii)
- return matches
-
- def find(self, in_installed=True, include_masked=True):
- """Returns a list of Package objects that matched the query.
-
- @rtype: list
- @return: matching Package objects
- """
-
- if not self.query:
- return []
-
- try:
- if include_masked:
- matches = portage.db[portage.root]["porttree"].dbapi.xmatch("match-all", self.query)
- else:
- matches = portage.db[portage.root]["porttree"].dbapi.match(self.query)
- if in_installed:
- matches.extend(portage.db[portage.root]["vartree"].dbapi.match(self.query))
- except portage.exception.InvalidAtom as err:
- message = "query.py: find(), query=%s, InvalidAtom=%s" %(
- self.query, str(err))
- raise errors.GentoolkitInvalidAtom(message)
-
- return [Package(x) for x in set(matches)]
-
- def find_installed(self):
- """Return a list of Package objects that matched the search key."""
-
- try:
- matches = portage.db[portage.root]["vartree"].dbapi.match(self.query)
- # catch the ambiguous package Exception
- except portage.exception.AmbiguousPackageName as err:
- matches = []
- for pkgkey in err.args[0]:
- matches.extend(portage.db[portage.root]["vartree"].dbapi.match(pkgkey))
- except portage.exception.InvalidAtom as err:
- raise errors.GentoolkitInvalidAtom(err)
-
- return [Package(x) for x in set(matches)]
-
- def find_best(self, include_keyworded=True, include_masked=True):
- """Returns the "best" version available.
-
- Order of preference:
- highest available stable =>
- highest available keyworded =>
- highest available masked
-
- @rtype: Package object or None
- @return: best of up to three options
- @raise errors.GentoolkitInvalidAtom: if query is not valid input
- """
-
- best = keyworded = masked = None
- try:
- best = portage.db[portage.root]["porttree"].dbapi.xmatch("bestmatch-visible", self.query)
- except portage.exception.InvalidAtom as err:
- message = "query.py: find_best(), bestmatch-visible, " + \
- "query=%s, InvalidAtom=%s" %(self.query, str(err))
- raise errors.GentoolkitInvalidAtom(message)
- # xmatch can return an empty string, so checking for None is not enough
- if not best:
- if not (include_keyworded or include_masked):
- return None
- try:
- matches = portage.db[portage.root]["porttree"].dbapi.xmatch("match-all", self.query)
- except portage.exception.InvalidAtom as err:
- message = "query.py: find_best(), match-all, query=%s, InvalidAtom=%s" %(
- self.query, str(err))
- raise errors.GentoolkitInvalidAtom(message)
- masked = portage.best(matches)
- keywordable = []
- for m in matches:
- status = portage.getmaskingstatus(m)
- if 'package.mask' not in status or 'profile' not in status:
- keywordable.append(m)
- if matches:
- keyworded = portage.best(keywordable)
- else:
- return Package(best)
- if include_keyworded and keyworded:
- return Package(keyworded)
- if include_masked and masked:
- return Package(masked)
- return None
-
- def uses_globbing(self):
- """Check the query to see if it is using globbing.
-
- @rtype: bool
- @return: True if query uses globbing, else False
- """
-
- if set('!*?[]').intersection(self.query):
- # Is query an atom such as '=sys-apps/portage-2.2*'?
- if self.query[0] != '=':
- return True
-
- return False
-
- def is_ranged(self):
- """Return True if the query appears to be ranged, else False."""
-
- q = self.query
- return q.startswith(('~', '<', '>')) or q.endswith('*')
-
- def _do_simple_lookup(self, in_installed=True, show_progress=True):
- """Find matches for a query which is an atom or cpv."""
-
- result = []
-
- if show_progress and CONFIG['verbose']:
- self.print_summary()
-
- result = self.package_finder()
- if not in_installed:
- result = [x for x in result if not x.is_installed()]
-
- return result
-
- def _do_complex_lookup(self, show_progress=True):
- """Find matches for a query which is a regex or includes globbing."""
-
- result = []
-
- if show_progress and not CONFIG["piping"]:
- self.print_summary()
-
- try:
- cat = CPV(self.query).category
- except errors.GentoolkitInvalidCPV:
- cat = ''
-
- pre_filter = []
- # The "get_" functions can pre-filter against the whole package key,
- # but since we allow globbing now, we run into issues like:
- # >>> portage.dep.dep_getkey("sys-apps/portage-*")
- # 'sys-apps/portage-'
- # So the only way to guarantee we don't overrun the key is to
- # prefilter by cat only.
- if cat:
- if self.is_regex:
- cat_re = cat
- else:
- cat_re = fnmatch.translate(cat)
- predicate = lambda x: re.match(cat_re, x.split("/", 1)[0])
- pre_filter = self.package_finder(predicate=predicate)
-
- # Post-filter
- if self.is_regex:
- try:
- re.compile(self.query)
- except re.error:
- raise errors.GentoolkitInvalidRegex(self.query)
- predicate = lambda x: re.search(self.query, x)
- else:
- if cat:
- query_re = fnmatch.translate(self.query)
- else:
- query_re = fnmatch.translate("*/%s" % self.query)
- predicate = lambda x: re.search(query_re, x)
- if pre_filter:
- result = [x for x in pre_filter if predicate(x)]
- else:
- result = self.package_finder(predicate=predicate)
-
- return [Package(x) for x in result]
-
- def _do_set_lookup(self, show_progress=True):
- """Find matches for a query that is a package set."""
-
- if show_progress and not CONFIG["piping"]:
- self.print_summary()
-
- setname = self.query[len(SETPREFIX):]
- result = []
- try:
- atoms = get_set_atoms(setname)
- except errors.GentoolkitSetNotFound:
- return result
-
- q = self.query
- for atom in atoms:
- self.query = str(atom)
- result.extend(self._do_simple_lookup(show_progress=False))
- self.query = q
-
- return result
-
- def _filter_by_repository(self, matches):
- """Filter out packages which do not belong to self.repo_filter."""
-
- result = []
- for match in matches:
- repo_name = match.repo_name()
- if repo_name == self.repo_filter:
- result.append(match)
- elif (not repo_name and
- self.repo_filter in ('unknown', 'null')):
- result.append(match)
-
- return result
-
- def _get_query_type(self):
- """Determine of what type the query is."""
-
- if self.query.startswith(SETPREFIX):
- return "set"
- elif self.is_regex or self.uses_globbing():
- return "complex"
- return "simple"
+ """Provides common methods on a package query."""
+
+ def __init__(self, query, is_regex=False):
+ """Create query object.
+
+ @type is_regex: bool
+ @param is_regex: query is a regular expression
+ """
+
+ # We need at least one of these chars for a valid query
+ needed_chars = ascii_letters + digits + "*"
+ if not set(query).intersection(needed_chars):
+ raise errors.GentoolkitInvalidPackage(query)
+
+ # Separate repository
+ repository = None
+ if query.count(":") == 2:
+ query, repository = query.rsplit(":", 1)
+ self.query = query.rstrip(":") # Don't leave dangling colon
+ self.repo_filter = repository
+ self.is_regex = is_regex
+ self.query_type = self._get_query_type()
+
+ # Name the rest of the chunks, if possible
+ if self.query_type != "set":
+ try:
+ atom = Atom(self.query)
+ self.__dict__.update(atom.__dict__)
+ except errors.GentoolkitInvalidAtom:
+ CPV.__init__(self, self.query)
+ self.operator = ""
+ self.atom = self.cpv
+
+ def __repr__(self):
+ rx = ""
+ if self.is_regex:
+ rx = " regex"
+ repo = ""
+ if self.repo_filter:
+ repo = " in %s" % self.repo_filter
+ return "<%s%s %r%s>" % (self.__class__.__name__, rx, self.query, repo)
+
+ def __str__(self):
+ return self.query
+
+ def print_summary(self):
+ """Print a summary of the query."""
+
+ if self.query_type == "set":
+ cat_str = ""
+ pkg_str = pp.emph(self.query)
+ else:
+ try:
+ cat, pkg = self.category, self.name + self.fullversion
+ except errors.GentoolkitInvalidCPV:
+ cat = ""
+ pkg = self.atom
+ if cat and not self.is_regex:
+ cat_str = "in %s " % pp.emph(cat.lstrip("><=~!"))
+ else:
+ cat_str = ""
+
+ if self.is_regex:
+ pkg_str = pp.emph(self.query)
+ else:
+ pkg_str = pp.emph(pkg)
+
+ repo = ""
+ if self.repo_filter is not None:
+ repo = " %s" % pp.section(self.repo_filter)
+
+ pp.uprint(" * Searching%s for %s %s..." % (repo, pkg_str, cat_str))
+
+ def smart_find(
+ self,
+ in_installed=True,
+ in_porttree=True,
+ in_overlay=True,
+ include_masked=True,
+ show_progress=True,
+ no_matches_fatal=True,
+ **kwargs
+ ):
+ """A high-level wrapper around gentoolkit package-finder functions.
+
+ @type in_installed: bool
+ @param in_installed: search for query in VARDB
+ @type in_porttree: bool
+ @param in_porttree: search for query in PORTDB
+ @type in_overlay: bool
+ @param in_overlay: search for query in overlays
+ @type show_progress: bool
+ @param show_progress: output search progress
+ @type no_matches_fatal: bool
+ @param no_matches_fatal: raise errors.GentoolkitNoMatches
+ @rtype: list
+ @return: Package objects matching query
+ """
+
+ if in_installed:
+ if in_porttree or in_overlay:
+ simple_package_finder = partial(
+ self.find, include_masked=include_masked
+ )
+ complex_package_finder = helpers.get_cpvs
+ else:
+ simple_package_finder = self.find_installed
+ complex_package_finder = helpers.get_installed_cpvs
+ elif in_porttree or in_overlay:
+ simple_package_finder = partial(
+ self.find, include_masked=include_masked, in_installed=False
+ )
+ complex_package_finder = helpers.get_uninstalled_cpvs
+ else:
+ raise errors.GentoolkitFatalError(
+ "Not searching in installed, Portage tree, or overlay. "
+ "Nothing to do."
+ )
+
+ if self.query_type == "set":
+ self.package_finder = simple_package_finder
+ matches = self._do_set_lookup(show_progress=show_progress)
+ elif self.query_type == "simple":
+ self.package_finder = simple_package_finder
+ matches = self._do_simple_lookup(
+ in_installed=in_installed, show_progress=show_progress
+ )
+ else:
+ self.package_finder = complex_package_finder
+ matches = self._do_complex_lookup(show_progress=show_progress)
+
+ if self.repo_filter is not None:
+ matches = self._filter_by_repository(matches)
+
+ if no_matches_fatal and not matches:
+ ii = in_installed and not (in_porttree or in_overlay)
+ raise errors.GentoolkitNoMatches(self.query, in_installed=ii)
+ return matches
+
+ def find(self, in_installed=True, include_masked=True):
+ """Returns a list of Package objects that matched the query.
+
+ @rtype: list
+ @return: matching Package objects
+ """
+
+ if not self.query:
+ return []
+
+ try:
+ if include_masked:
+ matches = portage.db[portage.root]["porttree"].dbapi.xmatch(
+ "match-all", self.query
+ )
+ else:
+ matches = portage.db[portage.root]["porttree"].dbapi.match(self.query)
+ if in_installed:
+ matches.extend(
+ portage.db[portage.root]["vartree"].dbapi.match(self.query)
+ )
+ except portage.exception.InvalidAtom as err:
+ message = "query.py: find(), query=%s, InvalidAtom=%s" % (
+ self.query,
+ str(err),
+ )
+ raise errors.GentoolkitInvalidAtom(message)
+
+ return [Package(x) for x in set(matches)]
+
+ def find_installed(self):
+ """Return a list of Package objects that matched the search key."""
+
+ try:
+ matches = portage.db[portage.root]["vartree"].dbapi.match(self.query)
+ # catch the ambiguous package Exception
+ except portage.exception.AmbiguousPackageName as err:
+ matches = []
+ for pkgkey in err.args[0]:
+ matches.extend(portage.db[portage.root]["vartree"].dbapi.match(pkgkey))
+ except portage.exception.InvalidAtom as err:
+ raise errors.GentoolkitInvalidAtom(err)
+
+ return [Package(x) for x in set(matches)]
+
+ def find_best(self, include_keyworded=True, include_masked=True):
+ """Returns the "best" version available.
+
+ Order of preference:
+ highest available stable =>
+ highest available keyworded =>
+ highest available masked
+
+ @rtype: Package object or None
+ @return: best of up to three options
+ @raise errors.GentoolkitInvalidAtom: if query is not valid input
+ """
+
+ best = keyworded = masked = None
+ try:
+ best = portage.db[portage.root]["porttree"].dbapi.xmatch(
+ "bestmatch-visible", self.query
+ )
+ except portage.exception.InvalidAtom as err:
+ message = (
+ "query.py: find_best(), bestmatch-visible, "
+ + "query=%s, InvalidAtom=%s" % (self.query, str(err))
+ )
+ raise errors.GentoolkitInvalidAtom(message)
+ # xmatch can return an empty string, so checking for None is not enough
+ if not best:
+ if not (include_keyworded or include_masked):
+ return None
+ try:
+ matches = portage.db[portage.root]["porttree"].dbapi.xmatch(
+ "match-all", self.query
+ )
+ except portage.exception.InvalidAtom as err:
+ message = (
+ "query.py: find_best(), match-all, query=%s, InvalidAtom=%s"
+ % (self.query, str(err))
+ )
+ raise errors.GentoolkitInvalidAtom(message)
+ masked = portage.best(matches)
+ keywordable = []
+ for m in matches:
+ status = portage.getmaskingstatus(m)
+ if "package.mask" not in status or "profile" not in status:
+ keywordable.append(m)
+ if matches:
+ keyworded = portage.best(keywordable)
+ else:
+ return Package(best)
+ if include_keyworded and keyworded:
+ return Package(keyworded)
+ if include_masked and masked:
+ return Package(masked)
+ return None
+
+ def uses_globbing(self):
+ """Check the query to see if it is using globbing.
+
+ @rtype: bool
+ @return: True if query uses globbing, else False
+ """
+
+ if set("!*?[]").intersection(self.query):
+ # Is query an atom such as '=sys-apps/portage-2.2*'?
+ if self.query[0] != "=":
+ return True
+
+ return False
+
+ def is_ranged(self):
+ """Return True if the query appears to be ranged, else False."""
+
+ q = self.query
+ return q.startswith(("~", "<", ">")) or q.endswith("*")
+
+ def _do_simple_lookup(self, in_installed=True, show_progress=True):
+ """Find matches for a query which is an atom or cpv."""
+
+ result = []
+
+ if show_progress and CONFIG["verbose"]:
+ self.print_summary()
+
+ result = self.package_finder()
+ if not in_installed:
+ result = [x for x in result if not x.is_installed()]
+
+ return result
+
+ def _do_complex_lookup(self, show_progress=True):
+ """Find matches for a query which is a regex or includes globbing."""
+
+ result = []
+
+ if show_progress and not CONFIG["piping"]:
+ self.print_summary()
+
+ try:
+ cat = CPV(self.query).category
+ except errors.GentoolkitInvalidCPV:
+ cat = ""
+
+ pre_filter = []
+ # The "get_" functions can pre-filter against the whole package key,
+ # but since we allow globbing now, we run into issues like:
+ # >>> portage.dep.dep_getkey("sys-apps/portage-*")
+ # 'sys-apps/portage-'
+ # So the only way to guarantee we don't overrun the key is to
+ # prefilter by cat only.
+ if cat:
+ if self.is_regex:
+ cat_re = cat
+ else:
+ cat_re = fnmatch.translate(cat)
+ predicate = lambda x: re.match(cat_re, x.split("/", 1)[0])
+ pre_filter = self.package_finder(predicate=predicate)
+
+ # Post-filter
+ if self.is_regex:
+ try:
+ re.compile(self.query)
+ except re.error:
+ raise errors.GentoolkitInvalidRegex(self.query)
+ predicate = lambda x: re.search(self.query, x)
+ else:
+ if cat:
+ query_re = fnmatch.translate(self.query)
+ else:
+ query_re = fnmatch.translate("*/%s" % self.query)
+ predicate = lambda x: re.search(query_re, x)
+ if pre_filter:
+ result = [x for x in pre_filter if predicate(x)]
+ else:
+ result = self.package_finder(predicate=predicate)
+
+ return [Package(x) for x in result]
+
+ def _do_set_lookup(self, show_progress=True):
+ """Find matches for a query that is a package set."""
+
+ if show_progress and not CONFIG["piping"]:
+ self.print_summary()
+
+ setname = self.query[len(SETPREFIX) :]
+ result = []
+ try:
+ atoms = get_set_atoms(setname)
+ except errors.GentoolkitSetNotFound:
+ return result
+
+ q = self.query
+ for atom in atoms:
+ self.query = str(atom)
+ result.extend(self._do_simple_lookup(show_progress=False))
+ self.query = q
+
+ return result
+
+ def _filter_by_repository(self, matches):
+ """Filter out packages which do not belong to self.repo_filter."""
+
+ result = []
+ for match in matches:
+ repo_name = match.repo_name()
+ if repo_name == self.repo_filter:
+ result.append(match)
+ elif not repo_name and self.repo_filter in ("unknown", "null"):
+ result.append(match)
+
+ return result
+
+ def _get_query_type(self):
+ """Determine of what type the query is."""
+
+ if self.query.startswith(SETPREFIX):
+ return "set"
+ elif self.is_regex or self.uses_globbing():
+ return "complex"
+ return "simple"