aboutsummaryrefslogtreecommitdiff
blob: 893692288e363e5fe8806e2548edea00f66f8833 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Copyright 2012-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import subprocess

from portage import os
from portage.tests import TestCase
from portage.util._async.PopenProcess import PopenProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util._async.PipeReaderBlockingIO import PipeReaderBlockingIO


class PopenPipeBlockingIOTestCase(TestCase):
    """
    Test PopenProcess, which is historically useful for Jython support:
            * use subprocess.Popen since Jython does not support os.fork()
            * use blocking IO with threads, since Jython does not support
              fcntl non-blocking IO)

    Portage does not currently support Jython, but re-introducing support
    in The Future (TM) may be possible.
    """

    _echo_cmd = "echo -n '%s'"

    def _testPipeReader(self, test_string):
        """
        Use a poll loop to read data from a pipe and assert that
        the data written to the pipe is identical to the data
        read from the pipe.
        """

        producer = PopenProcess(
            proc=subprocess.Popen(
                ["bash", "-c", self._echo_cmd % test_string],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            ),
            pipe_reader=PipeReaderBlockingIO(),
            scheduler=global_event_loop(),
        )

        consumer = producer.pipe_reader
        consumer.input_files = {"producer": producer.proc.stdout}

        producer.start()
        producer.wait()

        self.assertEqual(producer.returncode, os.EX_OK)
        self.assertEqual(consumer.returncode, os.EX_OK)

        return consumer.getvalue().decode("ascii", "replace")

    def testPopenPipeBlockingIO(self):
        for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
            test_string = x * "a"
            output = self._testPipeReader(test_string)
            self.assertEqual(
                test_string, output, f"x = {x}, len(output) = {len(output)}"
            )