1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# Copyright 2001-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import subprocess
import sys
import threading
from portage import os
from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS
from portage.exception import GPGException
from portage.output import colorize
from portage.util import shlex_split, varexpand, writemsg, writemsg_stdout
class GPG:
    """
    Unlock GPG; must be called directly from the main program so that the
    correct TTY is used.

    The unlock command is built from BINPKG_GPG_SIGNING_BASE_COMMAND by
    substituting the "[PORTAGE_CONFIG]" placeholder with the signing home
    directory, digest algorithm and key taken from the settings. When the
    "gpg-keepalive" feature is enabled, unlock() also starts a daemon thread
    that re-runs the command periodically.
    """

    # Seconds between two keepalive runs of the unlock command (5 minutes).
    KEEPALIVE_INTERVAL = 300

    def __init__(self, settings):
        """
        Portage settings are needed to run GPG unlock command.

        @param settings: mapping-like object providing get() and a
            "features" container
        """
        self.settings = settings
        self.thread = None
        self._terminated = None
        self.GPG_signing_base_command = self.settings.get(
            "BINPKG_GPG_SIGNING_BASE_COMMAND"
        )
        self.digest_algo = self.settings.get("BINPKG_GPG_SIGNING_DIGEST")
        self.signing_gpg_home = self.settings.get("BINPKG_GPG_SIGNING_GPG_HOME")
        self.signing_gpg_key = self.settings.get("BINPKG_GPG_SIGNING_KEY")

        if self.GPG_signing_base_command is None:
            # No base command configured: leave the unlock command unset so
            # that unlock() becomes a no-op instead of failing here.
            self.GPG_unlock_command = None
        else:
            self.GPG_unlock_command = self.GPG_signing_base_command.replace(
                "[PORTAGE_CONFIG]",
                f"--homedir {self.signing_gpg_home} "
                f"--digest-algo {self.digest_algo} "
                f"--local-user {self.signing_gpg_key} "
                "--output - /dev/null",
            )

        self.keepalive = "gpg-keepalive" in self.settings.features

    def _unlock_argv(self):
        """
        Return the unlock command as an argument list.

        GPG_unlock_command is a string until a keepalive-enabled unlock()
        replaces it with the already split argument list; both forms are
        accepted so that unlock() can be called more than once.
        """
        command = self.GPG_unlock_command
        if isinstance(command, str):
            return shlex_split(varexpand(command, mydict=self.settings))
        return list(command)

    def unlock(self):
        """
        Set GPG_TTY and run GPG unlock command.
        If gpg-keepalive is set, start keepalive thread.

        Does nothing when no unlock command is available or when
        BINPKG_FORMAT is not "gpkg".

        @raise GPGException: if the unlock command exits with a non-zero
            status
        """
        if not self.GPG_unlock_command:
            return

        binpkg_format = self.settings.get(
            "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0]
        )
        if binpkg_format != "gpkg":
            return

        try:
            os.environ["GPG_TTY"] = os.ttyname(sys.stdout.fileno())
        except OSError as e:
            # When run with no input/output tty, this will fail.
            # However, if the password is given by command,
            # GPG does not need to ask password, so can be ignored.
            writemsg(f"{colorize('WARN', str(e))}\n")

        cmd = self._unlock_argv()
        return_code = subprocess.Popen(cmd, stdout=subprocess.DEVNULL).wait()
        if return_code != os.EX_OK:
            raise GPGException("GPG unlock failed")

        writemsg_stdout(f"{colorize('GOOD', 'unlocked')}\n")
        sys.stdout.flush()

        if self.keepalive:
            # Signal a keepalive thread left over from an earlier unlock()
            # before starting a new one, so only one keeps running.
            self.stop()
            # Keep the split command for the keepalive thread.
            self.GPG_unlock_command = cmd
            self._terminated = threading.Event()
            self.thread = threading.Thread(target=self.gpg_keepalive, daemon=True)
            self.thread.start()

    def stop(self):
        """
        Stop keepalive thread.

        Only signals the thread to terminate; it does not wait for it.
        """
        if self.thread is not None:
            self._terminated.set()

    def gpg_keepalive(self):
        """
        Call GPG unlock command every 5 mins to avoid the passphrase expired.

        Runs until the termination event is set. A failing command raises
        GPGException unless termination was requested in the meantime.

        @raise GPGException: if the unlock command exits with a non-zero
            status while the thread has not been asked to stop
        """
        # Bind the event once so that a later unlock(), which installs a new
        # event, cannot make this loop miss its own stop request.
        terminated = self._terminated
        argv = self._unlock_argv()

        # wait() returns True as soon as the event is set, False on timeout.
        while not terminated.wait(self.KEEPALIVE_INTERVAL):
            proc = subprocess.Popen(
                argv,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
            if proc.wait() != os.EX_OK and not terminated.is_set():
                raise GPGException("GPG keepalive failed")
|