aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/portage/util/__init__.py')
-rw-r--r--lib/portage/util/__init__.py159
1 files changed, 81 insertions, 78 deletions
diff --git a/lib/portage/util/__init__.py b/lib/portage/util/__init__.py
index 5ade7f660..1f8c9e94f 100644
--- a/lib/portage/util/__init__.py
+++ b/lib/portage/util/__init__.py
@@ -1,6 +1,26 @@
-# Copyright 2004-2020 Gentoo Authors
+# Copyright 2004-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+from portage.cache.mappings import UserDict
+from portage.proxy.objectproxy import ObjectProxy
+from portage.localization import _
+from portage.exception import (
+ InvalidAtom,
+ PortageException,
+ FileNotFound,
+ IsADirectory,
+ OperationNotPermitted,
+ ParseError,
+ PermissionDenied,
+ ReadOnlyFileSystem,
+)
+from portage.const import VCS_DIRS
+from portage import _unicode_decode
+from portage import _unicode_encode
+from portage import _os_merge
+from portage import _encodings
+from portage import os
+
__all__ = [
"apply_permissions",
"apply_recursive_permissions",
@@ -36,6 +56,7 @@ __all__ = [
"writemsg",
"writemsg_level",
"writemsg_stdout",
+ "no_color",
]
from contextlib import AbstractContextManager
@@ -51,6 +72,7 @@ import string
import sys
import traceback
import glob
+from typing import Optional, TextIO
import portage
@@ -61,31 +83,11 @@ portage.proxy.lazyimport.lazyimport(
"subprocess",
)
-from portage import os
-from portage import _encodings
-from portage import _os_merge
-from portage import _unicode_encode
-from portage import _unicode_decode
-from portage.const import VCS_DIRS
-from portage.exception import (
- InvalidAtom,
- PortageException,
- FileNotFound,
- IsADirectory,
- OperationNotPermitted,
- ParseError,
- PermissionDenied,
- ReadOnlyFileSystem,
-)
-from portage.localization import _
-from portage.proxy.objectproxy import ObjectProxy
-from portage.cache.mappings import UserDict
-
noiselimit = 0
-def initialize_logger(level=logging.WARNING):
+def initialize_logger(level=logging.WARNING) -> None:
"""Sets up basic logging of portage activities
Args:
level: the level to emit messages at ('info', 'debug', 'warning' ...)
@@ -95,8 +97,15 @@ def initialize_logger(level=logging.WARNING):
logging.basicConfig(level=level, format="[%(levelname)-4s] %(message)s")
-def writemsg(mystr, noiselevel=0, fd=None):
- """Prints out warning and debug messages based on the noiselimit setting"""
+def writemsg(mystr: str, noiselevel: int = 0, fd: Optional[TextIO] = None) -> None:
+ """
+ Prints out warning and debug messages based on the noiselimit setting
+
+ Takes three arguments
+ 1. mystr: The message to write
+ 2. noiselevel: The noiselevel of the message
+ 3. fd: file descriptor - where to write the message to
+ """
global noiselimit
if fd is None:
fd = sys.stderr
@@ -116,24 +125,23 @@ def writemsg(mystr, noiselevel=0, fd=None):
fd.flush()
-def writemsg_stdout(mystr, noiselevel=0):
+def writemsg_stdout(mystr: str, noiselevel: int = 0) -> None:
"""Prints messages stdout based on the noiselimit setting"""
writemsg(mystr, noiselevel=noiselevel, fd=sys.stdout)
-def writemsg_level(msg, level=0, noiselevel=0):
+def writemsg_level(msg: str, level: int = 0, noiselevel: int = 0) -> None:
"""
Show a message for the given level as defined by the logging module
- (default is 0). When level >= logging.WARNING then the message is
+ (default is 0).
+
+ When level >= logging.WARNING then the message is
sent to stderr, otherwise it is sent to stdout. The noiselevel is
passed directly to writemsg().
-
- @type msg: str
- @param msg: a message string, including newline if appropriate
- @type level: int
- @param level: a numeric logging level (see the logging module)
- @type noiselevel: int
- @param noiselevel: passed directly to writemsg
+ Takes three parameters
+ 1. msg - the message to output
+ 2. level - the numeric logging level (see python's logging module)
+ 3. noiselevel - portage's logging level, passed directly to writemsg
"""
if level >= logging.WARNING:
fd = sys.stderr
@@ -142,7 +150,7 @@ def writemsg_level(msg, level=0, noiselevel=0):
writemsg(msg, noiselevel=noiselevel, fd=fd)
-def normalize_path(mypath):
+def normalize_path(mypath) -> str:
"""
os.path.normpath("//foo") returns "//foo" instead of "/foo"
We dislike this behavior so we create our own normpath func
@@ -472,9 +480,8 @@ def read_corresponding_eapi_file(filename, default="0"):
eapi = None
try:
- with io.open(
+ with open(
_unicode_encode(eapi_file, encoding=_encodings["fs"], errors="strict"),
- mode="r",
encoding=_encodings["repo.content"],
errors="replace",
) as f:
@@ -487,7 +494,7 @@ def read_corresponding_eapi_file(filename, default="0"):
% (eapi_file),
noiselevel=-1,
)
- except IOError:
+ except OSError:
pass
_eapi_cache[eapi_file] = eapi
@@ -574,7 +581,6 @@ def grabfile_package(
eapi=None,
eapi_default="0",
):
-
pkgs = grabfile(
myfilename, compatlevel, recursive=recursive, remember_source_file=True
)
@@ -679,9 +685,8 @@ def grablines(myfilename, recursive=0, remember_source_file=False):
else:
try:
- with io.open(
+ with open(
_unicode_encode(myfilename, encoding=_encodings["fs"], errors="strict"),
- mode="r",
encoding=_encodings["content"],
errors="replace",
) as myfile:
@@ -689,7 +694,7 @@ def grablines(myfilename, recursive=0, remember_source_file=False):
mylines = [(line, myfilename) for line in myfile.readlines()]
else:
mylines = myfile.readlines()
- except IOError as e:
+ except OSError as e:
if e.errno == PermissionDenied.errno:
raise PermissionDenied(myfilename)
elif e.errno in (errno.ENOENT, errno.ESTALE):
@@ -708,7 +713,7 @@ def writedict(mydict, myfilename, writekey=True):
lines.append(v + "\n")
else:
for k, v in mydict.items():
- lines.append("%s %s\n" % (k, " ".join(v)))
+ lines.append(f"{k} {' '.join(v)}\n")
write_atomic(myfilename, "".join(lines))
@@ -734,11 +739,11 @@ class _getconfig_shlex(shlex.shlex):
try:
newfile = varexpand(newfile, self.var_expand_map)
return shlex.shlex.sourcehook(self, newfile)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno == PermissionDenied.errno:
raise PermissionDenied(newfile)
if e.errno not in (errno.ENOENT, errno.ENOTDIR):
- writemsg("open('%s', 'r'): %s\n" % (newfile, e), noiselevel=-1)
+ writemsg(f"open('{newfile}', 'r'): {e}\n", noiselevel=-1)
raise
msg = self.error_leader()
@@ -748,7 +753,7 @@ class _getconfig_shlex(shlex.shlex):
msg += _("%s: No such file or directory") % newfile
if self.__portage_tolerant:
- writemsg("%s\n" % msg, noiselevel=-1)
+ writemsg(f"{msg}\n", noiselevel=-1)
else:
raise ParseError(msg)
return (newfile, io.StringIO())
@@ -760,7 +765,6 @@ _invalid_var_name_re = re.compile(r"^\d|\W")
def getconfig(
mycfg, tolerant=False, allow_sourcing=False, expand=True, recursive=False
):
-
if isinstance(expand, dict):
# Some existing variable definitions have been
# passed in, for use in substitutions.
@@ -795,16 +799,15 @@ def getconfig(
try:
f = open(
_unicode_encode(mycfg, encoding=_encodings["fs"], errors="strict"),
- mode="r",
encoding=_encodings["content"],
errors="replace",
)
content = f.read()
- except IOError as e:
+ except OSError as e:
if e.errno == PermissionDenied.errno:
raise PermissionDenied(mycfg)
if e.errno != errno.ENOENT:
- writemsg("open('%s', 'r'): %s\n" % (mycfg, e), noiselevel=-1)
+ writemsg(f"open('{mycfg}', 'r'): {e}\n", noiselevel=-1)
if e.errno not in (errno.EISDIR,):
raise
return None
@@ -867,7 +870,7 @@ def getconfig(
if not tolerant:
raise ParseError(msg)
else:
- writemsg("%s\n" % msg, noiselevel=-1)
+ writemsg(f"{msg}\n", noiselevel=-1)
return mykeys
elif equ != "=":
@@ -875,7 +878,7 @@ def getconfig(
if not tolerant:
raise ParseError(msg)
else:
- writemsg("%s\n" % msg, noiselevel=-1)
+ writemsg(f"{msg}\n", noiselevel=-1)
return mykeys
val = _unicode_decode(lex.get_token())
@@ -886,14 +889,14 @@ def getconfig(
if not tolerant:
raise ParseError(msg)
else:
- writemsg("%s\n" % msg, noiselevel=-1)
+ writemsg(f"{msg}\n", noiselevel=-1)
return mykeys
if _invalid_var_name_re.search(key) is not None:
msg = lex.error_leader() + _("Invalid variable name '%s'") % (key,)
if not tolerant:
raise ParseError(msg)
- writemsg("%s\n" % msg, noiselevel=-1)
+ writemsg(f"{msg}\n", noiselevel=-1)
continue
if expand:
@@ -908,8 +911,8 @@ def getconfig(
except Exception as e:
if isinstance(e, ParseError) or lex is None:
raise
- msg = "%s%s" % (lex.error_leader(), e)
- writemsg("%s\n" % msg, noiselevel=-1)
+ msg = f"{lex.error_leader()}{e}"
+ writemsg(f"{msg}\n", noiselevel=-1)
raise
return mykeys
@@ -1085,7 +1088,7 @@ def dump_traceback(msg, noiselevel=1):
stack = traceback.extract_tb(info[2])
error = str(info[1])
writemsg("\n====================================\n", noiselevel=noiselevel)
- writemsg("%s\n\n" % msg, noiselevel=noiselevel)
+ writemsg(f"{msg}\n\n", noiselevel=noiselevel)
for line in traceback.format_list(stack):
writemsg(line, noiselevel=noiselevel)
if error:
@@ -1128,7 +1131,7 @@ class cmp_sort_key:
def __lt__(self, other):
if other.__class__ is not self.__class__:
raise TypeError(
- "Expected type %s, got %s" % (self.__class__, other.__class__)
+ f"Expected type {self.__class__}, got {other.__class__}"
)
return self._cmp_func(self._obj, other._obj) < 0
@@ -1195,7 +1198,7 @@ def _do_stat(filename, follow_links=True):
return os.stat(filename)
return os.lstat(filename)
except OSError as oe:
- func_call = "stat('%s')" % filename
+ func_call = f"stat('{filename}')"
if oe.errno == errno.EPERM:
raise OperationNotPermitted(func_call)
if oe.errno == errno.EACCES:
@@ -1290,7 +1293,7 @@ def apply_permissions(
os.chmod(filename, new_mode)
modified = True
except OSError as oe:
- func_call = "chmod('%s', %s)" % (filename, oct(new_mode))
+ func_call = f"chmod('{filename}', {oct(new_mode)})"
if oe.errno == errno.EPERM:
raise OperationNotPermitted(func_call)
elif oe.errno == errno.EACCES:
@@ -1412,7 +1415,6 @@ def apply_secpass_permissions(
# Avoid accessing portage.data.secpass when possible, since
# it triggers config loading (undesirable for chmod-lite).
if (uid != -1 or gid != -1) and portage.data.secpass < 2:
-
if uid != -1 and uid != stat_cached.st_uid:
all_applied = False
uid = -1
@@ -1463,11 +1465,11 @@ class atomic_ofstream(AbstractContextManager, ObjectProxy):
tmp_name, encoding=_encodings["fs"], errors="strict"
),
mode=mode,
- **kargs
+ **kargs,
),
)
return
- except IOError as e:
+ except OSError as e:
if canonical_path == filename:
raise
# Ignore this error, since it's irrelevant
@@ -1482,7 +1484,7 @@ class atomic_ofstream(AbstractContextManager, ObjectProxy):
open_func(
_unicode_encode(tmp_name, encoding=_encodings["fs"], errors="strict"),
mode=mode,
- **kargs
+ **kargs,
),
)
@@ -1560,10 +1562,10 @@ def write_atomic(file_path, content, **kwargs):
f = atomic_ofstream(file_path, **kwargs)
f.write(content)
f.close()
- except (IOError, OSError) as e:
+ except OSError as e:
if f:
f.abort()
- func_call = "write_atomic('%s')" % file_path
+ func_call = f"write_atomic('{file_path}')"
if e.errno == errno.EPERM:
raise OperationNotPermitted(func_call)
elif e.errno == errno.EACCES:
@@ -1592,7 +1594,7 @@ def ensure_dirs(dir_path, **kwargs):
os.makedirs(dir_path)
created_dir = True
except OSError as oe:
- func_call = "makedirs('%s')" % dir_path
+ func_call = f"makedirs('{dir_path}')"
if oe.errno in (errno.EEXIST,):
pass
else:
@@ -1625,7 +1627,6 @@ class LazyItemsDict(UserDict):
__slots__ = ("lazy_items",)
def __init__(self, *args, **kwargs):
-
self.lazy_items = {}
UserDict.__init__(self, *args, **kwargs)
@@ -1721,21 +1722,15 @@ class LazyItemsDict(UserDict):
raise TypeError(
"LazyItemsDict "
+ "deepcopy is unsafe with lazy items that are "
- + "not singletons: key=%s value=%s"
- % (
- k,
- lazy_item,
- )
+ + f"not singletons: key={k} value={lazy_item}"
)
UserDict.__setitem__(result, k_copy, deepcopy(self[k], memo))
return result
class _LazyItem:
-
__slots__ = ("func", "pargs", "kwargs", "singleton")
def __init__(self, func, pargs, kwargs, singleton):
-
if not pargs:
pargs = None
if not kwargs:
@@ -1965,7 +1960,7 @@ def find_updated_config_files(target_root, config_protect):
if stat.S_ISDIR(mymode):
mycommand = (
- "find '%s' -name '.*' -type d -prune -o -name '._cfg????_*'" % x
+ f"find '{x}' -name '.*' -type d -prune -o -name '._cfg????_*'"
)
else:
mycommand = (
@@ -2005,8 +2000,7 @@ def getlibpaths(root, env=None):
if include_match is not None:
subpath = os.path.join(os.path.dirname(path), include_match.group(1))
for p in glob.glob(subpath):
- for r in read_ld_so_conf(p):
- yield r
+ yield from read_ld_so_conf(p)
else:
yield l
@@ -2020,3 +2014,12 @@ def getlibpaths(root, env=None):
rval.append("/lib")
return [normalize_path(x) for x in rval if x]
+
+
+def no_color(settings: Optional[dict]) -> bool:
+ # In several years (2026+), we can cleanup NOCOLOR support, and just support NO_COLOR.
+ has_color: str = settings.get("NO_COLOR")
+ nocolor: str = settings.get("NOCOLOR", "false").lower()
+ if has_color is None:
+ return nocolor in ("yes", "true")
+ return bool(has_color)