aboutsummaryrefslogtreecommitdiff
blob: d8a4cfcfc4b2f682e216ee64a34066fff6993f57 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright 2001-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import subprocess
import sys
import threading

from portage import os
from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS
from portage.exception import GPGException
from portage.output import colorize
from portage.util import shlex_split, varexpand, writemsg, writemsg_stdout


class GPG:
    """
    Unlock GPG, must call dircetly from main program for get correct TTY
    """

    def __init__(self, settings):
        """
        Portage settings are needed to run GPG unlock command.
        """
        self.settings = settings
        self.thread = None
        self._terminated = None
        self.GPG_signing_base_command = self.settings.get(
            "BINPKG_GPG_SIGNING_BASE_COMMAND"
        )
        self.digest_algo = self.settings.get("BINPKG_GPG_SIGNING_DIGEST")
        self.signing_gpg_home = self.settings.get("BINPKG_GPG_SIGNING_GPG_HOME")
        self.signing_gpg_key = self.settings.get("BINPKG_GPG_SIGNING_KEY")
        self.GPG_unlock_command = self.GPG_signing_base_command.replace(
            "[PORTAGE_CONFIG]",
            f"--homedir {self.signing_gpg_home} "
            f"--digest-algo {self.digest_algo} "
            f"--local-user {self.signing_gpg_key} "
            "--output - /dev/null",
        )

        if "gpg-keepalive" in self.settings.features:
            self.keepalive = True
        else:
            self.keepalive = False

    def unlock(self):
        """
        Set GPG_TTY and run GPG unlock command.
        If gpg-keepalive is set, start keepalive thread.
        """
        if self.GPG_unlock_command and (
            self.settings.get("BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0])
            == "gpkg"
        ):
            try:
                os.environ["GPG_TTY"] = os.ttyname(sys.stdout.fileno())
            except OSError as e:
                # When run with no input/output tty, this will fail.
                # However, if the password is given by command,
                # GPG does not need to ask password, so can be ignored.
                writemsg(f"{colorize('WARN', str(e))}\n")

            cmd = shlex_split(varexpand(self.GPG_unlock_command, mydict=self.settings))
            return_code = subprocess.Popen(cmd, stdout=subprocess.DEVNULL).wait()

            if return_code == os.EX_OK:
                writemsg_stdout(f"{colorize('GOOD', 'unlocked')}\n")
                sys.stdout.flush()
            else:
                raise GPGException("GPG unlock failed")

            if self.keepalive:
                self.GPG_unlock_command = shlex_split(
                    varexpand(self.GPG_unlock_command, mydict=self.settings)
                )
                self._terminated = threading.Event()
                self.thread = threading.Thread(target=self.gpg_keepalive, daemon=True)
                self.thread.start()

    def stop(self):
        """
        Stop keepalive thread.
        """
        if self.thread is not None:
            self._terminated.set()

    def gpg_keepalive(self):
        """
        Call GPG unlock command every 5 mins to avoid the passphrase expired.
        """
        count = 0
        while not self._terminated.is_set():
            if count < 5:
                if self._terminated.wait(60):
                    break
                count += 1
                continue
            else:
                count = 0

            proc = subprocess.Popen(
                self.GPG_unlock_command,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
            if proc.wait() != os.EX_OK and not self._terminated.is_set():
                raise GPGException("GPG keepalive failed")