diff options
Diffstat (limited to 'lib/portage/util/__init__.py')
-rw-r--r-- | lib/portage/util/__init__.py | 159 |
1 files changed, 81 insertions, 78 deletions
diff --git a/lib/portage/util/__init__.py b/lib/portage/util/__init__.py index 5ade7f660..1f8c9e94f 100644 --- a/lib/portage/util/__init__.py +++ b/lib/portage/util/__init__.py @@ -1,6 +1,26 @@ -# Copyright 2004-2020 Gentoo Authors +# Copyright 2004-2023 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from portage.cache.mappings import UserDict +from portage.proxy.objectproxy import ObjectProxy +from portage.localization import _ +from portage.exception import ( + InvalidAtom, + PortageException, + FileNotFound, + IsADirectory, + OperationNotPermitted, + ParseError, + PermissionDenied, + ReadOnlyFileSystem, +) +from portage.const import VCS_DIRS +from portage import _unicode_decode +from portage import _unicode_encode +from portage import _os_merge +from portage import _encodings +from portage import os + __all__ = [ "apply_permissions", "apply_recursive_permissions", @@ -36,6 +56,7 @@ __all__ = [ "writemsg", "writemsg_level", "writemsg_stdout", + "no_color", ] from contextlib import AbstractContextManager @@ -51,6 +72,7 @@ import string import sys import traceback import glob +from typing import Optional, TextIO import portage @@ -61,31 +83,11 @@ portage.proxy.lazyimport.lazyimport( "subprocess", ) -from portage import os -from portage import _encodings -from portage import _os_merge -from portage import _unicode_encode -from portage import _unicode_decode -from portage.const import VCS_DIRS -from portage.exception import ( - InvalidAtom, - PortageException, - FileNotFound, - IsADirectory, - OperationNotPermitted, - ParseError, - PermissionDenied, - ReadOnlyFileSystem, -) -from portage.localization import _ -from portage.proxy.objectproxy import ObjectProxy -from portage.cache.mappings import UserDict - noiselimit = 0 -def initialize_logger(level=logging.WARNING): +def initialize_logger(level=logging.WARNING) -> None: """Sets up basic logging of portage activities Args: level: the level to emit messages at ('info', 'debug', 'warning' ...) @@ -95,8 +97,15 @@ def initialize_logger(level=logging.WARNING): logging.basicConfig(level=level, format="[%(levelname)-4s] %(message)s") -def writemsg(mystr, noiselevel=0, fd=None): - """Prints out warning and debug messages based on the noiselimit setting""" +def writemsg(mystr: str, noiselevel: int = 0, fd: Optional[TextIO] = None) -> None: + """ + Prints out warning and debug messages based on the noiselimit setting + + Takes three arguments + 1. mystr: The message to write + 2. noiselevel: The noiselevel of the message + 3. fd: file descriptor - where to write the message to + """ global noiselimit if fd is None: fd = sys.stderr @@ -116,24 +125,23 @@ def writemsg(mystr, noiselevel=0, fd=None): fd.flush() -def writemsg_stdout(mystr, noiselevel=0): +def writemsg_stdout(mystr: str, noiselevel: int = 0) -> None: """Prints messages stdout based on the noiselimit setting""" writemsg(mystr, noiselevel=noiselevel, fd=sys.stdout) -def writemsg_level(msg, level=0, noiselevel=0): +def writemsg_level(msg: str, level: int = 0, noiselevel: int = 0) -> None: """ Show a message for the given level as defined by the logging module - (default is 0). When level >= logging.WARNING then the message is + (default is 0). + + When level >= logging.WARNING then the message is sent to stderr, otherwise it is sent to stdout. The noiselevel is passed directly to writemsg(). - - @type msg: str - @param msg: a message string, including newline if appropriate - @type level: int - @param level: a numeric logging level (see the logging module) - @type noiselevel: int - @param noiselevel: passed directly to writemsg + Takes three parameters + 1. msg - the message to output + 2. level - the numeric logging level (see python's logging module) + 3. noiselevel - portage's logging level, passed directly to writemsg """ if level >= logging.WARNING: fd = sys.stderr @@ -142,7 +150,7 @@ def writemsg_level(msg, level=0, noiselevel=0): writemsg(msg, noiselevel=noiselevel, fd=fd) -def normalize_path(mypath): +def normalize_path(mypath) -> str: """ os.path.normpath("//foo") returns "//foo" instead of "/foo" We dislike this behavior so we create our own normpath func @@ -472,9 +480,8 @@ def read_corresponding_eapi_file(filename, default="0"): eapi = None try: - with io.open( + with open( _unicode_encode(eapi_file, encoding=_encodings["fs"], errors="strict"), - mode="r", encoding=_encodings["repo.content"], errors="replace", ) as f: @@ -487,7 +494,7 @@ def read_corresponding_eapi_file(filename, default="0"): % (eapi_file), noiselevel=-1, ) - except IOError: + except OSError: pass _eapi_cache[eapi_file] = eapi @@ -574,7 +581,6 @@ def grabfile_package( eapi=None, eapi_default="0", ): - pkgs = grabfile( myfilename, compatlevel, recursive=recursive, remember_source_file=True ) @@ -679,9 +685,8 @@ def grablines(myfilename, recursive=0, remember_source_file=False): else: try: - with io.open( + with open( _unicode_encode(myfilename, encoding=_encodings["fs"], errors="strict"), - mode="r", encoding=_encodings["content"], errors="replace", ) as myfile: @@ -689,7 +694,7 @@ def grablines(myfilename, recursive=0, remember_source_file=False): mylines = [(line, myfilename) for line in myfile.readlines()] else: mylines = myfile.readlines() - except IOError as e: + except OSError as e: if e.errno == PermissionDenied.errno: raise PermissionDenied(myfilename) elif e.errno in (errno.ENOENT, errno.ESTALE): @@ -708,7 +713,7 @@ def writedict(mydict, myfilename, writekey=True): lines.append(v + "\n") else: for k, v in mydict.items(): - lines.append("%s %s\n" % (k, " ".join(v))) + lines.append(f"{k} {' '.join(v)}\n") write_atomic(myfilename, "".join(lines)) @@ -734,11 +739,11 @@ class _getconfig_shlex(shlex.shlex): try: newfile = varexpand(newfile, self.var_expand_map) return shlex.shlex.sourcehook(self, newfile) - except EnvironmentError as e: + except OSError as e: if e.errno == PermissionDenied.errno: raise PermissionDenied(newfile) if e.errno not in (errno.ENOENT, errno.ENOTDIR): - writemsg("open('%s', 'r'): %s\n" % (newfile, e), noiselevel=-1) + writemsg(f"open('{newfile}', 'r'): {e}\n", noiselevel=-1) raise msg = self.error_leader() @@ -748,7 +753,7 @@ class _getconfig_shlex(shlex.shlex): msg += _("%s: No such file or directory") % newfile if self.__portage_tolerant: - writemsg("%s\n" % msg, noiselevel=-1) + writemsg(f"{msg}\n", noiselevel=-1) else: raise ParseError(msg) return (newfile, io.StringIO()) @@ -760,7 +765,6 @@ _invalid_var_name_re = re.compile(r"^\d|\W") def getconfig( mycfg, tolerant=False, allow_sourcing=False, expand=True, recursive=False ): - if isinstance(expand, dict): # Some existing variable definitions have been # passed in, for use in substitutions. @@ -795,16 +799,15 @@ def getconfig( try: f = open( _unicode_encode(mycfg, encoding=_encodings["fs"], errors="strict"), - mode="r", encoding=_encodings["content"], errors="replace", ) content = f.read() - except IOError as e: + except OSError as e: if e.errno == PermissionDenied.errno: raise PermissionDenied(mycfg) if e.errno != errno.ENOENT: - writemsg("open('%s', 'r'): %s\n" % (mycfg, e), noiselevel=-1) + writemsg(f"open('{mycfg}', 'r'): {e}\n", noiselevel=-1) if e.errno not in (errno.EISDIR,): raise return None @@ -867,7 +870,7 @@ def getconfig( if not tolerant: raise ParseError(msg) else: - writemsg("%s\n" % msg, noiselevel=-1) + writemsg(f"{msg}\n", noiselevel=-1) return mykeys elif equ != "=": @@ -875,7 +878,7 @@ def getconfig( if not tolerant: raise ParseError(msg) else: - writemsg("%s\n" % msg, noiselevel=-1) + writemsg(f"{msg}\n", noiselevel=-1) return mykeys val = _unicode_decode(lex.get_token()) @@ -886,14 +889,14 @@ def getconfig( if not tolerant: raise ParseError(msg) else: - writemsg("%s\n" % msg, noiselevel=-1) + writemsg(f"{msg}\n", noiselevel=-1) return mykeys if _invalid_var_name_re.search(key) is not None: msg = lex.error_leader() + _("Invalid variable name '%s'") % (key,) if not tolerant: raise ParseError(msg) - writemsg("%s\n" % msg, noiselevel=-1) + writemsg(f"{msg}\n", noiselevel=-1) continue if expand: @@ -908,8 +911,8 @@ def getconfig( except Exception as e: if isinstance(e, ParseError) or lex is None: raise - msg = "%s%s" % (lex.error_leader(), e) - writemsg("%s\n" % msg, noiselevel=-1) + msg = f"{lex.error_leader()}{e}" + writemsg(f"{msg}\n", noiselevel=-1) raise return mykeys @@ -1085,7 +1088,7 @@ def dump_traceback(msg, noiselevel=1): stack = traceback.extract_tb(info[2]) error = str(info[1]) writemsg("\n====================================\n", noiselevel=noiselevel) - writemsg("%s\n\n" % msg, noiselevel=noiselevel) + writemsg(f"{msg}\n\n", noiselevel=noiselevel) for line in traceback.format_list(stack): writemsg(line, noiselevel=noiselevel) if error: @@ -1128,7 +1131,7 @@ class cmp_sort_key: def __lt__(self, other): if other.__class__ is not self.__class__: raise TypeError( - "Expected type %s, got %s" % (self.__class__, other.__class__) + f"Expected type {self.__class__}, got {other.__class__}" ) return self._cmp_func(self._obj, other._obj) < 0 @@ -1195,7 +1198,7 @@ def _do_stat(filename, follow_links=True): return os.stat(filename) return os.lstat(filename) except OSError as oe: - func_call = "stat('%s')" % filename + func_call = f"stat('{filename}')" if oe.errno == errno.EPERM: raise OperationNotPermitted(func_call) if oe.errno == errno.EACCES: @@ -1290,7 +1293,7 @@ def apply_permissions( os.chmod(filename, new_mode) modified = True except OSError as oe: - func_call = "chmod('%s', %s)" % (filename, oct(new_mode)) + func_call = f"chmod('{filename}', {oct(new_mode)})" if oe.errno == errno.EPERM: raise OperationNotPermitted(func_call) elif oe.errno == errno.EACCES: @@ -1412,7 +1415,6 @@ def apply_secpass_permissions( # Avoid accessing portage.data.secpass when possible, since # it triggers config loading (undesirable for chmod-lite). if (uid != -1 or gid != -1) and portage.data.secpass < 2: - if uid != -1 and uid != stat_cached.st_uid: all_applied = False uid = -1 @@ -1463,11 +1465,11 @@ class atomic_ofstream(AbstractContextManager, ObjectProxy): tmp_name, encoding=_encodings["fs"], errors="strict" ), mode=mode, - **kargs + **kargs, ), ) return - except IOError as e: + except OSError as e: if canonical_path == filename: raise # Ignore this error, since it's irrelevant @@ -1482,7 +1484,7 @@ class atomic_ofstream(AbstractContextManager, ObjectProxy): open_func( _unicode_encode(tmp_name, encoding=_encodings["fs"], errors="strict"), mode=mode, - **kargs + **kargs, ), ) @@ -1560,10 +1562,10 @@ def write_atomic(file_path, content, **kwargs): f = atomic_ofstream(file_path, **kwargs) f.write(content) f.close() - except (IOError, OSError) as e: + except OSError as e: if f: f.abort() - func_call = "write_atomic('%s')" % file_path + func_call = f"write_atomic('{file_path}')" if e.errno == errno.EPERM: raise OperationNotPermitted(func_call) elif e.errno == errno.EACCES: @@ -1592,7 +1594,7 @@ def ensure_dirs(dir_path, **kwargs): os.makedirs(dir_path) created_dir = True except OSError as oe: - func_call = "makedirs('%s')" % dir_path + func_call = f"makedirs('{dir_path}')" if oe.errno in (errno.EEXIST,): pass else: @@ -1625,7 +1627,6 @@ class LazyItemsDict(UserDict): __slots__ = ("lazy_items",) def __init__(self, *args, **kwargs): - self.lazy_items = {} UserDict.__init__(self, *args, **kwargs) @@ -1721,21 +1722,15 @@ class LazyItemsDict(UserDict): raise TypeError( "LazyItemsDict " + "deepcopy is unsafe with lazy items that are " - + "not singletons: key=%s value=%s" - % ( - k, - lazy_item, - ) + + f"not singletons: key={k} value={lazy_item}" ) UserDict.__setitem__(result, k_copy, deepcopy(self[k], memo)) return result class _LazyItem: - __slots__ = ("func", "pargs", "kwargs", "singleton") def __init__(self, func, pargs, kwargs, singleton): - if not pargs: pargs = None if not kwargs: @@ -1965,7 +1960,7 @@ def find_updated_config_files(target_root, config_protect): if stat.S_ISDIR(mymode): mycommand = ( - "find '%s' -name '.*' -type d -prune -o -name '._cfg????_*'" % x + f"find '{x}' -name '.*' -type d -prune -o -name '._cfg????_*'" ) else: mycommand = ( @@ -2005,8 +2000,7 @@ def getlibpaths(root, env=None): if include_match is not None: subpath = os.path.join(os.path.dirname(path), include_match.group(1)) for p in glob.glob(subpath): - for r in read_ld_so_conf(p): - yield r + yield from read_ld_so_conf(p) else: yield l @@ -2020,3 +2014,12 @@ def getlibpaths(root, env=None): rval.append("/lib") return [normalize_path(x) for x in rval if x] + + +def no_color(settings: Optional[dict]) -> bool: + # In several years (2026+), we can cleanup NOCOLOR support, and just support NO_COLOR. + has_color: str = settings.get("NO_COLOR") + nocolor: str = settings.get("NOCOLOR", "false").lower() + if has_color is None: + return nocolor in ("yes", "true") + return bool(has_color) |