1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# -*- coding:utf-8 -*-
import codecs
import subprocess
import sys
# import our initialized portage instance
from repoman._portage import portage
from portage import os
from portage.process import find_binary
from portage import _encodings, _unicode_encode
def repoman_getstatusoutput(cmd):
"""
Implements an interface similar to getstatusoutput(), but with
customized unicode handling (see bug #310789) and without the shell.
"""
args = portage.util.shlex_split(cmd)
if sys.hexversion < 0x3020000 and sys.hexversion >= 0x3000000 and \
not os.path.isabs(args[0]):
# Python 3.1 _execvp throws TypeError for non-absolute executable
# path passed as bytes (see https://bugs.python.org/issue8513).
fullname = find_binary(args[0])
if fullname is None:
raise portage.exception.CommandNotFound(args[0])
args[0] = fullname
encoding = _encodings['fs']
args = [
_unicode_encode(x, encoding=encoding, errors='strict') for x in args]
proc = subprocess.Popen(
args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = portage._unicode_decode(
proc.communicate()[0], encoding=encoding, errors='strict')
if output and output[-1] == "\n":
# getstatusoutput strips one newline
output = output[:-1]
return (proc.wait(), output)
class repoman_popen(portage.proxy.objectproxy.ObjectProxy):
"""
Implements an interface similar to os.popen(), but with customized
unicode handling (see bug #310789) and without the shell.
"""
__slots__ = ('_proc', '_stdout')
def __init__(self, cmd):
args = portage.util.shlex_split(cmd)
if sys.hexversion < 0x3020000 and sys.hexversion >= 0x3000000 and \
not os.path.isabs(args[0]):
# Python 3.1 _execvp throws TypeError for non-absolute executable
# path passed as bytes (see https://bugs.python.org/issue8513).
fullname = find_binary(args[0])
if fullname is None:
raise portage.exception.CommandNotFound(args[0])
args[0] = fullname
encoding = _encodings['fs']
args = [
_unicode_encode(x, encoding=encoding, errors='strict')
for x in args]
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
object.__setattr__(
self, '_proc', proc)
object.__setattr__(
self, '_stdout', codecs.getreader(encoding)(proc.stdout, 'strict'))
def _get_target(self):
return object.__getattribute__(self, '_stdout')
__enter__ = _get_target
def __exit__(self, exc_type, exc_value, traceback):
proc = object.__getattribute__(self, '_proc')
proc.wait()
proc.stdout.close()
|