diff options
author | Wolfgang E. Sanyer <WolfgangESanyer@gmail.com> | 2021-09-20 08:49:15 -0400 |
---|---|---|
committer | Matt Turner <mattst88@gentoo.org> | 2021-09-20 15:51:52 -0700 |
commit | bbcd72b5fe85fe9bbca1913f8aa22077d94e75d0 (patch) | |
tree | 64a38c9e94cad7c800fcd1ca7307632f603e1315 /pym/gentoolkit/helpers.py | |
parent | equery: Remove leftovers from 'changes' removal (diff) | |
download | gentoolkit-bbcd72b5fe85fe9bbca1913f8aa22077d94e75d0.tar.gz gentoolkit-bbcd72b5fe85fe9bbca1913f8aa22077d94e75d0.tar.bz2 gentoolkit-bbcd72b5fe85fe9bbca1913f8aa22077d94e75d0.zip |
Change tabs to spaces (using autopep8). Also, format repo using black.
The following command was used to change the tabs to spaces:
autopep8 --in-place --select=E101,E11,E121,E122,E123,E124,E125,E126,E127,E128,E129,E131,E133,E20,E211,E22,E224,E224,E226,E227,E228,E231,E241,E242,E251,E252,E26,E265,E266,E27,E301,E302,E303,E304,E305,E306,W291,W293,W391 -r .
And then black was run as `black .` on the entire tree
Signed-off-by: Wolfgang E. Sanyer <WolfgangESanyer@gmail.com>
Signed-off-by: Matt Turner <mattst88@gentoo.org>
Diffstat (limited to 'pym/gentoolkit/helpers.py')
-rw-r--r-- | pym/gentoolkit/helpers.py | 492 |
1 files changed, 255 insertions, 237 deletions
diff --git a/pym/gentoolkit/helpers.py b/pym/gentoolkit/helpers.py index 236a379..15d959d 100644 --- a/pym/gentoolkit/helpers.py +++ b/pym/gentoolkit/helpers.py @@ -9,14 +9,14 @@ """ __all__ = ( - 'FileOwner', - 'get_cpvs', - 'get_installed_cpvs', - 'get_uninstalled_cpvs', - 'get_bintree_cpvs', - 'uniqify', + "FileOwner", + "get_cpvs", + "get_installed_cpvs", + "get_uninstalled_cpvs", + "get_bintree_cpvs", + "uniqify", ) -__docformat__ = 'epytext' +__docformat__ = "epytext" # ======= # Imports @@ -35,280 +35,298 @@ from gentoolkit import errors from gentoolkit.atom import Atom from gentoolkit.cpv import CPV from gentoolkit.versionmatch import VersionMatch + # This has to be imported below to stop circular import. -#from gentoolkit.package import Package +# from gentoolkit.package import Package # ======= # Classes # ======= + class FileOwner: - """Creates a function for locating the owner of filename queries. - - Example usage: - >>> from gentoolkit.helpers import FileOwner - >>> findowner = FileOwner() - >>> findowner(('/bin/grep',)) - [(<Package 'sys-apps/grep-2.12'>, '/bin/grep')] - """ - def __init__(self, is_regex=False, early_out=False, printer_fn=None): - """Instantiate function. - - @type is_regex: bool - @param is_regex: funtion args are regular expressions - @type early_out: bool - @param early_out: return when first result is found (safe) - @type printer_fn: callable - @param printer_fn: If defined, will be passed useful information for - printing each result as it is found. - """ - self.is_regex = is_regex - self.early_out = early_out - self.printer_fn = printer_fn - - def __call__(self, queries): - """Run the function. - - @type queries: iterable - @param queries: filepaths or filepath regexes - """ - query_re_string = self._prepare_search_regex(queries) - try: - query_re = re.compile(query_re_string) - except (TypeError, re.error) as err: - raise errors.GentoolkitInvalidRegex(err) - - use_match = False - if ((self.is_regex or query_re_string.startswith(r'^\/')) - and '|' not in query_re_string ): - # If we were passed a regex or a single path starting with root, - # we can use re.match, else use re.search. - use_match = True - - pkgset = get_installed_cpvs() - - return self.find_owners(query_re, use_match=use_match, pkgset=pkgset) - - def find_owners(self, query_re, use_match=False, pkgset=None): - """Find owners and feed data to supplied output function. - - @type query_re: _sre.SRE_Pattern - @param query_re: file regex - @type use_match: bool - @param use_match: use re.match or re.search - @type pkgset: iterable or None - @param pkgset: list of packages to look through - """ - # FIXME: Remove when lazyimport supports objects: - from gentoolkit.package import Package - - if use_match: - query_fn = query_re.match - else: - query_fn = query_re.search - - results = [] - found_match = False - for pkg in sorted([Package(x) for x in pkgset]): - files = pkg.parsed_contents() - for cfile in files: - match = query_fn(cfile) - if match: - results.append((pkg, cfile)) - if self.printer_fn is not None: - self.printer_fn(pkg, cfile) - if self.early_out: - found_match = True - break - if found_match: - break - return results - - @staticmethod - def expand_abspaths(paths): - """Expand any relative paths (./file) to their absolute paths. - - @type paths: list - @param paths: file path strs - @rtype: list - @return: the original list with any relative paths expanded - @raise AttributeError: if paths does not have attribute 'extend' - """ - - osp = os.path - expanded_paths = [] - for path in paths: - if path.startswith('./'): - expanded_paths.append(osp.abspath(path)) - else: - expanded_paths.append(path) - - return expanded_paths - - @staticmethod - def extend_realpaths(paths): - """Extend a list of paths with the realpaths for any symlinks. - - @type paths: list - @param paths: file path strs - @rtype: list - @return: the original list plus the realpaths for any symlinks - so long as the realpath doesn't already exist in the list - @raise AttributeError: if paths does not have attribute 'extend' - """ - - osp = os.path - paths.extend([osp.realpath(x) for x in paths - if osp.realpath(x) not in paths]) - - return paths - - def _prepare_search_regex(self, queries): - """Create a regex out of the queries""" - - queries = list(queries) - if self.is_regex: - return '|'.join(queries) - else: - result = [] - # Trim trailing and multiple slashes from queries - slashes = re.compile(r'/+') - queries = self.expand_abspaths(queries) - queries = self.extend_realpaths(queries) - for query in queries: - query = slashes.sub('/', query).rstrip('/') - if query.startswith('/'): - query = "^%s$" % re.escape(query) - else: - query = "/%s$" % re.escape(query) - result.append(query) - result = "|".join(result) - return result + """Creates a function for locating the owner of filename queries. + + Example usage: + >>> from gentoolkit.helpers import FileOwner + >>> findowner = FileOwner() + >>> findowner(('/bin/grep',)) + [(<Package 'sys-apps/grep-2.12'>, '/bin/grep')] + """ + + def __init__(self, is_regex=False, early_out=False, printer_fn=None): + """Instantiate function. + + @type is_regex: bool + @param is_regex: funtion args are regular expressions + @type early_out: bool + @param early_out: return when first result is found (safe) + @type printer_fn: callable + @param printer_fn: If defined, will be passed useful information for + printing each result as it is found. + """ + self.is_regex = is_regex + self.early_out = early_out + self.printer_fn = printer_fn + + def __call__(self, queries): + """Run the function. + + @type queries: iterable + @param queries: filepaths or filepath regexes + """ + query_re_string = self._prepare_search_regex(queries) + try: + query_re = re.compile(query_re_string) + except (TypeError, re.error) as err: + raise errors.GentoolkitInvalidRegex(err) + + use_match = False + if ( + self.is_regex or query_re_string.startswith(r"^\/") + ) and "|" not in query_re_string: + # If we were passed a regex or a single path starting with root, + # we can use re.match, else use re.search. + use_match = True + + pkgset = get_installed_cpvs() + + return self.find_owners(query_re, use_match=use_match, pkgset=pkgset) + + def find_owners(self, query_re, use_match=False, pkgset=None): + """Find owners and feed data to supplied output function. + + @type query_re: _sre.SRE_Pattern + @param query_re: file regex + @type use_match: bool + @param use_match: use re.match or re.search + @type pkgset: iterable or None + @param pkgset: list of packages to look through + """ + # FIXME: Remove when lazyimport supports objects: + from gentoolkit.package import Package + + if use_match: + query_fn = query_re.match + else: + query_fn = query_re.search + + results = [] + found_match = False + for pkg in sorted([Package(x) for x in pkgset]): + files = pkg.parsed_contents() + for cfile in files: + match = query_fn(cfile) + if match: + results.append((pkg, cfile)) + if self.printer_fn is not None: + self.printer_fn(pkg, cfile) + if self.early_out: + found_match = True + break + if found_match: + break + return results + + @staticmethod + def expand_abspaths(paths): + """Expand any relative paths (./file) to their absolute paths. + + @type paths: list + @param paths: file path strs + @rtype: list + @return: the original list with any relative paths expanded + @raise AttributeError: if paths does not have attribute 'extend' + """ + + osp = os.path + expanded_paths = [] + for path in paths: + if path.startswith("./"): + expanded_paths.append(osp.abspath(path)) + else: + expanded_paths.append(path) + + return expanded_paths + + @staticmethod + def extend_realpaths(paths): + """Extend a list of paths with the realpaths for any symlinks. + + @type paths: list + @param paths: file path strs + @rtype: list + @return: the original list plus the realpaths for any symlinks + so long as the realpath doesn't already exist in the list + @raise AttributeError: if paths does not have attribute 'extend' + """ + + osp = os.path + paths.extend([osp.realpath(x) for x in paths if osp.realpath(x) not in paths]) + + return paths + + def _prepare_search_regex(self, queries): + """Create a regex out of the queries""" + + queries = list(queries) + if self.is_regex: + return "|".join(queries) + else: + result = [] + # Trim trailing and multiple slashes from queries + slashes = re.compile(r"/+") + queries = self.expand_abspaths(queries) + queries = self.extend_realpaths(queries) + for query in queries: + query = slashes.sub("/", query).rstrip("/") + if query.startswith("/"): + query = "^%s$" % re.escape(query) + else: + query = "/%s$" % re.escape(query) + result.append(query) + result = "|".join(result) + return result + # ========= # Functions # ========= + def get_cpvs(predicate=None, include_installed=True): - """Get all packages in the Portage tree and overlays. Optionally apply a - predicate. - - Example usage: - >>> from gentoolkit.helpers import get_cpvs - >>> len(set(get_cpvs())) - 33518 - >>> fn = lambda x: x.startswith('app-portage') - >>> len(set(get_cpvs(fn, include_installed=False))) - 137 - - @type predicate: function - @param predicate: a function to filter the package list with - @type include_installed: bool - @param include_installed: - If True: Return the union of all_cpvs and all_installed_cpvs - If False: Return the difference of all_cpvs and all_installed_cpvs - @rtype: generator - @return: a generator that yields unsorted cat/pkg-ver strings from the - Portage tree - """ - - if not predicate: - predicate = lambda x: x - - all_cps = portage.db[portage.root]["porttree"].dbapi.cp_all() - - all_cpvs = iter(x for x in chain.from_iterable( - portage.db[portage.root]["porttree"].dbapi.cp_list(x) - for x in all_cps) if predicate(x)) - - all_installed_cpvs = set(get_installed_cpvs(predicate)) - - if include_installed: - for cpv in all_cpvs: - if cpv in all_installed_cpvs: - all_installed_cpvs.remove(cpv) - yield cpv - for cpv in all_installed_cpvs: - yield cpv - else: - for cpv in all_cpvs: - if cpv not in all_installed_cpvs: - yield cpv + """Get all packages in the Portage tree and overlays. Optionally apply a + predicate. + + Example usage: + >>> from gentoolkit.helpers import get_cpvs + >>> len(set(get_cpvs())) + 33518 + >>> fn = lambda x: x.startswith('app-portage') + >>> len(set(get_cpvs(fn, include_installed=False))) + 137 + + @type predicate: function + @param predicate: a function to filter the package list with + @type include_installed: bool + @param include_installed: + If True: Return the union of all_cpvs and all_installed_cpvs + If False: Return the difference of all_cpvs and all_installed_cpvs + @rtype: generator + @return: a generator that yields unsorted cat/pkg-ver strings from the + Portage tree + """ + + if not predicate: + predicate = lambda x: x + + all_cps = portage.db[portage.root]["porttree"].dbapi.cp_all() + + all_cpvs = iter( + x + for x in chain.from_iterable( + portage.db[portage.root]["porttree"].dbapi.cp_list(x) for x in all_cps + ) + if predicate(x) + ) + + all_installed_cpvs = set(get_installed_cpvs(predicate)) + + if include_installed: + for cpv in all_cpvs: + if cpv in all_installed_cpvs: + all_installed_cpvs.remove(cpv) + yield cpv + for cpv in all_installed_cpvs: + yield cpv + else: + for cpv in all_cpvs: + if cpv not in all_installed_cpvs: + yield cpv get_uninstalled_cpvs = partial(get_cpvs, include_installed=False) def get_installed_cpvs(predicate=None): - """Get all installed packages. Optionally apply a predicate. + """Get all installed packages. Optionally apply a predicate. - @type predicate: function - @param predicate: a function to filter the package list with - @rtype: generator - @return: a generator that yields unsorted installed cat/pkg-ver strings - from VARDB - """ + @type predicate: function + @param predicate: a function to filter the package list with + @rtype: generator + @return: a generator that yields unsorted installed cat/pkg-ver strings + from VARDB + """ - if not predicate: - predicate = lambda x: x + if not predicate: + predicate = lambda x: x - installed_cps = portage.db[portage.root]["vartree"].dbapi.cp_all() + installed_cps = portage.db[portage.root]["vartree"].dbapi.cp_all() - installed_cpvs = iter(x for x in chain.from_iterable( - portage.db[portage.root]["vartree"].dbapi.cp_list(x) - for x in installed_cps) if predicate(x)) + installed_cpvs = iter( + x + for x in chain.from_iterable( + portage.db[portage.root]["vartree"].dbapi.cp_list(x) for x in installed_cps + ) + if predicate(x) + ) - for cpv in installed_cpvs: - yield cpv + for cpv in installed_cpvs: + yield cpv def get_bintree_cpvs(predicate=None): - """Get all binary packages available. Optionally apply a predicate. + """Get all binary packages available. Optionally apply a predicate. - @type predicate: function - @param predicate: a function to filter the package list with - @rtype: generator - @return: a generator that yields unsorted binary package cat/pkg-ver strings - from BINDB - """ + @type predicate: function + @param predicate: a function to filter the package list with + @rtype: generator + @return: a generator that yields unsorted binary package cat/pkg-ver strings + from BINDB + """ - if not predicate: - predicate = lambda x: x + if not predicate: + predicate = lambda x: x - installed_cps = portage.db[portage.root]["bintree"].dbapi.cp_all() + installed_cps = portage.db[portage.root]["bintree"].dbapi.cp_all() - installed_cpvs = iter(x for x in chain.from_iterable( - portage.db[portage.root]["bintree"].dbapi.cp_list(x) - for x in installed_cps) if predicate(x)) + installed_cpvs = iter( + x + for x in chain.from_iterable( + portage.db[portage.root]["bintree"].dbapi.cp_list(x) for x in installed_cps + ) + if predicate(x) + ) - for cpv in installed_cpvs: - yield cpv + for cpv in installed_cpvs: + yield cpv def print_file(path): - """Display the contents of a file.""" + """Display the contents of a file.""" - with open(_unicode_encode(path, encoding=_encodings['fs']), mode="rb") as open_file: - lines = open_file.read() - pp.uprint(lines.strip()) + with open(_unicode_encode(path, encoding=_encodings["fs"]), mode="rb") as open_file: + lines = open_file.read() + pp.uprint(lines.strip()) def print_sequence(seq): - """Print every item of a sequence.""" + """Print every item of a sequence.""" - for item in seq: - pp.uprint(item) + for item in seq: + pp.uprint(item) def uniqify(seq, preserve_order=True): - """Return a uniqified list. Optionally preserve order.""" + """Return a uniqified list. Optionally preserve order.""" + + if preserve_order: + seen = set() + result = [x for x in seq if x not in seen and not seen.add(x)] + else: + result = list(set(seq)) - if preserve_order: - seen = set() - result = [x for x in seq if x not in seen and not seen.add(x)] - else: - result = list(set(seq)) + return result - return result # vim: set ts=4 sw=4 tw=79: |