Diffstat (limited to 'lib/portage/util/__init__.py')
-rw-r--r--  lib/portage/util/__init__.py  3620
1 files changed, 1922 insertions, 1698 deletions
diff --git a/lib/portage/util/__init__.py b/lib/portage/util/__init__.py
index 0412b2b59..1f8c9e94f 100644
--- a/lib/portage/util/__init__.py
+++ b/lib/portage/util/__init__.py
@@ -1,16 +1,65 @@
-# Copyright 2004-2020 Gentoo Authors
+# Copyright 2004-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
-__all__ = ['apply_permissions', 'apply_recursive_permissions',
- 'apply_secpass_permissions', 'apply_stat_permissions', 'atomic_ofstream',
- 'cmp_sort_key', 'ConfigProtect', 'dump_traceback', 'ensure_dirs',
- 'find_updated_config_files', 'getconfig', 'getlibpaths', 'grabdict',
- 'grabdict_package', 'grabfile', 'grabfile_package', 'grablines',
- 'initialize_logger', 'LazyItemsDict', 'map_dictlist_vals',
- 'new_protect_filename', 'normalize_path', 'pickle_read', 'stack_dictlist',
- 'stack_dicts', 'stack_lists', 'unique_array', 'unique_everseen', 'varexpand',
- 'write_atomic', 'writedict', 'writemsg', 'writemsg_level', 'writemsg_stdout']
+from portage.cache.mappings import UserDict
+from portage.proxy.objectproxy import ObjectProxy
+from portage.localization import _
+from portage.exception import (
+ InvalidAtom,
+ PortageException,
+ FileNotFound,
+ IsADirectory,
+ OperationNotPermitted,
+ ParseError,
+ PermissionDenied,
+ ReadOnlyFileSystem,
+)
+from portage.const import VCS_DIRS
+from portage import _unicode_decode
+from portage import _unicode_encode
+from portage import _os_merge
+from portage import _encodings
+from portage import os
+__all__ = [
+ "apply_permissions",
+ "apply_recursive_permissions",
+ "apply_secpass_permissions",
+ "apply_stat_permissions",
+ "atomic_ofstream",
+ "cmp_sort_key",
+ "ConfigProtect",
+ "dump_traceback",
+ "ensure_dirs",
+ "find_updated_config_files",
+ "getconfig",
+ "getlibpaths",
+ "grabdict",
+ "grabdict_package",
+ "grabfile",
+ "grabfile_package",
+ "grablines",
+ "initialize_logger",
+ "LazyItemsDict",
+ "map_dictlist_vals",
+ "new_protect_filename",
+ "normalize_path",
+ "pickle_read",
+ "stack_dictlist",
+ "stack_dicts",
+ "stack_lists",
+ "unique_array",
+ "unique_everseen",
+ "varexpand",
+ "write_atomic",
+ "writedict",
+ "writemsg",
+ "writemsg_level",
+ "writemsg_stdout",
+ "no_color",
+]
+
+from contextlib import AbstractContextManager
from copy import deepcopy
import errno
import io
@@ -23,1779 +72,1954 @@ import string
import sys
import traceback
import glob
+from typing import Optional, TextIO
import portage
-portage.proxy.lazyimport.lazyimport(globals(),
- 'pickle',
- 'portage.dep:Atom',
- 'subprocess',
-)
-from portage import os
-from portage import _encodings
-from portage import _os_merge
-from portage import _unicode_encode
-from portage import _unicode_decode
-from portage.const import VCS_DIRS
-from portage.exception import InvalidAtom, PortageException, FileNotFound, \
- IsADirectory, OperationNotPermitted, ParseError, PermissionDenied, \
- ReadOnlyFileSystem
-from portage.localization import _
-from portage.proxy.objectproxy import ObjectProxy
-from portage.cache.mappings import UserDict
+portage.proxy.lazyimport.lazyimport(
+ globals(),
+ "pickle",
+ "portage.dep:Atom",
+ "subprocess",
+)
noiselimit = 0
-def initialize_logger(level=logging.WARNING):
- """Sets up basic logging of portage activities
- Args:
- level: the level to emit messages at ('info', 'debug', 'warning' ...)
- Returns:
- None
- """
- logging.basicConfig(level=level, format='[%(levelname)-4s] %(message)s')
-
-def writemsg(mystr, noiselevel=0, fd=None):
- """Prints out warning and debug messages based on the noiselimit setting"""
- global noiselimit
- if fd is None:
- fd = sys.stderr
- if noiselevel <= noiselimit:
- # avoid potential UnicodeEncodeError
- if isinstance(fd, io.StringIO):
- mystr = _unicode_decode(mystr,
- encoding=_encodings['content'], errors='replace')
- else:
- mystr = _unicode_encode(mystr,
- encoding=_encodings['stdio'], errors='backslashreplace')
- if fd in (sys.stdout, sys.stderr):
- fd = fd.buffer
- fd.write(mystr)
- fd.flush()
-
-def writemsg_stdout(mystr, noiselevel=0):
- """Prints messages stdout based on the noiselimit setting"""
- writemsg(mystr, noiselevel=noiselevel, fd=sys.stdout)
-
-def writemsg_level(msg, level=0, noiselevel=0):
- """
- Show a message for the given level as defined by the logging module
- (default is 0). When level >= logging.WARNING then the message is
- sent to stderr, otherwise it is sent to stdout. The noiselevel is
- passed directly to writemsg().
-
- @type msg: str
- @param msg: a message string, including newline if appropriate
- @type level: int
- @param level: a numeric logging level (see the logging module)
- @type noiselevel: int
- @param noiselevel: passed directly to writemsg
- """
- if level >= logging.WARNING:
- fd = sys.stderr
- else:
- fd = sys.stdout
- writemsg(msg, noiselevel=noiselevel, fd=fd)
-def normalize_path(mypath):
- """
- os.path.normpath("//foo") returns "//foo" instead of "/foo"
- We dislike this behavior so we create our own normpath func
- to fix it.
- """
- if isinstance(mypath, bytes):
- path_sep = os.path.sep.encode()
- else:
- path_sep = os.path.sep
+def initialize_logger(level=logging.WARNING) -> None:
+ """Sets up basic logging of portage activities
+ Args:
+ level: the level to emit messages at ('info', 'debug', 'warning' ...)
+ Returns:
+ None
+ """
+ logging.basicConfig(level=level, format="[%(levelname)-4s] %(message)s")
+
+
+def writemsg(mystr: str, noiselevel: int = 0, fd: Optional[TextIO] = None) -> None:
+ """
+ Prints out warning and debug messages based on the noiselimit setting
+
+ Takes three arguments
+ 1. mystr: The message to write
+ 2. noiselevel: The noiselevel of the message
+ 3. fd: file descriptor - where to write the message to
+ """
+ global noiselimit
+ if fd is None:
+ fd = sys.stderr
+ if noiselevel <= noiselimit:
+ # avoid potential UnicodeEncodeError
+ if isinstance(fd, io.StringIO):
+ mystr = _unicode_decode(
+ mystr, encoding=_encodings["content"], errors="replace"
+ )
+ else:
+ mystr = _unicode_encode(
+ mystr, encoding=_encodings["stdio"], errors="backslashreplace"
+ )
+ if fd in (sys.stdout, sys.stderr):
+ fd = fd.buffer
+ fd.write(mystr)
+ fd.flush()
+
+
+def writemsg_stdout(mystr: str, noiselevel: int = 0) -> None:
+ """Prints messages stdout based on the noiselimit setting"""
+ writemsg(mystr, noiselevel=noiselevel, fd=sys.stdout)
+
+
+def writemsg_level(msg: str, level: int = 0, noiselevel: int = 0) -> None:
+ """
+ Show a message for the given level as defined by the logging module
+ (default is 0).
+
+ When level >= logging.WARNING then the message is
+ sent to stderr, otherwise it is sent to stdout. The noiselevel is
+ passed directly to writemsg().
+ Takes three parameters
+ 1. msg - the message to output
+ 2. level - the numeric logging level (see python's logging module)
+ 3. noiselevel - portage's logging level, passed directly to writemsg
+ """
+ if level >= logging.WARNING:
+ fd = sys.stderr
+ else:
+ fd = sys.stdout
+ writemsg(msg, noiselevel=noiselevel, fd=fd)
+
+
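
For illustration, a minimal sketch of the routing described above, assuming portage is importable: messages below logging.WARNING go to stdout, warnings and above go to stderr, and noiselevel is compared against the module-level noiselimit.

    import logging
    from portage.util import writemsg, writemsg_level

    writemsg_level("checking manifests...\n", level=logging.INFO)   # stdout
    writemsg_level("!!! digest mismatch\n", level=logging.WARNING)  # stderr
    writemsg("verbose detail\n", noiselevel=1)  # suppressed while noiselimit is 0
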
+def normalize_path(mypath) -> str:
+ """
+ os.path.normpath("//foo") returns "//foo" instead of "/foo"
+ We dislike this behavior so we create our own normpath func
+ to fix it.
+ """
+ if isinstance(mypath, bytes):
+ path_sep = os.path.sep.encode()
+ else:
+ path_sep = os.path.sep
+
+ if mypath.startswith(path_sep):
+ # posixpath.normpath collapses 3 or more leading slashes to just 1.
+ return os.path.normpath(2 * path_sep + mypath)
+ return os.path.normpath(mypath)
- if mypath.startswith(path_sep):
- # posixpath.normpath collapses 3 or more leading slashes to just 1.
- return os.path.normpath(2*path_sep + mypath)
- return os.path.normpath(mypath)
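
A quick check of the double-slash behavior the docstring describes, assuming portage is importable; the only divergence from os.path.normpath is the handling of leading slashes:

    import os.path
    from portage.util import normalize_path

    os.path.normpath("//foo/bar")  # '//foo/bar': POSIX preserves exactly two slashes
    normalize_path("//foo/bar")    # '/foo/bar'
    normalize_path("/a//b/../c/")  # '/a/c', identical to os.path.normpath here
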
def grabfile(myfilename, compat_level=0, recursive=0, remember_source_file=False):
- """This function grabs the lines in a file, normalizes whitespace and returns lines in a list; if a line
- begins with a #, it is ignored, as are empty lines"""
-
- mylines = grablines(myfilename, recursive, remember_source_file=True)
- newlines = []
-
- for x, source_file in mylines:
- #the split/join thing removes leading and trailing whitespace, and converts any whitespace in the line
- #into single spaces.
- myline = x.split()
- if x and x[0] != "#":
- mylinetemp = []
- for item in myline:
- if item[:1] != "#":
- mylinetemp.append(item)
- else:
- break
- myline = mylinetemp
-
- myline = " ".join(myline)
- if not myline:
- continue
- if myline[0] == "#":
- # Check if we have a compat-level string. BC-integration data.
- # '##COMPAT==>N<==' 'some string attached to it'
- mylinetest = myline.split("<==", 1)
- if len(mylinetest) == 2:
- myline_potential = mylinetest[1]
- mylinetest = mylinetest[0].split("##COMPAT==>")
- if len(mylinetest) == 2:
- if compat_level >= int(mylinetest[1]):
- # It's a compat line, and the key matches.
- newlines.append(myline_potential)
- continue
- else:
- continue
- if remember_source_file:
- newlines.append((myline, source_file))
- else:
- newlines.append(myline)
- return newlines
+ """This function grabs the lines in a file, normalizes whitespace and returns lines in a list; if a line
+ begins with a #, it is ignored, as are empty lines"""
+
+ mylines = grablines(myfilename, recursive, remember_source_file=True)
+ newlines = []
+
+ for x, source_file in mylines:
+ # the split/join thing removes leading and trailing whitespace, and converts any whitespace in the line
+ # into single spaces.
+ myline = x.split()
+ if x and x[0] != "#":
+ mylinetemp = []
+ for item in myline:
+ if item[:1] != "#":
+ mylinetemp.append(item)
+ else:
+ break
+ myline = mylinetemp
+
+ myline = " ".join(myline)
+ if not myline:
+ continue
+ if myline[0] == "#":
+ # Check if we have a compat-level string. BC-integration data.
+ # '##COMPAT==>N<==' 'some string attached to it'
+ mylinetest = myline.split("<==", 1)
+ if len(mylinetest) == 2:
+ myline_potential = mylinetest[1]
+ mylinetest = mylinetest[0].split("##COMPAT==>")
+ if len(mylinetest) == 2:
+ if compat_level >= int(mylinetest[1]):
+ # It's a compat line, and the key matches.
+ newlines.append(myline_potential)
+ continue
+ else:
+ continue
+ if remember_source_file:
+ newlines.append((myline, source_file))
+ else:
+ newlines.append(myline)
+ return newlines
+
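
A sketch of the whitespace and comment handling, using a hypothetical temporary file and assuming portage is importable:

    import os
    import tempfile
    from portage.util import grabfile

    with tempfile.NamedTemporaryFile("w", delete=False) as f:
        f.write("# full-line comment\n")
        f.write("  sys-apps/portage   x86  # trailing comment\n")
    try:
        print(grabfile(f.name))  # ['sys-apps/portage x86']
    finally:
        os.unlink(f.name)
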
def map_dictlist_vals(func, myDict):
- """Performs a function on each value of each key in a dictlist.
- Returns a new dictlist."""
- new_dl = {}
- for key in myDict:
- new_dl[key] = []
- new_dl[key] = [func(x) for x in myDict[key]]
- return new_dl
+ """Performs a function on each value of each key in a dictlist.
+ Returns a new dictlist."""
+ new_dl = {}
+ for key in myDict:
+ new_dl[key] = []
+ new_dl[key] = [func(x) for x in myDict[key]]
+ return new_dl
+
def stack_dictlist(original_dicts, incremental=0, incrementals=[], ignore_none=0):
- """
- Stacks an array of dict-types into one array. Optionally merging or
- overwriting matching key/value pairs for the dict[key]->list.
- Returns a single dict. Higher index in lists is preferenced.
-
- Example usage:
- >>> from portage.util import stack_dictlist
- >>> print stack_dictlist( [{'a':'b'},{'x':'y'}])
- >>> {'a':'b','x':'y'}
- >>> print stack_dictlist( [{'a':'b'},{'a':'c'}], incremental = True )
- >>> {'a':['b','c'] }
- >>> a = {'KEYWORDS':['x86','alpha']}
- >>> b = {'KEYWORDS':['-x86']}
- >>> print stack_dictlist( [a,b] )
- >>> { 'KEYWORDS':['x86','alpha','-x86']}
- >>> print stack_dictlist( [a,b], incremental=True)
- >>> { 'KEYWORDS':['alpha'] }
- >>> print stack_dictlist( [a,b], incrementals=['KEYWORDS'])
- >>> { 'KEYWORDS':['alpha'] }
-
- @param original_dicts a list of (dictionary objects or None)
- @type list
- @param incremental True or false depending on whether new keys should overwrite
- keys which already exist.
- @type boolean
- @param incrementals A list of items that should be incremental (-foo removes foo from
- the returned dict).
- @type list
- @param ignore_none Appears to be ignored, but probably was used long long ago.
- @type boolean
+ """
+ Stacks an array of dict-types into one dict, optionally merging or
+ overwriting matching key/value pairs for the dict[key]->list.
+ Returns a single dict. Higher list indices take precedence.
+
+ Example usage:
+ >>> from portage.util import stack_dictlist
+ >>> print stack_dictlist( [{'a':'b'},{'x':'y'}])
+ >>> {'a':'b','x':'y'}
+ >>> print stack_dictlist( [{'a':'b'},{'a':'c'}], incremental = True )
+ >>> {'a':['b','c'] }
+ >>> a = {'KEYWORDS':['x86','alpha']}
+ >>> b = {'KEYWORDS':['-x86']}
+ >>> print stack_dictlist( [a,b] )
+ >>> { 'KEYWORDS':['x86','alpha','-x86']}
+ >>> print stack_dictlist( [a,b], incremental=True)
+ >>> { 'KEYWORDS':['alpha'] }
+ >>> print stack_dictlist( [a,b], incrementals=['KEYWORDS'])
+ >>> { 'KEYWORDS':['alpha'] }
+
+ @param original_dicts: a list of (dictionary objects or None)
+ @type original_dicts: list
+ @param incremental: True or False depending on whether new keys should overwrite
+ keys which already exist.
+ @type incremental: boolean
+ @param incrementals: A list of items that should be incremental (-foo removes foo from
+ the returned dict).
+ @type incrementals: list
+ @param ignore_none: Appears to be ignored, but probably was used long long ago.
+ @type ignore_none: boolean
+
+ """
+ final_dict = {}
+ for mydict in original_dicts:
+ if mydict is None:
+ continue
+ for y in mydict:
+ if not y in final_dict:
+ final_dict[y] = []
+
+ for thing in mydict[y]:
+ if thing:
+ if incremental or y in incrementals:
+ if thing == "-*":
+ final_dict[y] = []
+ continue
+ elif thing[:1] == "-":
+ try:
+ final_dict[y].remove(thing[1:])
+ except ValueError:
+ pass
+ continue
+ if thing not in final_dict[y]:
+ final_dict[y].append(thing)
+ if y in final_dict and not final_dict[y]:
+ del final_dict[y]
+ return final_dict
- """
- final_dict = {}
- for mydict in original_dicts:
- if mydict is None:
- continue
- for y in mydict:
- if not y in final_dict:
- final_dict[y] = []
-
- for thing in mydict[y]:
- if thing:
- if incremental or y in incrementals:
- if thing == "-*":
- final_dict[y] = []
- continue
- elif thing[:1] == '-':
- try:
- final_dict[y].remove(thing[1:])
- except ValueError:
- pass
- continue
- if thing not in final_dict[y]:
- final_dict[y].append(thing)
- if y in final_dict and not final_dict[y]:
- del final_dict[y]
- return final_dict
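
One case the doctest does not show is the '-*' token, which empties the list accumulated so far when running incrementally; a sketch, assuming portage is importable:

    from portage.util import stack_dictlist

    a = {"USE": ["ssl", "ipv6"]}
    b = {"USE": ["-*", "minimal"]}
    print(stack_dictlist([a, b], incremental=True))
    # {'USE': ['minimal']}: '-*' cleared 'ssl' and 'ipv6' before 'minimal' was added
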
def stack_dicts(dicts, incremental=0, incrementals=[], ignore_none=0):
- """Stacks an array of dict-types into one array. Optionally merging or
- overwriting matching key/value pairs for the dict[key]->string.
- Returns a single dict."""
- final_dict = {}
- for mydict in dicts:
- if not mydict:
- continue
- for k, v in mydict.items():
- if k in final_dict and (incremental or (k in incrementals)):
- final_dict[k] += " " + v
- else:
- final_dict[k] = v
- return final_dict
+ """Stacks an array of dict-types into one array. Optionally merging or
+ overwriting matching key/value pairs for the dict[key]->string.
+ Returns a single dict."""
+ final_dict = {}
+ for mydict in dicts:
+ if not mydict:
+ continue
+ for k, v in mydict.items():
+ if k in final_dict and (incremental or (k in incrementals)):
+ final_dict[k] += " " + v
+ else:
+ final_dict[k] = v
+ return final_dict
+
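
stack_dicts is the string-valued counterpart: values for incremental keys are concatenated with a space, while all other keys are simply overwritten by later dicts. A sketch, assuming portage is importable:

    from portage.util import stack_dicts

    a = {"USE": "ssl", "ARCH": "x86"}
    b = {"USE": "-ssl ipv6", "ARCH": "amd64"}
    print(stack_dicts([a, b], incrementals=["USE"]))
    # {'USE': 'ssl -ssl ipv6', 'ARCH': 'amd64'}
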
def append_repo(atom_list, repo_name, remember_source_file=False):
- """
- Takes a list of valid atoms without repo spec and appends ::repo_name.
- If an atom already has a repo part, then it is preserved (see bug #461948).
- """
- if remember_source_file:
- return [(atom.repo is not None and atom or atom.with_repo(repo_name), source) \
- for atom, source in atom_list]
-
- return [atom.repo is not None and atom or atom.with_repo(repo_name) \
- for atom in atom_list]
-
-def stack_lists(lists, incremental=1, remember_source_file=False,
- warn_for_unmatched_removal=False, strict_warn_for_unmatched_removal=False, ignore_repo=False):
- """Stacks an array of list-types into one array. Optionally removing
- distinct values using '-value' notation. Higher index is preferenced.
-
- all elements must be hashable."""
- matched_removals = set()
- unmatched_removals = {}
- new_list = {}
- for sub_list in lists:
- for token in sub_list:
- token_key = token
- if remember_source_file:
- token, source_file = token
- else:
- source_file = False
-
- if token is None:
- continue
-
- if incremental:
- if token == "-*":
- new_list.clear()
- elif token[:1] == '-':
- matched = False
- if ignore_repo and not "::" in token:
- #Let -cat/pkg remove cat/pkg::repo.
- to_be_removed = []
- token_slice = token[1:]
- for atom in new_list:
- atom_without_repo = atom
- if atom.repo is not None:
- # Atom.without_repo instantiates a new Atom,
- # which is unnecessary here, so use string
- # replacement instead.
- atom_without_repo = \
- atom.replace("::" + atom.repo, "", 1)
- if atom_without_repo == token_slice:
- to_be_removed.append(atom)
- if to_be_removed:
- matched = True
- for atom in to_be_removed:
- new_list.pop(atom)
- else:
- try:
- new_list.pop(token[1:])
- matched = True
- except KeyError:
- pass
-
- if not matched:
- if source_file and \
- (strict_warn_for_unmatched_removal or \
- token_key not in matched_removals):
- unmatched_removals.setdefault(source_file, set()).add(token)
- else:
- matched_removals.add(token_key)
- else:
- new_list[token] = source_file
- else:
- new_list[token] = source_file
-
- if warn_for_unmatched_removal:
- for source_file, tokens in unmatched_removals.items():
- if len(tokens) > 3:
- selected = [tokens.pop(), tokens.pop(), tokens.pop()]
- writemsg(_("--- Unmatched removal atoms in %s: %s and %s more\n") % \
- (source_file, ", ".join(selected), len(tokens)),
- noiselevel=-1)
- else:
- writemsg(_("--- Unmatched removal atom(s) in %s: %s\n") % (source_file, ", ".join(tokens)),
- noiselevel=-1)
-
- if remember_source_file:
- return list(new_list.items())
- return list(new_list)
-
-def grabdict(myfilename, juststrings=0, empty=0, recursive=0, incremental=1, newlines=0):
- """
- This function grabs the lines in a file, normalizes whitespace and returns lines in a dictionary
-
- @param myfilename: file to process
- @type myfilename: string (path)
- @param juststrings: only return strings
- @type juststrings: Boolean (integer)
- @param empty: Ignore certain lines
- @type empty: Boolean (integer)
- @param recursive: Recursively grab ( support for /etc/portage/package.keywords/* and friends )
- @type recursive: Boolean (integer)
- @param incremental: Append to the return list, don't overwrite
- @type incremental: Boolean (integer)
- @param newlines: Append newlines
- @type newlines: Boolean (integer)
- @rtype: Dictionary
- @return:
- 1. Returns the lines in a file in a dictionary, for example:
- 'sys-apps/portage x86 amd64 ppc'
- would return
- {"sys-apps/portage" : ['x86', 'amd64', 'ppc']}
- """
- newdict = {}
- for x in grablines(myfilename, recursive):
- #the split/join thing removes leading and trailing whitespace, and converts any whitespace in the line
- #into single spaces.
- if x[0] == "#":
- continue
- myline=x.split()
- mylinetemp = []
- for item in myline:
- if item[:1] != "#":
- mylinetemp.append(item)
- else:
- break
- myline = mylinetemp
- if len(myline) < 2 and empty == 0:
- continue
- if len(myline) < 1 and empty == 1:
- continue
- if newlines:
- myline.append("\n")
- if incremental:
- newdict.setdefault(myline[0], []).extend(myline[1:])
- else:
- newdict[myline[0]] = myline[1:]
- if juststrings:
- for k, v in newdict.items():
- newdict[k] = " ".join(v)
- return newdict
+ """
+ Takes a list of valid atoms without repo spec and appends ::repo_name.
+ If an atom already has a repo part, then it is preserved (see bug #461948).
+ """
+ if remember_source_file:
+ return [
+ (atom.repo is not None and atom or atom.with_repo(repo_name), source)
+ for atom, source in atom_list
+ ]
+
+ return [
+ atom.repo is not None and atom or atom.with_repo(repo_name)
+ for atom in atom_list
+ ]
+
+
+def stack_lists(
+ lists,
+ incremental=1,
+ remember_source_file=False,
+ warn_for_unmatched_removal=False,
+ strict_warn_for_unmatched_removal=False,
+ ignore_repo=False,
+):
+ """Stacks an array of list-types into one array. Optionally removing
+ distinct values using '-value' notation. Higher index is preferenced.
+
+ all elements must be hashable."""
+ matched_removals = set()
+ unmatched_removals = {}
+ new_list = {}
+ for sub_list in lists:
+ for token in sub_list:
+ token_key = token
+ if remember_source_file:
+ token, source_file = token
+ else:
+ source_file = False
+
+ if token is None:
+ continue
+
+ if incremental:
+ if token == "-*":
+ new_list.clear()
+ elif token[:1] == "-":
+ matched = False
+ if ignore_repo and not "::" in token:
+ # Let -cat/pkg remove cat/pkg::repo.
+ to_be_removed = []
+ token_slice = token[1:]
+ for atom in new_list:
+ atom_without_repo = atom
+ if atom.repo is not None:
+ # Atom.without_repo instantiates a new Atom,
+ # which is unnecessary here, so use string
+ # replacement instead.
+ atom_without_repo = atom.replace(
+ "::" + atom.repo, "", 1
+ )
+ if atom_without_repo == token_slice:
+ to_be_removed.append(atom)
+ if to_be_removed:
+ matched = True
+ for atom in to_be_removed:
+ new_list.pop(atom)
+ else:
+ try:
+ new_list.pop(token[1:])
+ matched = True
+ except KeyError:
+ pass
+
+ if not matched:
+ if source_file and (
+ strict_warn_for_unmatched_removal
+ or token_key not in matched_removals
+ ):
+ unmatched_removals.setdefault(source_file, set()).add(token)
+ else:
+ matched_removals.add(token_key)
+ else:
+ new_list[token] = source_file
+ else:
+ new_list[token] = source_file
+
+ if warn_for_unmatched_removal:
+ for source_file, tokens in unmatched_removals.items():
+ if len(tokens) > 3:
+ selected = [tokens.pop(), tokens.pop(), tokens.pop()]
+ writemsg(
+ _("--- Unmatched removal atoms in %s: %s and %s more\n")
+ % (source_file, ", ".join(selected), len(tokens)),
+ noiselevel=-1,
+ )
+ else:
+ writemsg(
+ _("--- Unmatched removal atom(s) in %s: %s\n")
+ % (source_file, ", ".join(tokens)),
+ noiselevel=-1,
+ )
+
+ if remember_source_file:
+ return list(new_list.items())
+ return list(new_list)
+
+
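
The '-token' and '-*' removal notation in action; a sketch, assuming portage is importable:

    from portage.util import stack_lists

    print(stack_lists([["a", "b"], ["-b", "c"]]))
    # ['a', 'c']: '-b' removed the earlier 'b'
    print(stack_lists([["a", "b"], ["-*", "c"]]))
    # ['c']: '-*' cleared everything stacked so far
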
+def grabdict(
+ myfilename, juststrings=0, empty=0, recursive=0, incremental=1, newlines=0
+):
+ """
+ This function grabs the lines in a file, normalizes whitespace and returns lines in a dictionary
+
+ @param myfilename: file to process
+ @type myfilename: string (path)
+ @param juststrings: only return strings
+ @type juststrings: Boolean (integer)
+ @param empty: Whether to keep lines that have a key but no values
+ @type empty: Boolean (integer)
+ @param recursive: Recursively grab ( support for /etc/portage/package.keywords/* and friends )
+ @type recursive: Boolean (integer)
+ @param incremental: Append to the return list, don't overwrite
+ @type incremental: Boolean (integer)
+ @param newlines: Append newlines
+ @type newlines: Boolean (integer)
+ @rtype: Dictionary
+ @return:
+ 1. Returns the lines in a file in a dictionary, for example:
+ 'sys-apps/portage x86 amd64 ppc'
+ would return
+ {"sys-apps/portage" : ['x86', 'amd64', 'ppc']}
+ """
+ newdict = {}
+ for x in grablines(myfilename, recursive):
+ # the split/join thing removes leading and trailing whitespace, and converts any whitespace in the line
+ # into single spaces.
+ if x[0] == "#":
+ continue
+ myline = x.split()
+ mylinetemp = []
+ for item in myline:
+ if item[:1] != "#":
+ mylinetemp.append(item)
+ else:
+ break
+ myline = mylinetemp
+ if len(myline) < 2 and empty == 0:
+ continue
+ if len(myline) < 1 and empty == 1:
+ continue
+ if newlines:
+ myline.append("\n")
+ if incremental:
+ newdict.setdefault(myline[0], []).extend(myline[1:])
+ else:
+ newdict[myline[0]] = myline[1:]
+ if juststrings:
+ for k, v in newdict.items():
+ newdict[k] = " ".join(v)
+ return newdict
+
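
The return shape described in the docstring, demonstrated with a hypothetical temporary file and assuming portage is importable:

    import os
    import tempfile
    from portage.util import grabdict

    with tempfile.NamedTemporaryFile("w", delete=False) as f:
        f.write("sys-apps/portage x86 amd64 ppc\n")
    try:
        print(grabdict(f.name))
        # {'sys-apps/portage': ['x86', 'amd64', 'ppc']}
        print(grabdict(f.name, juststrings=True))
        # {'sys-apps/portage': 'x86 amd64 ppc'}
    finally:
        os.unlink(f.name)
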
_eapi_cache = {}
+
def read_corresponding_eapi_file(filename, default="0"):
- """
- Read the 'eapi' file from the directory 'filename' is in.
- Returns "0" if the file is not present or invalid.
- """
- eapi_file = os.path.join(os.path.dirname(filename), "eapi")
- try:
- eapi = _eapi_cache[eapi_file]
- except KeyError:
- pass
- else:
- if eapi is None:
- return default
- return eapi
-
- eapi = None
- try:
- with io.open(_unicode_encode(eapi_file,
- encoding=_encodings['fs'], errors='strict'),
- mode='r', encoding=_encodings['repo.content'], errors='replace') as f:
- lines = f.readlines()
- if len(lines) == 1:
- eapi = lines[0].rstrip("\n")
- else:
- writemsg(_("--- Invalid 'eapi' file (doesn't contain exactly one line): %s\n") % (eapi_file),
- noiselevel=-1)
- except IOError:
- pass
-
- _eapi_cache[eapi_file] = eapi
- if eapi is None:
- return default
- return eapi
-
-def grabdict_package(myfilename, juststrings=0, recursive=0, newlines=0,
- allow_wildcard=False, allow_repo=False, allow_build_id=False, allow_use=True,
- verify_eapi=False, eapi=None, eapi_default="0"):
- """ Does the same thing as grabdict except it validates keys
- with isvalidatom()"""
-
- if recursive:
- file_list = _recursive_file_list(myfilename)
- else:
- file_list = [myfilename]
-
- atoms = {}
- for filename in file_list:
- d = grabdict(filename, juststrings=False,
- empty=True, recursive=False, incremental=True, newlines=newlines)
- if not d:
- continue
- if verify_eapi and eapi is None:
- eapi = read_corresponding_eapi_file(
- myfilename, default=eapi_default)
-
- for k, v in d.items():
- try:
- k = Atom(k, allow_wildcard=allow_wildcard,
- allow_repo=allow_repo,
- allow_build_id=allow_build_id, eapi=eapi)
- except InvalidAtom as e:
- writemsg(_("--- Invalid atom in %s: %s\n") % (filename, e),
- noiselevel=-1)
- else:
- if not allow_use and k.use:
- writemsg(_("--- Atom is not allowed to have USE flag(s) in %s: %s\n") % (filename, k),
- noiselevel=-1)
- continue
- atoms.setdefault(k, []).extend(v)
-
- if juststrings:
- for k, v in atoms.items():
- atoms[k] = " ".join(v)
-
- return atoms
-
-def grabfile_package(myfilename, compatlevel=0, recursive=0,
- allow_wildcard=False, allow_repo=False, allow_build_id=False,
- remember_source_file=False, verify_eapi=False, eapi=None,
- eapi_default="0"):
-
- pkgs=grabfile(myfilename, compatlevel, recursive=recursive, remember_source_file=True)
- if not pkgs:
- return pkgs
- if verify_eapi and eapi is None:
- eapi = read_corresponding_eapi_file(
- myfilename, default=eapi_default)
- mybasename = os.path.basename(myfilename)
- is_packages_file = mybasename == 'packages'
- atoms = []
- for pkg, source_file in pkgs:
- pkg_orig = pkg
- # for packages and package.mask files
- if pkg[:1] == "-":
- if is_packages_file and pkg == '-*':
- if remember_source_file:
- atoms.append((pkg, source_file))
- else:
- atoms.append(pkg)
- continue
- pkg = pkg[1:]
- if pkg[:1] == '*' and is_packages_file:
- pkg = pkg[1:]
- try:
- pkg = Atom(pkg, allow_wildcard=allow_wildcard,
- allow_repo=allow_repo, allow_build_id=allow_build_id,
- eapi=eapi)
- except InvalidAtom as e:
- writemsg(_("--- Invalid atom in %s: %s\n") % (source_file, e),
- noiselevel=-1)
- else:
- if pkg_orig == str(pkg):
- # normal atom, so return as Atom instance
- if remember_source_file:
- atoms.append((pkg, source_file))
- else:
- atoms.append(pkg)
- else:
- # atom has special prefix, so return as string
- if remember_source_file:
- atoms.append((pkg_orig, source_file))
- else:
- atoms.append(pkg_orig)
- return atoms
+ """
+ Read the 'eapi' file from the directory 'filename' is in.
+ Returns "0" if the file is not present or invalid.
+ """
+ eapi_file = os.path.join(os.path.dirname(filename), "eapi")
+ try:
+ eapi = _eapi_cache[eapi_file]
+ except KeyError:
+ pass
+ else:
+ if eapi is None:
+ return default
+ return eapi
+
+ eapi = None
+ try:
+ with open(
+ _unicode_encode(eapi_file, encoding=_encodings["fs"], errors="strict"),
+ encoding=_encodings["repo.content"],
+ errors="replace",
+ ) as f:
+ lines = f.readlines()
+ if len(lines) == 1:
+ eapi = lines[0].rstrip("\n")
+ else:
+ writemsg(
+ _("--- Invalid 'eapi' file (doesn't contain exactly one line): %s\n")
+ % (eapi_file),
+ noiselevel=-1,
+ )
+ except OSError:
+ pass
+
+ _eapi_cache[eapi_file] = eapi
+ if eapi is None:
+ return default
+ return eapi
+
+
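
The lookup is keyed on the directory of the path passed in, and results are memoized in _eapi_cache; a sketch with a hypothetical temporary directory, assuming portage is importable:

    import os
    import tempfile
    from portage.util import read_corresponding_eapi_file

    d = tempfile.mkdtemp()
    with open(os.path.join(d, "eapi"), "w") as f:
        f.write("8\n")
    # any file path inside d resolves to d/eapi
    print(read_corresponding_eapi_file(os.path.join(d, "package.mask")))  # '8'
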
+def grabdict_package(
+ myfilename,
+ juststrings=0,
+ recursive=0,
+ newlines=0,
+ allow_wildcard=False,
+ allow_repo=False,
+ allow_build_id=False,
+ allow_use=True,
+ verify_eapi=False,
+ eapi=None,
+ eapi_default="0",
+):
+ """Does the same thing as grabdict except it validates keys
+ with isvalidatom()"""
+
+ if recursive:
+ file_list = _recursive_file_list(myfilename)
+ else:
+ file_list = [myfilename]
+
+ atoms = {}
+ for filename in file_list:
+ d = grabdict(
+ filename,
+ juststrings=False,
+ empty=True,
+ recursive=False,
+ incremental=True,
+ newlines=newlines,
+ )
+ if not d:
+ continue
+ if verify_eapi and eapi is None:
+ eapi = read_corresponding_eapi_file(myfilename, default=eapi_default)
+
+ for k, v in d.items():
+ try:
+ k = Atom(
+ k,
+ allow_wildcard=allow_wildcard,
+ allow_repo=allow_repo,
+ allow_build_id=allow_build_id,
+ eapi=eapi,
+ )
+ except InvalidAtom as e:
+ writemsg(
+ _("--- Invalid atom in %s: %s\n") % (filename, e), noiselevel=-1
+ )
+ else:
+ if not allow_use and k.use:
+ writemsg(
+ _("--- Atom is not allowed to have USE flag(s) in %s: %s\n")
+ % (filename, k),
+ noiselevel=-1,
+ )
+ continue
+ atoms.setdefault(k, []).extend(v)
+
+ if juststrings:
+ for k, v in atoms.items():
+ atoms[k] = " ".join(v)
+
+ return atoms
+
+
+def grabfile_package(
+ myfilename,
+ compatlevel=0,
+ recursive=0,
+ allow_wildcard=False,
+ allow_repo=False,
+ allow_build_id=False,
+ remember_source_file=False,
+ verify_eapi=False,
+ eapi=None,
+ eapi_default="0",
+):
+ pkgs = grabfile(
+ myfilename, compatlevel, recursive=recursive, remember_source_file=True
+ )
+ if not pkgs:
+ return pkgs
+ if verify_eapi and eapi is None:
+ eapi = read_corresponding_eapi_file(myfilename, default=eapi_default)
+ mybasename = os.path.basename(myfilename)
+ is_packages_file = mybasename == "packages"
+ atoms = []
+ for pkg, source_file in pkgs:
+ pkg_orig = pkg
+ # for packages and package.mask files
+ if pkg[:1] == "-":
+ if is_packages_file and pkg == "-*":
+ if remember_source_file:
+ atoms.append((pkg, source_file))
+ else:
+ atoms.append(pkg)
+ continue
+ pkg = pkg[1:]
+ if pkg[:1] == "*" and is_packages_file:
+ pkg = pkg[1:]
+ try:
+ pkg = Atom(
+ pkg,
+ allow_wildcard=allow_wildcard,
+ allow_repo=allow_repo,
+ allow_build_id=allow_build_id,
+ eapi=eapi,
+ )
+ except InvalidAtom as e:
+ writemsg(
+ _("--- Invalid atom in %s: %s\n") % (source_file, e), noiselevel=-1
+ )
+ else:
+ if pkg_orig == str(pkg):
+ # normal atom, so return as Atom instance
+ if remember_source_file:
+ atoms.append((pkg, source_file))
+ else:
+ atoms.append(pkg)
+ else:
+ # atom has special prefix, so return as string
+ if remember_source_file:
+ atoms.append((pkg_orig, source_file))
+ else:
+ atoms.append(pkg_orig)
+ return atoms
+
def _recursive_basename_filter(f):
- return not f.startswith(".") and not f.endswith("~")
+ return not f.startswith(".") and not f.endswith("~")
+
def _recursive_file_list(path):
- # path may be a regular file or a directory
-
- def onerror(e):
- if e.errno == PermissionDenied.errno:
- raise PermissionDenied(path)
-
- stack = [os.path.split(path)]
-
- while stack:
- parent, fname = stack.pop()
- fullpath = os.path.join(parent, fname)
-
- try:
- st = os.stat(fullpath)
- except OSError as e:
- onerror(e)
- continue
-
- if stat.S_ISDIR(st.st_mode):
- if fname in VCS_DIRS or not _recursive_basename_filter(fname):
- continue
- try:
- children = os.listdir(fullpath)
- except OSError as e:
- onerror(e)
- continue
-
- # Sort in reverse, since we pop from the end of the stack.
- # Include regular files in the stack, so files are sorted
- # together with directories.
- children.sort(reverse=True)
- stack.extend((fullpath, x) for x in children)
-
- elif stat.S_ISREG(st.st_mode):
- if _recursive_basename_filter(fname):
- yield fullpath
+ # path may be a regular file or a directory
+
+ def onerror(e):
+ if e.errno == PermissionDenied.errno:
+ raise PermissionDenied(path)
+
+ stack = [os.path.split(path)]
+
+ while stack:
+ parent, fname = stack.pop()
+ fullpath = os.path.join(parent, fname)
+
+ try:
+ st = os.stat(fullpath)
+ except OSError as e:
+ onerror(e)
+ continue
+
+ if stat.S_ISDIR(st.st_mode):
+ if fname in VCS_DIRS or not _recursive_basename_filter(fname):
+ continue
+ try:
+ children = os.listdir(fullpath)
+ except OSError as e:
+ onerror(e)
+ continue
+
+ # Sort in reverse, since we pop from the end of the stack.
+ # Include regular files in the stack, so files are sorted
+ # together with directories.
+ children.sort(reverse=True)
+ stack.extend((fullpath, x) for x in children)
+
+ elif stat.S_ISREG(st.st_mode):
+ if _recursive_basename_filter(fname):
+ yield fullpath
+
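
The walk is an iterative, sorted depth-first traversal that skips dotfiles, backup files ending in '~', and VCS directories; a sketch against a hypothetical /etc/portage/package.use layout, assuming portage is importable:

    from portage.util import _recursive_file_list

    # given package.use/ containing 00-base, 10-desktop, 10-desktop~,
    # .10-desktop.swp, and .git/, only the first two are yielded:
    for path in _recursive_file_list("/etc/portage/package.use"):
        print(path)  # .../00-base then .../10-desktop, in sorted order
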
def grablines(myfilename, recursive=0, remember_source_file=False):
- mylines = []
- if recursive:
- for f in _recursive_file_list(myfilename):
- mylines.extend(grablines(f, recursive=False,
- remember_source_file=remember_source_file))
-
- else:
- try:
- with io.open(_unicode_encode(myfilename,
- encoding=_encodings['fs'], errors='strict'),
- mode='r', encoding=_encodings['content'], errors='replace') as myfile:
- if remember_source_file:
- mylines = [(line, myfilename) for line in myfile.readlines()]
- else:
- mylines = myfile.readlines()
- except IOError as e:
- if e.errno == PermissionDenied.errno:
- raise PermissionDenied(myfilename)
- elif e.errno in (errno.ENOENT, errno.ESTALE):
- pass
- else:
- raise
- return mylines
+ mylines = []
+ if recursive:
+ for f in _recursive_file_list(myfilename):
+ mylines.extend(
+ grablines(f, recursive=False, remember_source_file=remember_source_file)
+ )
+
+ else:
+ try:
+ with open(
+ _unicode_encode(myfilename, encoding=_encodings["fs"], errors="strict"),
+ encoding=_encodings["content"],
+ errors="replace",
+ ) as myfile:
+ if remember_source_file:
+ mylines = [(line, myfilename) for line in myfile.readlines()]
+ else:
+ mylines = myfile.readlines()
+ except OSError as e:
+ if e.errno == PermissionDenied.errno:
+ raise PermissionDenied(myfilename)
+ elif e.errno in (errno.ENOENT, errno.ESTALE):
+ pass
+ else:
+ raise
+ return mylines
+
def writedict(mydict, myfilename, writekey=True):
- """Writes out a dict to a file; writekey=0 mode doesn't write out
- the key and assumes all values are strings, not lists."""
- lines = []
- if not writekey:
- for v in mydict.values():
- lines.append(v + "\n")
- else:
- for k, v in mydict.items():
- lines.append("%s %s\n" % (k, " ".join(v)))
- write_atomic(myfilename, "".join(lines))
+ """Writes out a dict to a file; writekey=0 mode doesn't write out
+ the key and assumes all values are strings, not lists."""
+ lines = []
+ if not writekey:
+ for v in mydict.values():
+ lines.append(v + "\n")
+ else:
+ for k, v in mydict.items():
+ lines.append(f"{k} {' '.join(v)}\n")
+ write_atomic(myfilename, "".join(lines))
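
writedict is roughly the inverse of grabdict for the common case, and it goes through write_atomic() so readers never observe a half-written file; a sketch with a hypothetical path, assuming portage is importable:

    from portage.util import writedict

    writedict({"sys-apps/portage": ["x86", "amd64"]}, "/tmp/keywords.demo")
    # resulting file contents: 'sys-apps/portage x86 amd64\n'
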
def shlex_split(s):
- """
- This is equivalent to shlex.split, but if the current interpreter is
- python2, it temporarily encodes unicode strings to bytes since python2's
- shlex.split() doesn't handle unicode strings.
- """
- return shlex.split(s)
+ """
+ This is equivalent to shlex.split, but if the current interpreter is
+ python2, it temporarily encodes unicode strings to bytes since python2's
+ shlex.split() doesn't handle unicode strings.
+ """
+ return shlex.split(s)
class _getconfig_shlex(shlex.shlex):
+ def __init__(self, portage_tolerant=False, **kwargs):
+ shlex.shlex.__init__(self, **kwargs)
+ self.__portage_tolerant = portage_tolerant
+
+ def allow_sourcing(self, var_expand_map):
+ self.source = portage._native_string("source")
+ self.var_expand_map = var_expand_map
+
+ def sourcehook(self, newfile):
+ try:
+ newfile = varexpand(newfile, self.var_expand_map)
+ return shlex.shlex.sourcehook(self, newfile)
+ except OSError as e:
+ if e.errno == PermissionDenied.errno:
+ raise PermissionDenied(newfile)
+ if e.errno not in (errno.ENOENT, errno.ENOTDIR):
+ writemsg(f"open('{newfile}', 'r'): {e}\n", noiselevel=-1)
+ raise
+
+ msg = self.error_leader()
+ if e.errno == errno.ENOTDIR:
+ msg += _("%s: Not a directory") % newfile
+ else:
+ msg += _("%s: No such file or directory") % newfile
+
+ if self.__portage_tolerant:
+ writemsg(f"{msg}\n", noiselevel=-1)
+ else:
+ raise ParseError(msg)
+ return (newfile, io.StringIO())
+
+
+_invalid_var_name_re = re.compile(r"^\d|\W")
+
+
+def getconfig(
+ mycfg, tolerant=False, allow_sourcing=False, expand=True, recursive=False
+):
+ if isinstance(expand, dict):
+ # Some existing variable definitions have been
+ # passed in, for use in substitutions.
+ expand_map = expand
+ expand = True
+ else:
+ expand_map = {}
+ mykeys = {}
+
+ if recursive:
+ # Emulate source commands so that syntax error messages
+ # can display real file names and line numbers.
+ if not expand:
+ expand_map = False
+ fname = None
+ for fname in _recursive_file_list(mycfg):
+ mykeys.update(
+ getconfig(
+ fname,
+ tolerant=tolerant,
+ allow_sourcing=allow_sourcing,
+ expand=expand_map,
+ recursive=False,
+ )
+ or {}
+ )
+ if fname is None:
+ return None
+ return mykeys
+
+ f = None
+ try:
+ f = open(
+ _unicode_encode(mycfg, encoding=_encodings["fs"], errors="strict"),
+ encoding=_encodings["content"],
+ errors="replace",
+ )
+ content = f.read()
+ except OSError as e:
+ if e.errno == PermissionDenied.errno:
+ raise PermissionDenied(mycfg)
+ if e.errno != errno.ENOENT:
+ writemsg(f"open('{mycfg}', 'r'): {e}\n", noiselevel=-1)
+ if e.errno not in (errno.EISDIR,):
+ raise
+ return None
+ finally:
+ if f is not None:
+ f.close()
+
+ # The _native_string() calls below date from the Python 2 era, when
+ # shlex could not handle unicode; under Python 3 they are effectively
+ # no-ops, retained to keep these code paths unchanged.
+
+ # Workaround for avoiding a silent error in shlex that is
+ # triggered by a source statement at the end of the file
+ # without a trailing newline after the source statement.
+ if content and content[-1] != portage._native_string("\n"):
+ content += portage._native_string("\n")
+
+ # Warn about dos-style line endings since that prevents
+ # people from being able to source them with bash.
+ if portage._native_string("\r") in content:
+ writemsg(
+ (
+ "!!! "
+ + _(
+ "Please use dos2unix to convert line endings "
+ + "in config file: '%s'"
+ )
+ + "\n"
+ )
+ % mycfg,
+ noiselevel=-1,
+ )
+
+ lex = None
+ try:
+ # The default shlex.sourcehook() implementation
+ # only joins relative paths when the infile
+ # attribute is properly set.
+ lex = _getconfig_shlex(
+ instream=content, infile=mycfg, posix=True, portage_tolerant=tolerant
+ )
+ lex.wordchars = portage._native_string(
+ string.digits + string.ascii_letters + r"~!@#$%*_\:;?,./-+{}"
+ )
+ lex.quotes = portage._native_string("\"'")
+ if allow_sourcing:
+ lex.allow_sourcing(expand_map)
+
+ while True:
+ key = _unicode_decode(lex.get_token())
+ if key == "export":
+ key = _unicode_decode(lex.get_token())
+ if key is None:
+ # normal end of file
+ break
+
+ equ = _unicode_decode(lex.get_token())
+ if not equ:
+ msg = lex.error_leader() + _("Unexpected EOF")
+ if not tolerant:
+ raise ParseError(msg)
+ else:
+ writemsg(f"{msg}\n", noiselevel=-1)
+ return mykeys
+
+ elif equ != "=":
+ msg = lex.error_leader() + _("Invalid token '%s' (not '=')") % (equ,)
+ if not tolerant:
+ raise ParseError(msg)
+ else:
+ writemsg(f"{msg}\n", noiselevel=-1)
+ return mykeys
+
+ val = _unicode_decode(lex.get_token())
+ if val is None:
+ msg = lex.error_leader() + _(
+ "Unexpected end of config file: variable '%s'"
+ ) % (key,)
+ if not tolerant:
+ raise ParseError(msg)
+ else:
+ writemsg(f"{msg}\n", noiselevel=-1)
+ return mykeys
+
+ if _invalid_var_name_re.search(key) is not None:
+ msg = lex.error_leader() + _("Invalid variable name '%s'") % (key,)
+ if not tolerant:
+ raise ParseError(msg)
+ writemsg(f"{msg}\n", noiselevel=-1)
+ continue
+
+ if expand:
+ mykeys[key] = varexpand(
+ val, mydict=expand_map, error_leader=lex.error_leader
+ )
+ expand_map[key] = mykeys[key]
+ else:
+ mykeys[key] = val
+ except SystemExit as e:
+ raise
+ except Exception as e:
+ if isinstance(e, ParseError) or lex is None:
+ raise
+ msg = f"{lex.error_leader()}{e}"
+ writemsg(f"{msg}\n", noiselevel=-1)
+ raise
+
+ return mykeys
- def __init__(self, portage_tolerant=False, **kwargs):
- shlex.shlex.__init__(self, **kwargs)
- self.__portage_tolerant = portage_tolerant
-
- def allow_sourcing(self, var_expand_map):
- self.source = portage._native_string("source")
- self.var_expand_map = var_expand_map
-
- def sourcehook(self, newfile):
- try:
- newfile = varexpand(newfile, self.var_expand_map)
- return shlex.shlex.sourcehook(self, newfile)
- except EnvironmentError as e:
- if e.errno == PermissionDenied.errno:
- raise PermissionDenied(newfile)
- if e.errno not in (errno.ENOENT, errno.ENOTDIR):
- writemsg("open('%s', 'r'): %s\n" % (newfile, e), noiselevel=-1)
- raise
-
- msg = self.error_leader()
- if e.errno == errno.ENOTDIR:
- msg += _("%s: Not a directory") % newfile
- else:
- msg += _("%s: No such file or directory") % newfile
-
- if self.__portage_tolerant:
- writemsg("%s\n" % msg, noiselevel=-1)
- else:
- raise ParseError(msg)
- return (newfile, io.StringIO())
-
-_invalid_var_name_re = re.compile(r'^\d|\W')
-
-def getconfig(mycfg, tolerant=False, allow_sourcing=False, expand=True,
- recursive=False):
-
- if isinstance(expand, dict):
- # Some existing variable definitions have been
- # passed in, for use in substitutions.
- expand_map = expand
- expand = True
- else:
- expand_map = {}
- mykeys = {}
-
- if recursive:
- # Emulate source commands so that syntax error messages
- # can display real file names and line numbers.
- if not expand:
- expand_map = False
- fname = None
- for fname in _recursive_file_list(mycfg):
- mykeys.update(getconfig(fname, tolerant=tolerant,
- allow_sourcing=allow_sourcing, expand=expand_map,
- recursive=False) or {})
- if fname is None:
- return None
- return mykeys
-
- f = None
- try:
- f = open(_unicode_encode(mycfg,
- encoding=_encodings['fs'], errors='strict'), mode='r',
- encoding=_encodings['content'], errors='replace')
- content = f.read()
- except IOError as e:
- if e.errno == PermissionDenied.errno:
- raise PermissionDenied(mycfg)
- if e.errno != errno.ENOENT:
- writemsg("open('%s', 'r'): %s\n" % (mycfg, e), noiselevel=-1)
- if e.errno not in (errno.EISDIR,):
- raise
- return None
- finally:
- if f is not None:
- f.close()
-
- # Since this file has unicode_literals enabled, and Python 2's
- # shlex implementation does not support unicode, the following code
- # uses _native_string() to encode unicode literals when necessary.
-
- # Workaround for avoiding a silent error in shlex that is
- # triggered by a source statement at the end of the file
- # without a trailing newline after the source statement.
- if content and content[-1] != portage._native_string('\n'):
- content += portage._native_string('\n')
-
- # Warn about dos-style line endings since that prevents
- # people from being able to source them with bash.
- if portage._native_string('\r') in content:
- writemsg(("!!! " + _("Please use dos2unix to convert line endings " + \
- "in config file: '%s'") + "\n") % mycfg, noiselevel=-1)
-
- lex = None
- try:
- # The default shlex.sourcehook() implementation
- # only joins relative paths when the infile
- # attribute is properly set.
- lex = _getconfig_shlex(instream=content, infile=mycfg, posix=True,
- portage_tolerant=tolerant)
- lex.wordchars = portage._native_string(string.digits +
- string.ascii_letters + r"~!@#$%*_\:;?,./-+{}")
- lex.quotes = portage._native_string("\"'")
- if allow_sourcing:
- lex.allow_sourcing(expand_map)
-
- while True:
- key = _unicode_decode(lex.get_token())
- if key == "export":
- key = _unicode_decode(lex.get_token())
- if key is None:
- #normal end of file
- break
-
- equ = _unicode_decode(lex.get_token())
- if not equ:
- msg = lex.error_leader() + _("Unexpected EOF")
- if not tolerant:
- raise ParseError(msg)
- else:
- writemsg("%s\n" % msg, noiselevel=-1)
- return mykeys
-
- elif equ != "=":
- msg = lex.error_leader() + \
- _("Invalid token '%s' (not '=')") % (equ,)
- if not tolerant:
- raise ParseError(msg)
- else:
- writemsg("%s\n" % msg, noiselevel=-1)
- return mykeys
-
- val = _unicode_decode(lex.get_token())
- if val is None:
- msg = lex.error_leader() + \
- _("Unexpected end of config file: variable '%s'") % (key,)
- if not tolerant:
- raise ParseError(msg)
- else:
- writemsg("%s\n" % msg, noiselevel=-1)
- return mykeys
-
- if _invalid_var_name_re.search(key) is not None:
- msg = lex.error_leader() + \
- _("Invalid variable name '%s'") % (key,)
- if not tolerant:
- raise ParseError(msg)
- writemsg("%s\n" % msg, noiselevel=-1)
- continue
-
- if expand:
- mykeys[key] = varexpand(val, mydict=expand_map,
- error_leader=lex.error_leader)
- expand_map[key] = mykeys[key]
- else:
- mykeys[key] = val
- except SystemExit as e:
- raise
- except Exception as e:
- if isinstance(e, ParseError) or lex is None:
- raise
- msg = "%s%s" % (lex.error_leader(), e)
- writemsg("%s\n" % msg, noiselevel=-1)
- raise
-
- return mykeys
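
getconfig parses shell-style VAR="value" assignments as found in make.conf, expanding ${VAR} references through varexpand as it goes; a sketch with a hypothetical temporary file, assuming portage is importable:

    import os
    import tempfile
    from portage.util import getconfig

    with tempfile.NamedTemporaryFile("w", delete=False) as f:
        f.write('PORTDIR="/var/db/repos/gentoo"\n')
        f.write('DISTDIR="${PORTDIR}/distfiles"\n')
    try:
        cfg = getconfig(f.name)
        print(cfg["DISTDIR"])  # /var/db/repos/gentoo/distfiles
    finally:
        os.unlink(f.name)
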
_varexpand_word_chars = frozenset(string.ascii_letters + string.digits + "_")
_varexpand_unexpected_eof_msg = "unexpected EOF while looking for matching `}'"
+
def varexpand(mystring, mydict=None, error_leader=None):
- if mydict is None:
- mydict = {}
+ if mydict is None:
+ mydict = {}
- """
+ """
new variable expansion code. Preserves quotes, handles \n, etc.
This code is used by the configfile code, as well as others (parser)
This would be a good bunch of code to port to C.
"""
- numvars = 0
- # in single, double quotes
- insing = 0
- indoub = 0
- pos = 0
- length = len(mystring)
- newstring = []
- while pos < length:
- current = mystring[pos]
- if current == "'":
- if indoub:
- newstring.append("'")
- else:
- newstring.append("'") # Quote removal is handled by shlex.
- insing=not insing
- pos += 1
- continue
- elif current == '"':
- if insing:
- newstring.append('"')
- else:
- newstring.append('"') # Quote removal is handled by shlex.
- indoub=not indoub
- pos += 1
- continue
- if not insing:
- #expansion time
- if current == "\n":
- #convert newlines to spaces
- newstring.append(" ")
- pos += 1
- elif current == "\\":
- # For backslash expansion, this function used to behave like
- # echo -e, but that's not needed for our purposes. We want to
- # behave like bash does when expanding a variable assignment
- # in a sourced file, in which case it performs backslash
- # removal for \\ and \$ but nothing more. It also removes
- # escaped newline characters. Note that we don't handle
- # escaped quotes here, since getconfig() uses shlex
- # to handle that earlier.
- if pos + 1 >= len(mystring):
- newstring.append(current)
- break
- else:
- current = mystring[pos + 1]
- pos += 2
- if current == "$":
- newstring.append(current)
- elif current == "\\":
- newstring.append(current)
- # BUG: This spot appears buggy, but it's intended to
- # be bug-for-bug compatible with existing behavior.
- if pos < length and \
- mystring[pos] in ("'", '"', "$"):
- newstring.append(mystring[pos])
- pos += 1
- elif current == "\n":
- pass
- else:
- newstring.append(mystring[pos - 2:pos])
- continue
- elif current == "$":
- pos += 1
- if pos == length:
- # shells handle this like \$
- newstring.append(current)
- continue
-
- if mystring[pos] == "{":
- pos += 1
- if pos == length:
- msg = _varexpand_unexpected_eof_msg
- if error_leader is not None:
- msg = error_leader() + msg
- writemsg(msg + "\n", noiselevel=-1)
- return ""
-
- braced = True
- else:
- braced = False
- myvstart = pos
- while mystring[pos] in _varexpand_word_chars:
- if pos + 1 >= len(mystring):
- if braced:
- msg = _varexpand_unexpected_eof_msg
- if error_leader is not None:
- msg = error_leader() + msg
- writemsg(msg + "\n", noiselevel=-1)
- return ""
- pos += 1
- break
- pos += 1
- myvarname = mystring[myvstart:pos]
- if braced:
- if mystring[pos] != "}":
- msg = _varexpand_unexpected_eof_msg
- if error_leader is not None:
- msg = error_leader() + msg
- writemsg(msg + "\n", noiselevel=-1)
- return ""
- pos += 1
- if len(myvarname) == 0:
- msg = "$"
- if braced:
- msg += "{}"
- msg += ": bad substitution"
- if error_leader is not None:
- msg = error_leader() + msg
- writemsg(msg + "\n", noiselevel=-1)
- return ""
- numvars += 1
- if myvarname in mydict:
- newstring.append(mydict[myvarname])
- else:
- newstring.append(current)
- pos += 1
- else:
- newstring.append(current)
- pos += 1
-
- return "".join(newstring)
+ numvars = 0
+ # in single, double quotes
+ insing = 0
+ indoub = 0
+ pos = 0
+ length = len(mystring)
+ newstring = []
+ while pos < length:
+ current = mystring[pos]
+ if current == "'":
+ if indoub:
+ newstring.append("'")
+ else:
+ newstring.append("'") # Quote removal is handled by shlex.
+ insing = not insing
+ pos += 1
+ continue
+ elif current == '"':
+ if insing:
+ newstring.append('"')
+ else:
+ newstring.append('"') # Quote removal is handled by shlex.
+ indoub = not indoub
+ pos += 1
+ continue
+ if not insing:
+ # expansion time
+ if current == "\n":
+ # convert newlines to spaces
+ newstring.append(" ")
+ pos += 1
+ elif current == "\\":
+ # For backslash expansion, this function used to behave like
+ # echo -e, but that's not needed for our purposes. We want to
+ # behave like bash does when expanding a variable assignment
+ # in a sourced file, in which case it performs backslash
+ # removal for \\ and \$ but nothing more. It also removes
+ # escaped newline characters. Note that we don't handle
+ # escaped quotes here, since getconfig() uses shlex
+ # to handle that earlier.
+ if pos + 1 >= len(mystring):
+ newstring.append(current)
+ break
+ else:
+ current = mystring[pos + 1]
+ pos += 2
+ if current == "$":
+ newstring.append(current)
+ elif current == "\\":
+ newstring.append(current)
+ # BUG: This spot appears buggy, but it's intended to
+ # be bug-for-bug compatible with existing behavior.
+ if pos < length and mystring[pos] in ("'", '"', "$"):
+ newstring.append(mystring[pos])
+ pos += 1
+ elif current == "\n":
+ pass
+ else:
+ newstring.append(mystring[pos - 2 : pos])
+ continue
+ elif current == "$":
+ pos += 1
+ if pos == length:
+ # shells handle this like \$
+ newstring.append(current)
+ continue
+
+ if mystring[pos] == "{":
+ pos += 1
+ if pos == length:
+ msg = _varexpand_unexpected_eof_msg
+ if error_leader is not None:
+ msg = error_leader() + msg
+ writemsg(msg + "\n", noiselevel=-1)
+ return ""
+
+ braced = True
+ else:
+ braced = False
+ myvstart = pos
+ while mystring[pos] in _varexpand_word_chars:
+ if pos + 1 >= len(mystring):
+ if braced:
+ msg = _varexpand_unexpected_eof_msg
+ if error_leader is not None:
+ msg = error_leader() + msg
+ writemsg(msg + "\n", noiselevel=-1)
+ return ""
+ pos += 1
+ break
+ pos += 1
+ myvarname = mystring[myvstart:pos]
+ if braced:
+ if mystring[pos] != "}":
+ msg = _varexpand_unexpected_eof_msg
+ if error_leader is not None:
+ msg = error_leader() + msg
+ writemsg(msg + "\n", noiselevel=-1)
+ return ""
+ pos += 1
+ if len(myvarname) == 0:
+ msg = "$"
+ if braced:
+ msg += "{}"
+ msg += ": bad substitution"
+ if error_leader is not None:
+ msg = error_leader() + msg
+ writemsg(msg + "\n", noiselevel=-1)
+ return ""
+ numvars += 1
+ if myvarname in mydict:
+ newstring.append(mydict[myvarname])
+ else:
+ newstring.append(current)
+ pos += 1
+ else:
+ newstring.append(current)
+ pos += 1
+
+ return "".join(newstring)
+
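
varexpand implements the ${VAR} substitution used by getconfig above; expansion is inhibited inside single quotes, and backslash removal applies only to \$ and \\. A sketch, assuming portage is importable:

    from portage.util import varexpand

    print(varexpand("${PORTDIR}/distfiles", {"PORTDIR": "/var/db/repos/gentoo"}))
    # /var/db/repos/gentoo/distfiles
    print(varexpand("cost: \\$5"))
    # cost: $5  (backslash removal for \$)
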
# broken and removed, but can still be imported
pickle_write = None
+
def pickle_read(filename, default=None, debug=0):
- if not os.access(filename, os.R_OK):
- writemsg(_("pickle_read(): File not readable. '") + filename + "'\n", 1)
- return default
- data = None
- try:
- myf = open(_unicode_encode(filename,
- encoding=_encodings['fs'], errors='strict'), 'rb')
- mypickle = pickle.Unpickler(myf)
- data = mypickle.load()
- myf.close()
- del mypickle, myf
- writemsg(_("pickle_read(): Loaded pickle. '") + filename + "'\n", 1)
- except SystemExit as e:
- raise
- except Exception as e:
- writemsg(_("!!! Failed to load pickle: ") + str(e) + "\n", 1)
- data = default
- return data
+ if not os.access(filename, os.R_OK):
+ writemsg(_("pickle_read(): File not readable. '") + filename + "'\n", 1)
+ return default
+ data = None
+ try:
+ myf = open(
+ _unicode_encode(filename, encoding=_encodings["fs"], errors="strict"), "rb"
+ )
+ mypickle = pickle.Unpickler(myf)
+ data = mypickle.load()
+ myf.close()
+ del mypickle, myf
+ writemsg(_("pickle_read(): Loaded pickle. '") + filename + "'\n", 1)
+ except SystemExit as e:
+ raise
+ except Exception as e:
+ writemsg(_("!!! Failed to load pickle: ") + str(e) + "\n", 1)
+ data = default
+ return data
+
def dump_traceback(msg, noiselevel=1):
- info = sys.exc_info()
- if not info[2]:
- stack = traceback.extract_stack()[:-1]
- error = None
- else:
- stack = traceback.extract_tb(info[2])
- error = str(info[1])
- writemsg("\n====================================\n", noiselevel=noiselevel)
- writemsg("%s\n\n" % msg, noiselevel=noiselevel)
- for line in traceback.format_list(stack):
- writemsg(line, noiselevel=noiselevel)
- if error:
- writemsg(error+"\n", noiselevel=noiselevel)
- writemsg("====================================\n\n", noiselevel=noiselevel)
+ info = sys.exc_info()
+ if not info[2]:
+ stack = traceback.extract_stack()[:-1]
+ error = None
+ else:
+ stack = traceback.extract_tb(info[2])
+ error = str(info[1])
+ writemsg("\n====================================\n", noiselevel=noiselevel)
+ writemsg(f"{msg}\n\n", noiselevel=noiselevel)
+ for line in traceback.format_list(stack):
+ writemsg(line, noiselevel=noiselevel)
+ if error:
+ writemsg(error + "\n", noiselevel=noiselevel)
+ writemsg("====================================\n\n", noiselevel=noiselevel)
+
class cmp_sort_key:
- """
- In python-3.0 the list.sort() method no longer has a "cmp" keyword
- argument. This class acts as an adapter which converts a cmp function
- into one that's suitable for use as the "key" keyword argument to
- list.sort(), making it easier to port code for python-3.0 compatibility.
- It works by generating key objects which use the given cmp function to
- implement their __lt__ method.
-
- Beginning with Python 2.7 and 3.2, equivalent functionality is provided
- by functools.cmp_to_key().
- """
- __slots__ = ("_cmp_func",)
+ """
+ In python-3.0 the list.sort() method no longer has a "cmp" keyword
+ argument. This class acts as an adapter which converts a cmp function
+ into one that's suitable for use as the "key" keyword argument to
+ list.sort(), making it easier to port code for python-3.0 compatibility.
+ It works by generating key objects which use the given cmp function to
+ implement their __lt__ method.
+
+ Beginning with Python 2.7 and 3.2, equivalent functionality is provided
+ by functools.cmp_to_key().
+ """
+
+ __slots__ = ("_cmp_func",)
+
+ def __init__(self, cmp_func):
+ """
+ @type cmp_func: callable which takes 2 positional arguments
+ @param cmp_func: A cmp function.
+ """
+ self._cmp_func = cmp_func
- def __init__(self, cmp_func):
- """
- @type cmp_func: callable which takes 2 positional arguments
- @param cmp_func: A cmp function.
- """
- self._cmp_func = cmp_func
+ def __call__(self, lhs):
+ return self._cmp_key(self._cmp_func, lhs)
- def __call__(self, lhs):
- return self._cmp_key(self._cmp_func, lhs)
+ class _cmp_key:
+ __slots__ = ("_cmp_func", "_obj")
- class _cmp_key:
- __slots__ = ("_cmp_func", "_obj")
+ def __init__(self, cmp_func, obj):
+ self._cmp_func = cmp_func
+ self._obj = obj
- def __init__(self, cmp_func, obj):
- self._cmp_func = cmp_func
- self._obj = obj
+ def __lt__(self, other):
+ if other.__class__ is not self.__class__:
+ raise TypeError(
+ f"Expected type {self.__class__}, got {other.__class__}"
+ )
+ return self._cmp_func(self._obj, other._obj) < 0
- def __lt__(self, other):
- if other.__class__ is not self.__class__:
- raise TypeError("Expected type %s, got %s" % \
- (self.__class__, other.__class__))
- return self._cmp_func(self._obj, other._obj) < 0
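
functools.cmp_to_key() is the stdlib equivalent noted in the docstring; cmp_sort_key predates it and is kept for compatibility. A sketch, assuming portage is importable:

    from portage.util import cmp_sort_key

    def cmp_len(a, b):
        # classic cmp contract: negative, zero, or positive
        return len(a) - len(b)

    print(sorted(["ccc", "a", "bb"], key=cmp_sort_key(cmp_len)))
    # ['a', 'bb', 'ccc']
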
def unique_array(s):
- """lifted from python cookbook, credit: Tim Peters
- Return a list of the elements in s in arbitrary order, sans duplicates"""
- n = len(s)
- # assume all elements are hashable, if so, it's linear
- try:
- return list(set(s))
- except TypeError:
- pass
-
- # so much for linear. abuse sort.
- try:
- t = list(s)
- t.sort()
- except TypeError:
- pass
- else:
- assert n > 0
- last = t[0]
- lasti = i = 1
- while i < n:
- if t[i] != last:
- t[lasti] = last = t[i]
- lasti += 1
- i += 1
- return t[:lasti]
-
- # blah. back to original portage.unique_array
- u = []
- for x in s:
- if x not in u:
- u.append(x)
- return u
+ """lifted from python cookbook, credit: Tim Peters
+ Return a list of the elements in s in arbitrary order, sans duplicates"""
+ n = len(s)
+ # assume all elements are hashable, if so, it's linear
+ try:
+ return list(set(s))
+ except TypeError:
+ pass
+
+ # so much for linear. abuse sort.
+ try:
+ t = list(s)
+ t.sort()
+ except TypeError:
+ pass
+ else:
+ assert n > 0
+ last = t[0]
+ lasti = i = 1
+ while i < n:
+ if t[i] != last:
+ t[lasti] = last = t[i]
+ lasti += 1
+ i += 1
+ return t[:lasti]
+
+ # blah. back to original portage.unique_array
+ u = []
+ for x in s:
+ if x not in u:
+ u.append(x)
+ return u
+
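+# Illustrative sketch (editor's addition): unique_array tries the hashable
+# fast path, then the sort-based pass, then the quadratic fallback; result
+# order is arbitrary in the first case.
+def _example_unique_array():
+    assert sorted(unique_array([3, 1, 3, 2, 1])) == [1, 2, 3]
+    # Unhashable but sortable elements take the sort-based path.
+    assert unique_array([[1], [1], [2]]) == [[1], [2]]
+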
def unique_everseen(iterable, key=None):
- """
- List unique elements, preserving order. Remember all elements ever seen.
- Taken from itertools documentation.
- """
- # unique_everseen('AAAABBBCCDAABBB') --> A B C D
- # unique_everseen('ABBCcAD', str.lower) --> A B C D
- seen = set()
- seen_add = seen.add
- if key is None:
- for element in filterfalse(seen.__contains__, iterable):
- seen_add(element)
- yield element
- else:
- for element in iterable:
- k = key(element)
- if k not in seen:
- seen_add(k)
- yield element
+ """
+ List unique elements, preserving order. Remember all elements ever seen.
+ Taken from itertools documentation.
+ """
+ # unique_everseen('AAAABBBCCDAABBB') --> A B C D
+ # unique_everseen('ABBCcAD', str.lower) --> A B C D
+ seen = set()
+ seen_add = seen.add
+ if key is None:
+ for element in filterfalse(seen.__contains__, iterable):
+ seen_add(element)
+ yield element
+ else:
+ for element in iterable:
+ k = key(element)
+ if k not in seen:
+ seen_add(k)
+ yield element
+
def _do_stat(filename, follow_links=True):
- try:
- if follow_links:
- return os.stat(filename)
- return os.lstat(filename)
- except OSError as oe:
- func_call = "stat('%s')" % filename
- if oe.errno == errno.EPERM:
- raise OperationNotPermitted(func_call)
- if oe.errno == errno.EACCES:
- raise PermissionDenied(func_call)
- if oe.errno == errno.ENOENT:
- raise FileNotFound(filename)
- raise
-
-def apply_permissions(filename, uid=-1, gid=-1, mode=-1, mask=-1,
- stat_cached=None, follow_links=True):
- """Apply user, group, and mode bits to a file if the existing bits do not
- already match. The default behavior is to force an exact match of mode
- bits. When mask=0 is specified, mode bits on the target file are allowed
- to be a superset of the mode argument (via logical OR). When mask>0, the
- mode bits that the target file is allowed to have are restricted via
- logical XOR.
- Returns True if the permissions were modified and False otherwise."""
-
- modified = False
-
- # Since Python 3.4, chown requires int type (no proxies).
- uid = int(uid)
- gid = int(gid)
-
- if stat_cached is None:
- stat_cached = _do_stat(filename, follow_links=follow_links)
-
- if (uid != -1 and uid != stat_cached.st_uid) or \
- (gid != -1 and gid != stat_cached.st_gid):
- try:
- if follow_links:
- os.chown(filename, uid, gid)
- else:
- portage.data.lchown(filename, uid, gid)
- modified = True
- except OSError as oe:
- func_call = "chown('%s', %i, %i)" % (filename, uid, gid)
- if oe.errno == errno.EPERM:
- raise OperationNotPermitted(func_call)
- elif oe.errno == errno.EACCES:
- raise PermissionDenied(func_call)
- elif oe.errno == errno.EROFS:
- raise ReadOnlyFileSystem(func_call)
- elif oe.errno == errno.ENOENT:
- raise FileNotFound(filename)
- else:
- raise
-
- new_mode = -1
- st_mode = stat_cached.st_mode & 0o7777 # protect from unwanted bits
- if mask >= 0:
- if mode == -1:
- mode = 0 # Don't add any mode bits when mode is unspecified.
- else:
- mode = mode & 0o7777
- if (mode & st_mode != mode) or \
- ((mask ^ st_mode) & st_mode != st_mode):
- new_mode = mode | st_mode
- new_mode = (mask ^ new_mode) & new_mode
- elif mode != -1:
- mode = mode & 0o7777 # protect from unwanted bits
- if mode != st_mode:
- new_mode = mode
-
- # The chown system call may clear S_ISUID and S_ISGID
- # bits, so those bits are restored if necessary.
- if modified and new_mode == -1 and \
- (st_mode & stat.S_ISUID or st_mode & stat.S_ISGID):
- if mode == -1:
- new_mode = st_mode
- else:
- mode = mode & 0o7777
- if mask >= 0:
- new_mode = mode | st_mode
- new_mode = (mask ^ new_mode) & new_mode
- else:
- new_mode = mode
- if not (new_mode & stat.S_ISUID or new_mode & stat.S_ISGID):
- new_mode = -1
-
- if not follow_links and stat.S_ISLNK(stat_cached.st_mode):
- # Mode doesn't matter for symlinks.
- new_mode = -1
-
- if new_mode != -1:
- try:
- os.chmod(filename, new_mode)
- modified = True
- except OSError as oe:
- func_call = "chmod('%s', %s)" % (filename, oct(new_mode))
- if oe.errno == errno.EPERM:
- raise OperationNotPermitted(func_call)
- elif oe.errno == errno.EACCES:
- raise PermissionDenied(func_call)
- elif oe.errno == errno.EROFS:
- raise ReadOnlyFileSystem(func_call)
- elif oe.errno == errno.ENOENT:
- raise FileNotFound(filename)
- raise
- return modified
+ try:
+ if follow_links:
+ return os.stat(filename)
+ return os.lstat(filename)
+ except OSError as oe:
+ func_call = f"stat('{filename}')"
+ if oe.errno == errno.EPERM:
+ raise OperationNotPermitted(func_call)
+ if oe.errno == errno.EACCES:
+ raise PermissionDenied(func_call)
+ if oe.errno == errno.ENOENT:
+ raise FileNotFound(filename)
+ raise
+
+
+def apply_permissions(
+ filename, uid=-1, gid=-1, mode=-1, mask=-1, stat_cached=None, follow_links=True
+):
+ """Apply user, group, and mode bits to a file if the existing bits do not
+ already match. The default behavior is to force an exact match of mode
+ bits. When mask=0 is specified, mode bits on the target file are allowed
+ to be a superset of the mode argument (via logical OR). When mask>0, the
+ mode bits that the target file is allowed to have are restricted via
+ logical XOR.
+ Returns True if the permissions were modified and False otherwise."""
+
+ modified = False
+
+ # Since Python 3.4, chown requires int type (no proxies).
+ uid = int(uid)
+ gid = int(gid)
+
+ if stat_cached is None:
+ stat_cached = _do_stat(filename, follow_links=follow_links)
+
+ if (uid != -1 and uid != stat_cached.st_uid) or (
+ gid != -1 and gid != stat_cached.st_gid
+ ):
+ try:
+ if follow_links:
+ os.chown(filename, uid, gid)
+ else:
+ portage.data.lchown(filename, uid, gid)
+ modified = True
+ except OSError as oe:
+ func_call = "chown('%s', %i, %i)" % (filename, uid, gid)
+ if oe.errno == errno.EPERM:
+ raise OperationNotPermitted(func_call)
+ elif oe.errno == errno.EACCES:
+ raise PermissionDenied(func_call)
+ elif oe.errno == errno.EROFS:
+ raise ReadOnlyFileSystem(func_call)
+ elif oe.errno == errno.ENOENT:
+ raise FileNotFound(filename)
+ else:
+ raise
+
+ new_mode = -1
+ st_mode = stat_cached.st_mode & 0o7777 # protect from unwanted bits
+ if mask >= 0:
+ if mode == -1:
+ mode = 0 # Don't add any mode bits when mode is unspecified.
+ else:
+ mode = mode & 0o7777
+ if (mode & st_mode != mode) or ((mask ^ st_mode) & st_mode != st_mode):
+ new_mode = mode | st_mode
+ new_mode = (mask ^ new_mode) & new_mode
+ elif mode != -1:
+ mode = mode & 0o7777 # protect from unwanted bits
+ if mode != st_mode:
+ new_mode = mode
+
+ # The chown system call may clear S_ISUID and S_ISGID
+ # bits, so those bits are restored if necessary.
+ if (
+ modified
+ and new_mode == -1
+ and (st_mode & stat.S_ISUID or st_mode & stat.S_ISGID)
+ ):
+ if mode == -1:
+ new_mode = st_mode
+ else:
+ mode = mode & 0o7777
+ if mask >= 0:
+ new_mode = mode | st_mode
+ new_mode = (mask ^ new_mode) & new_mode
+ else:
+ new_mode = mode
+ if not (new_mode & stat.S_ISUID or new_mode & stat.S_ISGID):
+ new_mode = -1
+
+ if not follow_links and stat.S_ISLNK(stat_cached.st_mode):
+ # Mode doesn't matter for symlinks.
+ new_mode = -1
+
+ if new_mode != -1:
+ try:
+ os.chmod(filename, new_mode)
+ modified = True
+ except OSError as oe:
+ func_call = f"chmod('{filename}', {oct(new_mode)})"
+ if oe.errno == errno.EPERM:
+ raise OperationNotPermitted(func_call)
+ elif oe.errno == errno.EACCES:
+ raise PermissionDenied(func_call)
+ elif oe.errno == errno.EROFS:
+ raise ReadOnlyFileSystem(func_call)
+ elif oe.errno == errno.ENOENT:
+ raise FileNotFound(filename)
+ raise
+ return modified
+
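+
+# Illustrative sketch (editor's addition): with mask >= 0, mode is a required
+# minimum and mask bits are stripped. For st_mode=0o2775, mode=0o644,
+# mask=0o2000: new_mode = mode | st_mode = 0o2775, then
+# (mask ^ new_mode) & new_mode = 0o775, i.e. the setgid bit is cleared.
+def _example_apply_permissions():
+    import tempfile
+
+    with tempfile.NamedTemporaryFile() as tmp:
+        os.chmod(tmp.name, 0o2775)  # assumes the filesystem honors setgid here
+        apply_permissions(tmp.name, mode=0o644, mask=0o2000)
+        assert stat.S_IMODE(os.stat(tmp.name).st_mode) == 0o775
+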
def apply_stat_permissions(filename, newstat, **kwargs):
- """A wrapper around apply_secpass_permissions that gets
- uid, gid, and mode from a stat object"""
- return apply_secpass_permissions(filename, uid=newstat.st_uid, gid=newstat.st_gid,
- mode=newstat.st_mode, **kwargs)
-
-def apply_recursive_permissions(top, uid=-1, gid=-1,
- dirmode=-1, dirmask=-1, filemode=-1, filemask=-1, onerror=None):
- """A wrapper around apply_secpass_permissions that applies permissions
- recursively. If optional argument onerror is specified, it should be a
- function; it will be called with one argument, a PortageException instance.
- Returns True if all permissions are applied and False if some are left
- unapplied."""
-
- # Avoid issues with circular symbolic links, as in bug #339670.
- follow_links = False
-
- if onerror is None:
- # Default behavior is to dump errors to stderr so they won't
- # go unnoticed. Callers can pass in a quiet instance.
- def onerror(e):
- if isinstance(e, OperationNotPermitted):
- writemsg(_("Operation Not Permitted: %s\n") % str(e),
- noiselevel=-1)
- elif isinstance(e, FileNotFound):
- writemsg(_("File Not Found: '%s'\n") % str(e), noiselevel=-1)
- else:
- raise
-
- # For bug 554084, always apply permissions to a directory before
- # that directory is traversed.
- all_applied = True
-
- try:
- stat_cached = _do_stat(top, follow_links=follow_links)
- except FileNotFound:
- # backward compatibility
- return True
-
- if stat.S_ISDIR(stat_cached.st_mode):
- mode = dirmode
- mask = dirmask
- else:
- mode = filemode
- mask = filemask
-
- try:
- applied = apply_secpass_permissions(top,
- uid=uid, gid=gid, mode=mode, mask=mask,
- stat_cached=stat_cached, follow_links=follow_links)
- if not applied:
- all_applied = False
- except PortageException as e:
- all_applied = False
- onerror(e)
-
- for dirpath, dirnames, filenames in os.walk(top):
- for name, mode, mask in chain(
- ((x, filemode, filemask) for x in filenames),
- ((x, dirmode, dirmask) for x in dirnames)):
- try:
- applied = apply_secpass_permissions(os.path.join(dirpath, name),
- uid=uid, gid=gid, mode=mode, mask=mask,
- follow_links=follow_links)
- if not applied:
- all_applied = False
- except PortageException as e:
- # Ignore InvalidLocation exceptions such as FileNotFound
- # and DirectoryNotFound since sometimes things disappear,
- # like when adjusting permissions on DISTCC_DIR.
- if not isinstance(e, portage.exception.InvalidLocation):
- all_applied = False
- onerror(e)
- return all_applied
-
-def apply_secpass_permissions(filename, uid=-1, gid=-1, mode=-1, mask=-1,
- stat_cached=None, follow_links=True):
- """A wrapper around apply_permissions that uses secpass and simple
- logic to apply as much of the permissions as possible without
- generating an obviously avoidable permission exception. Despite
- attempts to avoid an exception, it's possible that one will be raised
- anyway, so be prepared.
- Returns True if all permissions are applied and False if some are left
- unapplied."""
-
- if stat_cached is None:
- stat_cached = _do_stat(filename, follow_links=follow_links)
-
- all_applied = True
-
- # Avoid accessing portage.data.secpass when possible, since
- # it triggers config loading (undesirable for chmod-lite).
- if (uid != -1 or gid != -1) and portage.data.secpass < 2:
-
- if uid != -1 and \
- uid != stat_cached.st_uid:
- all_applied = False
- uid = -1
-
- if gid != -1 and \
- gid != stat_cached.st_gid and \
- gid not in os.getgroups():
- all_applied = False
- gid = -1
-
- apply_permissions(filename, uid=uid, gid=gid, mode=mode, mask=mask,
- stat_cached=stat_cached, follow_links=follow_links)
- return all_applied
-
-class atomic_ofstream(ObjectProxy):
- """Write a file atomically via os.rename(). Atomic replacement prevents
- interprocess interference and prevents corruption of the target
- file when the write is interrupted (for example, when an 'out of space'
- error occurs)."""
-
- def __init__(self, filename, mode='w', follow_links=True, **kargs):
- """Opens a temporary filename.pid in the same directory as filename."""
- ObjectProxy.__init__(self)
- object.__setattr__(self, '_aborted', False)
- if 'b' in mode:
- open_func = open
- else:
- open_func = io.open
- kargs.setdefault('encoding', _encodings['content'])
- kargs.setdefault('errors', 'backslashreplace')
-
- if follow_links:
- canonical_path = os.path.realpath(filename)
- object.__setattr__(self, '_real_name', canonical_path)
- tmp_name = "%s.%i" % (canonical_path, portage.getpid())
- try:
- object.__setattr__(self, '_file',
- open_func(_unicode_encode(tmp_name,
- encoding=_encodings['fs'], errors='strict'),
- mode=mode, **kargs))
- return
- except IOError as e:
- if canonical_path == filename:
- raise
- # Ignore this error, since it's irrelevant
- # and the below open call will produce a
- # new error if necessary.
-
- object.__setattr__(self, '_real_name', filename)
- tmp_name = "%s.%i" % (filename, portage.getpid())
- object.__setattr__(self, '_file',
- open_func(_unicode_encode(tmp_name,
- encoding=_encodings['fs'], errors='strict'),
- mode=mode, **kargs))
-
- def _get_target(self):
- return object.__getattribute__(self, '_file')
-
- def __getattribute__(self, attr):
- if attr in ('close', 'abort', '__del__'):
- return object.__getattribute__(self, attr)
- return getattr(object.__getattribute__(self, '_file'), attr)
-
- def close(self):
- """Closes the temporary file, copies permissions (if possible),
- and performs the atomic replacement via os.rename(). If the abort()
- method has been called, then the temp file is closed and removed."""
- f = object.__getattribute__(self, '_file')
- real_name = object.__getattribute__(self, '_real_name')
- if not f.closed:
- try:
- f.close()
- if not object.__getattribute__(self, '_aborted'):
- try:
- apply_stat_permissions(f.name, os.stat(real_name))
- except OperationNotPermitted:
- pass
- except FileNotFound:
- pass
- except OSError as oe: # from the above os.stat call
- if oe.errno in (errno.ENOENT, errno.EPERM):
- pass
- else:
- raise
- os.rename(f.name, real_name)
- finally:
- # Make sure we cleanup the temp file
- # even if an exception is raised.
- try:
- os.unlink(f.name)
- except OSError as oe:
- pass
-
- def abort(self):
- """If an error occurs while writing the file, the user should
- call this method in order to leave the target file unchanged.
- This will call close() automatically."""
- if not object.__getattribute__(self, '_aborted'):
- object.__setattr__(self, '_aborted', True)
- self.close()
-
- def __del__(self):
- """If the user does not explicitly call close(), it is
- assumed that an error has occurred, so we abort()."""
- try:
- f = object.__getattribute__(self, '_file')
- except AttributeError:
- pass
- else:
- if not f.closed:
- self.abort()
- # ensure destructor from the base class is called
- base_destructor = getattr(ObjectProxy, '__del__', None)
- if base_destructor is not None:
- base_destructor(self)
+ """A wrapper around apply_secpass_permissions that gets
+ uid, gid, and mode from a stat object"""
+ return apply_secpass_permissions(
+ filename, uid=newstat.st_uid, gid=newstat.st_gid, mode=newstat.st_mode, **kwargs
+ )
+
+
+def apply_recursive_permissions(
+ top, uid=-1, gid=-1, dirmode=-1, dirmask=-1, filemode=-1, filemask=-1, onerror=None
+):
+ """A wrapper around apply_secpass_permissions that applies permissions
+ recursively. If optional argument onerror is specified, it should be a
+ function; it will be called with one argument, a PortageException instance.
+ Returns True if all permissions are applied and False if some are left
+ unapplied."""
+
+ # Avoid issues with circular symbolic links, as in bug #339670.
+ follow_links = False
+
+ if onerror is None:
+ # Default behavior is to dump errors to stderr so they won't
+ # go unnoticed. Callers can pass in a quiet instance.
+ def onerror(e):
+ if isinstance(e, OperationNotPermitted):
+ writemsg(_("Operation Not Permitted: %s\n") % str(e), noiselevel=-1)
+ elif isinstance(e, FileNotFound):
+ writemsg(_("File Not Found: '%s'\n") % str(e), noiselevel=-1)
+ else:
+ raise
+
+ # For bug 554084, always apply permissions to a directory before
+ # that directory is traversed.
+ all_applied = True
+
+ try:
+ stat_cached = _do_stat(top, follow_links=follow_links)
+ except FileNotFound:
+ # backward compatibility
+ return True
+
+ if stat.S_ISDIR(stat_cached.st_mode):
+ mode = dirmode
+ mask = dirmask
+ else:
+ mode = filemode
+ mask = filemask
+
+ try:
+ applied = apply_secpass_permissions(
+ top,
+ uid=uid,
+ gid=gid,
+ mode=mode,
+ mask=mask,
+ stat_cached=stat_cached,
+ follow_links=follow_links,
+ )
+ if not applied:
+ all_applied = False
+ except PortageException as e:
+ all_applied = False
+ onerror(e)
+
+ for dirpath, dirnames, filenames in os.walk(top):
+ for name, mode, mask in chain(
+ ((x, filemode, filemask) for x in filenames),
+ ((x, dirmode, dirmask) for x in dirnames),
+ ):
+ try:
+ applied = apply_secpass_permissions(
+ os.path.join(dirpath, name),
+ uid=uid,
+ gid=gid,
+ mode=mode,
+ mask=mask,
+ follow_links=follow_links,
+ )
+ if not applied:
+ all_applied = False
+ except PortageException as e:
+ # Ignore InvalidLocation exceptions such as FileNotFound
+ # and DirectoryNotFound since sometimes things disappear,
+ # like when adjusting permissions on DISTCC_DIR.
+ if not isinstance(e, portage.exception.InvalidLocation):
+ all_applied = False
+ onerror(e)
+ return all_applied
+
+
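+# Illustrative sketch (editor's addition): recursively normalize a tree,
+# collecting PortageException failures through the onerror hook instead of
+# letting the default handler print them to stderr.
+def _example_apply_recursive_permissions(top):
+    errors = []
+    all_applied = apply_recursive_permissions(
+        top,
+        dirmode=0o755,
+        dirmask=0o022,
+        filemode=0o644,
+        filemask=0o022,
+        onerror=errors.append,
+    )
+    return all_applied, errors
+
+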
+def apply_secpass_permissions(
+ filename, uid=-1, gid=-1, mode=-1, mask=-1, stat_cached=None, follow_links=True
+):
+ """A wrapper around apply_permissions that uses secpass and simple
+ logic to apply as much of the permissions as possible without
+ generating an obviously avoidable permission exception. Despite
+ attempts to avoid an exception, it's possible that one will be raised
+ anyway, so be prepared.
+ Returns True if all permissions are applied and False if some are left
+ unapplied."""
+
+ if stat_cached is None:
+ stat_cached = _do_stat(filename, follow_links=follow_links)
+
+ all_applied = True
+
+ # Avoid accessing portage.data.secpass when possible, since
+ # it triggers config loading (undesirable for chmod-lite).
+ if (uid != -1 or gid != -1) and portage.data.secpass < 2:
+ if uid != -1 and uid != stat_cached.st_uid:
+ all_applied = False
+ uid = -1
+
+ if gid != -1 and gid != stat_cached.st_gid and gid not in os.getgroups():
+ all_applied = False
+ gid = -1
+
+ apply_permissions(
+ filename,
+ uid=uid,
+ gid=gid,
+ mode=mode,
+ mask=mask,
+ stat_cached=stat_cached,
+ follow_links=follow_links,
+ )
+ return all_applied
+
+
+class atomic_ofstream(AbstractContextManager, ObjectProxy):
+ """Write a file atomically via os.rename(). Atomic replacement prevents
+ interprocess interference and prevents corruption of the target
+ file when the write is interrupted (for example, when an 'out of space'
+ error occurs)."""
+
+ def __init__(self, filename, mode="w", follow_links=True, **kargs):
+ """Opens a temporary filename.pid in the same directory as filename."""
+ ObjectProxy.__init__(self)
+ object.__setattr__(self, "_aborted", False)
+ if "b" in mode:
+ open_func = open
+ else:
+ open_func = io.open
+ kargs.setdefault("encoding", _encodings["content"])
+ kargs.setdefault("errors", "backslashreplace")
+
+ if follow_links:
+ canonical_path = os.path.realpath(filename)
+ object.__setattr__(self, "_real_name", canonical_path)
+ tmp_name = "%s.%i" % (canonical_path, portage.getpid())
+ try:
+ object.__setattr__(
+ self,
+ "_file",
+ open_func(
+ _unicode_encode(
+ tmp_name, encoding=_encodings["fs"], errors="strict"
+ ),
+ mode=mode,
+ **kargs,
+ ),
+ )
+ return
+            except OSError:
+ if canonical_path == filename:
+ raise
+ # Ignore this error, since it's irrelevant
+ # and the below open call will produce a
+ # new error if necessary.
+
+ object.__setattr__(self, "_real_name", filename)
+ tmp_name = "%s.%i" % (filename, portage.getpid())
+ object.__setattr__(
+ self,
+ "_file",
+ open_func(
+ _unicode_encode(tmp_name, encoding=_encodings["fs"], errors="strict"),
+ mode=mode,
+ **kargs,
+ ),
+ )
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ self.abort()
+ else:
+ self.close()
+
+ def _get_target(self):
+ return object.__getattribute__(self, "_file")
+
+ def __getattribute__(self, attr):
+ if attr in ("close", "abort", "__del__"):
+ return object.__getattribute__(self, attr)
+ return getattr(object.__getattribute__(self, "_file"), attr)
+
+ def close(self):
+ """Closes the temporary file, copies permissions (if possible),
+ and performs the atomic replacement via os.rename(). If the abort()
+ method has been called, then the temp file is closed and removed."""
+ f = object.__getattribute__(self, "_file")
+ real_name = object.__getattribute__(self, "_real_name")
+ if not f.closed:
+ try:
+ f.close()
+ if not object.__getattribute__(self, "_aborted"):
+ try:
+ apply_stat_permissions(f.name, os.stat(real_name))
+ except OperationNotPermitted:
+ pass
+ except FileNotFound:
+ pass
+ except OSError as oe: # from the above os.stat call
+ if oe.errno in (errno.ENOENT, errno.EPERM):
+ pass
+ else:
+ raise
+ os.rename(f.name, real_name)
+ finally:
+ # Make sure we cleanup the temp file
+ # even if an exception is raised.
+ try:
+ os.unlink(f.name)
+                except OSError:
+ pass
+
+ def abort(self):
+ """If an error occurs while writing the file, the user should
+ call this method in order to leave the target file unchanged.
+ This will call close() automatically."""
+ if not object.__getattribute__(self, "_aborted"):
+ object.__setattr__(self, "_aborted", True)
+ self.close()
+
+ def __del__(self):
+ """If the user does not explicitly call close(), it is
+ assumed that an error has occurred, so we abort()."""
+ try:
+ f = object.__getattribute__(self, "_file")
+ except AttributeError:
+ pass
+ else:
+ if not f.closed:
+ self.abort()
+ # ensure destructor from the base class is called
+ base_destructor = getattr(ObjectProxy, "__del__", None)
+ if base_destructor is not None:
+ base_destructor(self)
+
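+# Illustrative sketch (editor's addition): the new AbstractContextManager
+# support lets `with` commit the rename on a clean exit and call abort() if
+# the body raises, leaving the target file untouched.
+def _example_atomic_ofstream(path):
+    with atomic_ofstream(path) as f:
+        f.write("key = value\n")
+    # On normal exit the temp file has been renamed over `path`.
+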
def write_atomic(file_path, content, **kwargs):
- f = None
- try:
- f = atomic_ofstream(file_path, **kwargs)
- f.write(content)
- f.close()
- except (IOError, OSError) as e:
- if f:
- f.abort()
- func_call = "write_atomic('%s')" % file_path
- if e.errno == errno.EPERM:
- raise OperationNotPermitted(func_call)
- elif e.errno == errno.EACCES:
- raise PermissionDenied(func_call)
- elif e.errno == errno.EROFS:
- raise ReadOnlyFileSystem(func_call)
- elif e.errno == errno.ENOENT:
- raise FileNotFound(file_path)
- else:
- raise
+ f = None
+ try:
+ f = atomic_ofstream(file_path, **kwargs)
+ f.write(content)
+ f.close()
+ except OSError as e:
+ if f:
+ f.abort()
+ func_call = f"write_atomic('{file_path}')"
+ if e.errno == errno.EPERM:
+ raise OperationNotPermitted(func_call)
+ elif e.errno == errno.EACCES:
+ raise PermissionDenied(func_call)
+ elif e.errno == errno.EROFS:
+ raise ReadOnlyFileSystem(func_call)
+ elif e.errno == errno.ENOENT:
+ raise FileNotFound(file_path)
+ else:
+ raise
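+
+
+# Illustrative sketch (editor's addition): write_atomic wraps the pattern
+# above and maps OSError into portage exception types.
+def _example_write_atomic(path):
+    try:
+        write_atomic(path, "hello\n")
+    except (OperationNotPermitted, PermissionDenied, ReadOnlyFileSystem):
+        pass  # EPERM/EACCES/EROFS surface as portage exceptions
+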
-def ensure_dirs(dir_path, **kwargs):
- """Create a directory and call apply_permissions.
- Returns True if a directory is created or the permissions needed to be
- modified, and False otherwise.
- This function's handling of EEXIST errors makes it useful for atomic
- directory creation, in which multiple processes may be competing to
- create the same directory.
- """
+def ensure_dirs(dir_path, **kwargs):
+ """Create a directory and call apply_permissions.
+ Returns True if a directory is created or the permissions needed to be
+ modified, and False otherwise.
+
+ This function's handling of EEXIST errors makes it useful for atomic
+ directory creation, in which multiple processes may be competing to
+ create the same directory.
+ """
+
+ created_dir = False
+
+ try:
+ os.makedirs(dir_path)
+ created_dir = True
+ except OSError as oe:
+ func_call = f"makedirs('{dir_path}')"
+ if oe.errno in (errno.EEXIST,):
+ pass
+ else:
+ if os.path.isdir(dir_path):
+ # NOTE: DragonFly raises EPERM for makedir('/')
+ # and that is supposed to be ignored here.
+ # Also, sometimes mkdir raises EISDIR on FreeBSD
+ # and we want to ignore that too (bug #187518).
+ pass
+ elif oe.errno == errno.EPERM:
+ raise OperationNotPermitted(func_call)
+ elif oe.errno == errno.EACCES:
+ raise PermissionDenied(func_call)
+ elif oe.errno == errno.EROFS:
+ raise ReadOnlyFileSystem(func_call)
+ else:
+ raise
+ if kwargs:
+ perms_modified = apply_permissions(dir_path, **kwargs)
+ else:
+ perms_modified = False
+ return created_dir or perms_modified
- created_dir = False
-
- try:
- os.makedirs(dir_path)
- created_dir = True
- except OSError as oe:
- func_call = "makedirs('%s')" % dir_path
- if oe.errno in (errno.EEXIST,):
- pass
- else:
- if os.path.isdir(dir_path):
- # NOTE: DragonFly raises EPERM for makedir('/')
- # and that is supposed to be ignored here.
- # Also, sometimes mkdir raises EISDIR on FreeBSD
- # and we want to ignore that too (bug #187518).
- pass
- elif oe.errno == errno.EPERM:
- raise OperationNotPermitted(func_call)
- elif oe.errno == errno.EACCES:
- raise PermissionDenied(func_call)
- elif oe.errno == errno.EROFS:
- raise ReadOnlyFileSystem(func_call)
- else:
- raise
- if kwargs:
- perms_modified = apply_permissions(dir_path, **kwargs)
- else:
- perms_modified = False
- return created_dir or perms_modified
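+
+# Illustrative sketch (editor's addition): because EEXIST is swallowed,
+# racing processes may call ensure_dirs() on the same path safely.
+def _example_ensure_dirs(path):
+    first = ensure_dirs(path, mode=0o755)  # True if created or perms changed
+    second = ensure_dirs(path, mode=0o755)  # False: nothing left to do
+    return first, second
+
+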
class LazyItemsDict(UserDict):
- """A mapping object that behaves like a standard dict except that it allows
- for lazy initialization of values via callable objects. Lazy items can be
- overwritten and deleted just as normal items."""
-
- __slots__ = ('lazy_items',)
-
- def __init__(self, *args, **kwargs):
-
- self.lazy_items = {}
- UserDict.__init__(self, *args, **kwargs)
-
- def addLazyItem(self, item_key, value_callable, *pargs, **kwargs):
- """Add a lazy item for the given key. When the item is requested,
- value_callable will be called with *pargs and **kwargs arguments."""
- self.lazy_items[item_key] = \
- self._LazyItem(value_callable, pargs, kwargs, False)
- # make it show up in self.keys(), etc...
- UserDict.__setitem__(self, item_key, None)
-
- def addLazySingleton(self, item_key, value_callable, *pargs, **kwargs):
- """This is like addLazyItem except value_callable will only be called
- a maximum of 1 time and the result will be cached for future requests."""
- self.lazy_items[item_key] = \
- self._LazyItem(value_callable, pargs, kwargs, True)
- # make it show up in self.keys(), etc...
- UserDict.__setitem__(self, item_key, None)
-
- def update(self, *args, **kwargs):
- if len(args) > 1:
- raise TypeError(
- "expected at most 1 positional argument, got " + \
- repr(len(args)))
- if args:
- map_obj = args[0]
- else:
- map_obj = None
- if map_obj is None:
- pass
- elif isinstance(map_obj, LazyItemsDict):
- for k in map_obj:
- if k in map_obj.lazy_items:
- UserDict.__setitem__(self, k, None)
- else:
- UserDict.__setitem__(self, k, map_obj[k])
- self.lazy_items.update(map_obj.lazy_items)
- else:
- UserDict.update(self, map_obj)
- if kwargs:
- UserDict.update(self, kwargs)
-
- def __getitem__(self, item_key):
- if item_key in self.lazy_items:
- lazy_item = self.lazy_items[item_key]
- pargs = lazy_item.pargs
- if pargs is None:
- pargs = ()
- kwargs = lazy_item.kwargs
- if kwargs is None:
- kwargs = {}
- result = lazy_item.func(*pargs, **kwargs)
- if lazy_item.singleton:
- self[item_key] = result
- return result
-
- return UserDict.__getitem__(self, item_key)
-
- def __setitem__(self, item_key, value):
- if item_key in self.lazy_items:
- del self.lazy_items[item_key]
- UserDict.__setitem__(self, item_key, value)
-
- def __delitem__(self, item_key):
- if item_key in self.lazy_items:
- del self.lazy_items[item_key]
- UserDict.__delitem__(self, item_key)
-
- def clear(self):
- self.lazy_items.clear()
- UserDict.clear(self)
-
- def copy(self):
- return self.__copy__()
-
- def __copy__(self):
- return self.__class__(self)
-
- def __deepcopy__(self, memo=None):
- """
- This forces evaluation of each contained lazy item, and deepcopy of
- the result. A TypeError is raised if any contained lazy item is not
- a singleton, since it is not necessarily possible for the behavior
- of this type of item to be safely preserved.
- """
- if memo is None:
- memo = {}
- result = self.__class__()
- memo[id(self)] = result
- for k in self:
- k_copy = deepcopy(k, memo)
- lazy_item = self.lazy_items.get(k)
- if lazy_item is not None:
- if not lazy_item.singleton:
- raise TypeError("LazyItemsDict " + \
- "deepcopy is unsafe with lazy items that are " + \
- "not singletons: key=%s value=%s" % (k, lazy_item,))
- UserDict.__setitem__(result, k_copy, deepcopy(self[k], memo))
- return result
-
- class _LazyItem:
-
- __slots__ = ('func', 'pargs', 'kwargs', 'singleton')
-
- def __init__(self, func, pargs, kwargs, singleton):
-
- if not pargs:
- pargs = None
- if not kwargs:
- kwargs = None
-
- self.func = func
- self.pargs = pargs
- self.kwargs = kwargs
- self.singleton = singleton
-
- def __copy__(self):
- return self.__class__(self.func, self.pargs,
- self.kwargs, self.singleton)
-
- def __deepcopy__(self, memo=None):
- """
- Override this since the default implementation can fail silently,
- leaving some attributes unset.
- """
- if memo is None:
- memo = {}
- result = self.__copy__()
- memo[id(self)] = result
- result.func = deepcopy(self.func, memo)
- result.pargs = deepcopy(self.pargs, memo)
- result.kwargs = deepcopy(self.kwargs, memo)
- result.singleton = deepcopy(self.singleton, memo)
- return result
+ """A mapping object that behaves like a standard dict except that it allows
+ for lazy initialization of values via callable objects. Lazy items can be
+ overwritten and deleted just as normal items."""
+
+ __slots__ = ("lazy_items",)
+
+ def __init__(self, *args, **kwargs):
+ self.lazy_items = {}
+ UserDict.__init__(self, *args, **kwargs)
+
+ def addLazyItem(self, item_key, value_callable, *pargs, **kwargs):
+ """Add a lazy item for the given key. When the item is requested,
+ value_callable will be called with *pargs and **kwargs arguments."""
+ self.lazy_items[item_key] = self._LazyItem(value_callable, pargs, kwargs, False)
+ # make it show up in self.keys(), etc...
+ UserDict.__setitem__(self, item_key, None)
+
+ def addLazySingleton(self, item_key, value_callable, *pargs, **kwargs):
+ """This is like addLazyItem except value_callable will only be called
+ a maximum of 1 time and the result will be cached for future requests."""
+ self.lazy_items[item_key] = self._LazyItem(value_callable, pargs, kwargs, True)
+ # make it show up in self.keys(), etc...
+ UserDict.__setitem__(self, item_key, None)
+
+ def update(self, *args, **kwargs):
+ if len(args) > 1:
+ raise TypeError(
+ "expected at most 1 positional argument, got " + repr(len(args))
+ )
+ if args:
+ map_obj = args[0]
+ else:
+ map_obj = None
+ if map_obj is None:
+ pass
+ elif isinstance(map_obj, LazyItemsDict):
+ for k in map_obj:
+ if k in map_obj.lazy_items:
+ UserDict.__setitem__(self, k, None)
+ else:
+ UserDict.__setitem__(self, k, map_obj[k])
+ self.lazy_items.update(map_obj.lazy_items)
+ else:
+ UserDict.update(self, map_obj)
+ if kwargs:
+ UserDict.update(self, kwargs)
+
+ def __getitem__(self, item_key):
+ if item_key in self.lazy_items:
+ lazy_item = self.lazy_items[item_key]
+ pargs = lazy_item.pargs
+ if pargs is None:
+ pargs = ()
+ kwargs = lazy_item.kwargs
+ if kwargs is None:
+ kwargs = {}
+ result = lazy_item.func(*pargs, **kwargs)
+ if lazy_item.singleton:
+ self[item_key] = result
+ return result
+
+ return UserDict.__getitem__(self, item_key)
+
+ def __setitem__(self, item_key, value):
+ if item_key in self.lazy_items:
+ del self.lazy_items[item_key]
+ UserDict.__setitem__(self, item_key, value)
+
+ def __delitem__(self, item_key):
+ if item_key in self.lazy_items:
+ del self.lazy_items[item_key]
+ UserDict.__delitem__(self, item_key)
+
+ def clear(self):
+ self.lazy_items.clear()
+ UserDict.clear(self)
+
+ def copy(self):
+ return self.__copy__()
+
+ def __copy__(self):
+ return self.__class__(self)
+
+ def __deepcopy__(self, memo=None):
+ """
+ This forces evaluation of each contained lazy item, and deepcopy of
+ the result. A TypeError is raised if any contained lazy item is not
+ a singleton, since it is not necessarily possible for the behavior
+ of this type of item to be safely preserved.
+ """
+ if memo is None:
+ memo = {}
+ result = self.__class__()
+ memo[id(self)] = result
+ for k in self:
+ k_copy = deepcopy(k, memo)
+ lazy_item = self.lazy_items.get(k)
+ if lazy_item is not None:
+ if not lazy_item.singleton:
+                    raise TypeError(
+                        "LazyItemsDict deepcopy is unsafe with lazy items "
+                        f"that are not singletons: key={k} value={lazy_item}"
+                    )
+ UserDict.__setitem__(result, k_copy, deepcopy(self[k], memo))
+ return result
+
+ class _LazyItem:
+ __slots__ = ("func", "pargs", "kwargs", "singleton")
+
+ def __init__(self, func, pargs, kwargs, singleton):
+ if not pargs:
+ pargs = None
+ if not kwargs:
+ kwargs = None
+
+ self.func = func
+ self.pargs = pargs
+ self.kwargs = kwargs
+ self.singleton = singleton
+
+ def __copy__(self):
+ return self.__class__(self.func, self.pargs, self.kwargs, self.singleton)
+
+ def __deepcopy__(self, memo=None):
+ """
+ Override this since the default implementation can fail silently,
+ leaving some attributes unset.
+ """
+ if memo is None:
+ memo = {}
+ result = self.__copy__()
+ memo[id(self)] = result
+ result.func = deepcopy(self.func, memo)
+ result.pargs = deepcopy(self.pargs, memo)
+ result.kwargs = deepcopy(self.kwargs, memo)
+ result.singleton = deepcopy(self.singleton, memo)
+ return result
+
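+# Illustrative sketch (editor's addition): lazy values are computed on first
+# access; addLazySingleton caches the result, while addLazyItem re-runs the
+# callable every time.
+def _example_lazy_items_dict():
+    calls = []
+
+    def expensive():
+        calls.append(1)
+        return "value"
+
+    d = LazyItemsDict()
+    d.addLazySingleton("k", expensive)
+    assert d["k"] == "value" and d["k"] == "value"
+    assert len(calls) == 1  # computed once, then cached
+
+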
class ConfigProtect:
- def __init__(self, myroot, protect_list, mask_list,
- case_insensitive=False):
- self.myroot = myroot
- self.protect_list = protect_list
- self.mask_list = mask_list
- self.case_insensitive = case_insensitive
- self.updateprotect()
-
- def updateprotect(self):
- """Update internal state for isprotected() calls. Nonexistent paths
- are ignored."""
-
- os = _os_merge
-
- self.protect = []
- self._dirs = set()
- for x in self.protect_list:
- ppath = normalize_path(
- os.path.join(self.myroot, x.lstrip(os.path.sep)))
- # Protect files that don't exist (bug #523684). If the
- # parent directory doesn't exist, we can safely skip it.
- if os.path.isdir(os.path.dirname(ppath)):
- self.protect.append(ppath)
- try:
- if stat.S_ISDIR(os.stat(ppath).st_mode):
- self._dirs.add(ppath)
- except OSError:
- pass
-
- self.protectmask = []
- for x in self.mask_list:
- ppath = normalize_path(
- os.path.join(self.myroot, x.lstrip(os.path.sep)))
- if self.case_insensitive:
- ppath = ppath.lower()
- try:
- """Use lstat so that anything, even a broken symlink can be
- protected."""
- if stat.S_ISDIR(os.lstat(ppath).st_mode):
- self._dirs.add(ppath)
- self.protectmask.append(ppath)
- """Now use stat in case this is a symlink to a directory."""
- if stat.S_ISDIR(os.stat(ppath).st_mode):
- self._dirs.add(ppath)
- except OSError:
- # If it doesn't exist, there's no need to mask it.
- pass
-
- def isprotected(self, obj):
- """Returns True if obj is protected, False otherwise. The caller must
- ensure that obj is normalized with a single leading slash. A trailing
- slash is optional for directories."""
- masked = 0
- protected = 0
- sep = os.path.sep
- if self.case_insensitive:
- obj = obj.lower()
- for ppath in self.protect:
- if len(ppath) > masked and obj.startswith(ppath):
- if ppath in self._dirs:
- if obj != ppath and not obj.startswith(ppath + sep):
- # /etc/foo does not match /etc/foobaz
- continue
- elif obj != ppath:
- # force exact match when CONFIG_PROTECT lists a
- # non-directory
- continue
- protected = len(ppath)
- #config file management
- for pmpath in self.protectmask:
- if len(pmpath) >= protected and obj.startswith(pmpath):
- if pmpath in self._dirs:
- if obj != pmpath and \
- not obj.startswith(pmpath + sep):
- # /etc/foo does not match /etc/foobaz
- continue
- elif obj != pmpath:
- # force exact match when CONFIG_PROTECT_MASK lists
- # a non-directory
- continue
- #skip, it's in the mask
- masked = len(pmpath)
- return protected > masked
+ def __init__(self, myroot, protect_list, mask_list, case_insensitive=False):
+ self.myroot = myroot
+ self.protect_list = protect_list
+ self.mask_list = mask_list
+ self.case_insensitive = case_insensitive
+ self.updateprotect()
+
+ def updateprotect(self):
+ """Update internal state for isprotected() calls. Nonexistent paths
+ are ignored."""
+
+ os = _os_merge
+
+ self.protect = []
+ self._dirs = set()
+ for x in self.protect_list:
+ ppath = normalize_path(os.path.join(self.myroot, x.lstrip(os.path.sep)))
+ # Protect files that don't exist (bug #523684). If the
+ # parent directory doesn't exist, we can safely skip it.
+ if os.path.isdir(os.path.dirname(ppath)):
+ self.protect.append(ppath)
+ try:
+ if stat.S_ISDIR(os.stat(ppath).st_mode):
+ self._dirs.add(ppath)
+ except OSError:
+ pass
+
+ self.protectmask = []
+ for x in self.mask_list:
+ ppath = normalize_path(os.path.join(self.myroot, x.lstrip(os.path.sep)))
+ if self.case_insensitive:
+ ppath = ppath.lower()
+ try:
+ """Use lstat so that anything, even a broken symlink can be
+ protected."""
+ if stat.S_ISDIR(os.lstat(ppath).st_mode):
+ self._dirs.add(ppath)
+ self.protectmask.append(ppath)
+ """Now use stat in case this is a symlink to a directory."""
+ if stat.S_ISDIR(os.stat(ppath).st_mode):
+ self._dirs.add(ppath)
+ except OSError:
+ # If it doesn't exist, there's no need to mask it.
+ pass
+
+ def isprotected(self, obj):
+ """Returns True if obj is protected, False otherwise. The caller must
+ ensure that obj is normalized with a single leading slash. A trailing
+ slash is optional for directories."""
+ masked = 0
+ protected = 0
+ sep = os.path.sep
+ if self.case_insensitive:
+ obj = obj.lower()
+ for ppath in self.protect:
+ if len(ppath) > masked and obj.startswith(ppath):
+ if ppath in self._dirs:
+ if obj != ppath and not obj.startswith(ppath + sep):
+ # /etc/foo does not match /etc/foobaz
+ continue
+ elif obj != ppath:
+ # force exact match when CONFIG_PROTECT lists a
+ # non-directory
+ continue
+ protected = len(ppath)
+ # config file management
+ for pmpath in self.protectmask:
+ if len(pmpath) >= protected and obj.startswith(pmpath):
+ if pmpath in self._dirs:
+ if obj != pmpath and not obj.startswith(pmpath + sep):
+ # /etc/foo does not match /etc/foobaz
+ continue
+ elif obj != pmpath:
+ # force exact match when CONFIG_PROTECT_MASK lists
+ # a non-directory
+ continue
+ # skip, it's in the mask
+ masked = len(pmpath)
+ return protected > masked
+
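+# Illustrative sketch (editor's addition): directory entries protect whole
+# subtrees on path-component boundaries, and mask entries punch holes in
+# them; the paths below are hypothetical and must exist to take effect.
+def _example_config_protect():
+    cp = ConfigProtect("/", ["/etc"], ["/etc/env.d"])
+    assert cp.isprotected("/etc/fstab")
+    assert not cp.isprotected("/etc/env.d/00basic")
+    assert not cp.isprotected("/etcetera")  # /etc does not match /etcetera
+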
def new_protect_filename(mydest, newmd5=None, force=False):
- """Resolves a config-protect filename for merging, optionally
- using the last filename if the md5 matches. If force is True,
- then a new filename will be generated even if mydest does not
- exist yet.
- (dest,md5) ==> 'string' --- path_to_target_filename
- (dest) ==> ('next', 'highest') --- next_target and most-recent_target
- """
+ """Resolves a config-protect filename for merging, optionally
+ using the last filename if the md5 matches. If force is True,
+ then a new filename will be generated even if mydest does not
+ exist yet.
+ (dest,md5) ==> 'string' --- path_to_target_filename
+ (dest) ==> ('next', 'highest') --- next_target and most-recent_target
+ """
+
+ # config protection filename format:
+ # ._cfg0000_foo
+ # 0123456789012
+
+ os = _os_merge
+
+ prot_num = -1
+ last_pfile = ""
+
+ if not force and not os.path.exists(mydest):
+ return mydest
+
+ real_filename = os.path.basename(mydest)
+ real_dirname = os.path.dirname(mydest)
+ for pfile in os.listdir(real_dirname):
+ if pfile[0:5] != "._cfg":
+ continue
+ if pfile[10:] != real_filename:
+ continue
+ try:
+ new_prot_num = int(pfile[5:9])
+ if new_prot_num > prot_num:
+ prot_num = new_prot_num
+ last_pfile = pfile
+ except ValueError:
+ continue
+ prot_num = prot_num + 1
+
+ new_pfile = normalize_path(
+ os.path.join(
+ real_dirname, "._cfg" + str(prot_num).zfill(4) + "_" + real_filename
+ )
+ )
+ old_pfile = normalize_path(os.path.join(real_dirname, last_pfile))
+ if last_pfile and newmd5:
+ try:
+ old_pfile_st = os.lstat(old_pfile)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ if stat.S_ISLNK(old_pfile_st.st_mode):
+ try:
+ # Read symlink target as bytes, in case the
+ # target path has a bad encoding.
+ pfile_link = os.readlink(
+ _unicode_encode(
+ old_pfile, encoding=_encodings["merge"], errors="strict"
+ )
+ )
+                except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ pfile_link = _unicode_decode(
+ pfile_link, encoding=_encodings["merge"], errors="replace"
+ )
+ if pfile_link == newmd5:
+ return old_pfile
+ else:
+ try:
+ last_pfile_md5 = portage.checksum._perform_md5_merge(old_pfile)
+ except FileNotFound:
+ # The file suddenly disappeared or it's a
+ # broken symlink.
+ pass
+ else:
+ if last_pfile_md5 == newmd5:
+ return old_pfile
+ return new_pfile
- # config protection filename format:
- # ._cfg0000_foo
- # 0123456789012
-
- os = _os_merge
-
- prot_num = -1
- last_pfile = ""
-
- if not force and \
- not os.path.exists(mydest):
- return mydest
-
- real_filename = os.path.basename(mydest)
- real_dirname = os.path.dirname(mydest)
- for pfile in os.listdir(real_dirname):
- if pfile[0:5] != "._cfg":
- continue
- if pfile[10:] != real_filename:
- continue
- try:
- new_prot_num = int(pfile[5:9])
- if new_prot_num > prot_num:
- prot_num = new_prot_num
- last_pfile = pfile
- except ValueError:
- continue
- prot_num = prot_num + 1
-
- new_pfile = normalize_path(os.path.join(real_dirname,
- "._cfg" + str(prot_num).zfill(4) + "_" + real_filename))
- old_pfile = normalize_path(os.path.join(real_dirname, last_pfile))
- if last_pfile and newmd5:
- try:
- old_pfile_st = os.lstat(old_pfile)
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
- else:
- if stat.S_ISLNK(old_pfile_st.st_mode):
- try:
- # Read symlink target as bytes, in case the
- # target path has a bad encoding.
- pfile_link = os.readlink(_unicode_encode(old_pfile,
- encoding=_encodings['merge'], errors='strict'))
- except OSError:
- if e.errno != errno.ENOENT:
- raise
- else:
- pfile_link = _unicode_decode(pfile_link,
- encoding=_encodings['merge'], errors='replace')
- if pfile_link == newmd5:
- return old_pfile
- else:
- try:
- last_pfile_md5 = \
- portage.checksum._perform_md5_merge(old_pfile)
- except FileNotFound:
- # The file suddenly disappeared or it's a
- # broken symlink.
- pass
- else:
- if last_pfile_md5 == newmd5:
- return old_pfile
- return new_pfile
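+
+# Illustrative sketch (editor's addition): with /etc/foo present and
+# ._cfg0000_foo already pending, the next slot is ._cfg0001_foo; a newmd5
+# matching the newest pending file returns that file instead (the path is
+# hypothetical).
+def _example_new_protect_filename():
+    return new_protect_filename("/etc/foo")  # e.g. "/etc/._cfg0001_foo"
+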
def find_updated_config_files(target_root, config_protect):
- """
- Return a tuple of configuration files that needs to be updated.
- The tuple contains lists organized like this:
- [protected_dir, file_list]
- If the protected config isn't a protected_dir but a procted_file, list is:
- [protected_file, None]
- If no configuration files needs to be updated, None is returned
- """
+ """
+    Yield the configuration files that need to be updated. This is a
+    generator that yields tuples organized like this:
+    (protected_dir, file_list)
+    If the protected config is a protected file rather than a protected
+    directory, the tuple is:
+    (protected_file, None)
+    If no configuration files need to be updated, nothing is yielded.
+ """
+
+ encoding = _encodings["fs"]
+
+ if config_protect:
+ # directories with some protect files in them
+ for x in config_protect:
+ files = []
+
+ x = os.path.join(target_root, x.lstrip(os.path.sep))
+ if not os.access(x, os.W_OK):
+ continue
+ try:
+ mymode = os.lstat(x).st_mode
+ except OSError:
+ continue
+
+ if stat.S_ISLNK(mymode):
+ # We want to treat it like a directory if it
+ # is a symlink to an existing directory.
+ try:
+ real_mode = os.stat(x).st_mode
+ if stat.S_ISDIR(real_mode):
+ mymode = real_mode
+ except OSError:
+ pass
+
+ if stat.S_ISDIR(mymode):
+ mycommand = (
+ f"find '{x}' -name '.*' -type d -prune -o -name '._cfg????_*'"
+ )
+ else:
+ mycommand = (
+ "find '%s' -maxdepth 1 -name '._cfg????_%s'"
+ % os.path.split(x.rstrip(os.path.sep))
+ )
+ mycommand += " ! -name '.*~' ! -iname '.*.bak' -print0"
+ cmd = shlex_split(mycommand)
+
+ cmd = [
+ _unicode_encode(arg, encoding=encoding, errors="strict") for arg in cmd
+ ]
+ proc = subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ )
+ output = _unicode_decode(proc.communicate()[0], encoding=encoding)
+ status = proc.wait()
+ if os.WIFEXITED(status) and os.WEXITSTATUS(status) == os.EX_OK:
+ files = output.split("\0")
+ # split always produces an empty string as the last element
+ if files and not files[-1]:
+ del files[-1]
+ if files:
+ if stat.S_ISDIR(mymode):
+ yield (x, files)
+ else:
+ yield (x, None)
+
+
+_ld_so_include_re = re.compile(r"^include\s+(\S.*)")
- encoding = _encodings['fs']
-
- if config_protect:
- # directories with some protect files in them
- for x in config_protect:
- files = []
-
- x = os.path.join(target_root, x.lstrip(os.path.sep))
- if not os.access(x, os.W_OK):
- continue
- try:
- mymode = os.lstat(x).st_mode
- except OSError:
- continue
-
- if stat.S_ISLNK(mymode):
- # We want to treat it like a directory if it
- # is a symlink to an existing directory.
- try:
- real_mode = os.stat(x).st_mode
- if stat.S_ISDIR(real_mode):
- mymode = real_mode
- except OSError:
- pass
-
- if stat.S_ISDIR(mymode):
- mycommand = \
- "find '%s' -name '.*' -type d -prune -o -name '._cfg????_*'" % x
- else:
- mycommand = "find '%s' -maxdepth 1 -name '._cfg????_%s'" % \
- os.path.split(x.rstrip(os.path.sep))
- mycommand += " ! -name '.*~' ! -iname '.*.bak' -print0"
- cmd = shlex_split(mycommand)
-
- cmd = [_unicode_encode(arg, encoding=encoding, errors='strict')
- for arg in cmd]
- proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- output = _unicode_decode(proc.communicate()[0], encoding=encoding)
- status = proc.wait()
- if os.WIFEXITED(status) and os.WEXITSTATUS(status) == os.EX_OK:
- files = output.split('\0')
- # split always produces an empty string as the last element
- if files and not files[-1]:
- del files[-1]
- if files:
- if stat.S_ISDIR(mymode):
- yield (x, files)
- else:
- yield (x, None)
-
-_ld_so_include_re = re.compile(r'^include\s+(\S.*)')
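+
+# Illustrative sketch (editor's addition): iterate pending updates the way a
+# tool like dispatch-conf would; `files` is None for single-file entries.
+def _example_find_updated_config_files():
+    for path, files in find_updated_config_files("/", ["/etc"]):
+        if files is None:
+            print(f"update pending for {path}")
+        else:
+            print(f"{len(files)} update(s) pending under {path}")
+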
def getlibpaths(root, env=None):
- def read_ld_so_conf(path):
- for l in grabfile(path):
- include_match = _ld_so_include_re.match(l)
- if include_match is not None:
- subpath = os.path.join(os.path.dirname(path),
- include_match.group(1))
- for p in glob.glob(subpath):
- for r in read_ld_so_conf(p):
- yield r
- else:
- yield l
-
- """ Return a list of paths that are used for library lookups """
- if env is None:
- env = os.environ
- # the following is based on the information from ld.so(8)
- rval = env.get("LD_LIBRARY_PATH", "").split(":")
- rval.extend(read_ld_so_conf(os.path.join(root, "etc", "ld.so.conf")))
- rval.append("/usr/lib")
- rval.append("/lib")
-
- return [normalize_path(x) for x in rval if x]
+    """Return a list of paths that are used for library lookups."""
+
+    def read_ld_so_conf(path):
+ for l in grabfile(path):
+ include_match = _ld_so_include_re.match(l)
+ if include_match is not None:
+ subpath = os.path.join(os.path.dirname(path), include_match.group(1))
+ for p in glob.glob(subpath):
+ yield from read_ld_so_conf(p)
+ else:
+ yield l
+
+ """ Return a list of paths that are used for library lookups """
+ if env is None:
+ env = os.environ
+ # the following is based on the information from ld.so(8)
+ rval = env.get("LD_LIBRARY_PATH", "").split(":")
+ rval.extend(read_ld_so_conf(os.path.join(root, "etc", "ld.so.conf")))
+ rval.append("/usr/lib")
+ rval.append("/lib")
+
+ return [normalize_path(x) for x in rval if x]
+
+
+def no_color(settings: Optional[dict]) -> bool:
+    # In several years (2026+), we can clean up NOCOLOR support and honor
+    # only NO_COLOR.
+    no_color_value: Optional[str] = settings.get("NO_COLOR")
+    nocolor: str = settings.get("NOCOLOR", "false").lower()
+    if no_color_value is None:
+        return nocolor in ("yes", "true")
+    return bool(no_color_value)
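+
+
+# Illustrative sketch (editor's addition): NO_COLOR, when set to any
+# non-empty value, takes precedence over the legacy NOCOLOR spelling.
+def _example_no_color():
+    assert no_color({"NO_COLOR": "1"})
+    assert no_color({"NOCOLOR": "true"})
+    assert not no_color({"NO_COLOR": "", "NOCOLOR": "true"})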