aboutsummaryrefslogtreecommitdiff
blob: 65a9ca1bfd4b3166cf967e71b3eb0952556efd98 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Copyright 1998-2020, 2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import functools
import pty
import shutil
import socket
import tempfile

import pytest

from portage import os
from portage.tests import TestCase
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures import asyncio
from _emerge.PipeReader import PipeReader


class PipeReaderTestCase(TestCase):
    """
    Exercise PipeReader against several kinds of file descriptor pairs.

    Every test supplies a factory that builds a (read_end, write_end)
    descriptor pair. A bash subprocess echoes a string into the write end
    and the test asserts that PipeReader returns exactly that string from
    the read end.
    """

    # Forwarded to PipeReader as its _use_array argument.
    _use_array = False
    # Shell command template; %s is replaced with the string to emit.
    _echo_cmd = "echo -n '%s'"

    def test_pipe(self):
        """Read through an anonymous pipe created with os.pipe()."""

        def make_pipes():
            return os.pipe(), None

        self._do_test(make_pipes)

    def test_pty_device(self):
        """Read through a pty pair; skip when a pty cannot be opened."""

        def make_pipes():
            try:
                return pty.openpty(), None
            except OSError:
                self.skipTest("pty not available")

        self._do_test(make_pipes)

    def test_domain_socket(self):
        """Read through a connected socket pair."""

        def make_pipes():
            read_end, write_end = socket.socketpair()
            # detach() hands back the raw descriptors and marks the socket
            # objects closed, so they will not close the descriptors later.
            return (read_end.detach(), write_end.detach()), None

        self._do_test(make_pipes)

    def test_named_pipe(self):
        """Read through a FIFO created inside a temporary directory."""

        def make_pipes():
            tempdir = tempfile.mkdtemp()
            cleanup = functools.partial(shutil.rmtree, tempdir)
            try:
                fifo_path = os.path.join(tempdir, "fifo")
                os.mkfifo(fifo_path)
                # The read end has to be opened first: a non-blocking open
                # for writing fails with ENXIO while the FIFO has no reader.
                read_end = os.open(fifo_path, os.O_NONBLOCK | os.O_RDONLY)
                try:
                    write_end = os.open(fifo_path, os.O_NONBLOCK | os.O_WRONLY)
                except OSError:
                    os.close(read_end)
                    raise
            except OSError:
                # The caller only receives the cleanup callable on success,
                # so a failed setup has to remove the directory itself.
                cleanup()
                raise
            return (read_end, write_end), cleanup

        self._do_test(make_pipes)

    def _testPipeReader(self, master_fd, slave_fd, test_string):
        """
        Send test_string through a descriptor pair and return what
        PipeReader collected from the other side.

        @param master_fd: read end; wrapped in an unbuffered file object
            that is handed to PipeReader
        @param slave_fd: write end; used as stdout of the producer
            subprocess and always closed by this method
        @param test_string: text the producer subprocess writes
        @return: the data read by PipeReader, decoded as ASCII with
            undecodable bytes replaced
        """
        try:
            # WARNING: It is very important to use unbuffered mode here,
            # in order to avoid issue 5380 with python3.
            master_file = os.fdopen(master_fd, "rb", 0)
            scheduler = global_event_loop()

            consumer = PipeReader(
                input_files={"producer": master_file},
                _use_array=self._use_array,
                scheduler=scheduler,
            )
            consumer.start()

            producer = scheduler.run_until_complete(
                asyncio.create_subprocess_exec(
                    "bash",
                    "-c",
                    self._echo_cmd % test_string,
                    stdout=slave_fd,
                    loop=scheduler,
                )
            )
        finally:
            # Close this process's copy of the write end as soon as the
            # producer has been spawned, and also when setup fails part-way
            # so that the descriptor is not leaked.
            os.close(slave_fd)

        scheduler.run_until_complete(producer.wait())
        scheduler.run_until_complete(consumer.async_wait())

        self.assertEqual(producer.returncode, os.EX_OK)
        self.assertEqual(consumer.returncode, os.EX_OK)

        return consumer.getvalue().decode("ascii", "replace")

    def _do_test(self, make_pipes):
        """
        Run the round-trip check for a range of payload sizes.

        @param make_pipes: callable returning ((read_end, write_end), cleanup),
            where cleanup is either None or a zero-argument callable that
            is invoked once the size has been checked, pass or fail
        """
        for size in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
            test_string = size * "a"
            (read_end, write_end), cleanup = make_pipes()
            try:
                output = self._testPipeReader(read_end, write_end, test_string)
                self.assertEqual(
                    test_string,
                    output,
                    f"x = {size}, len(output) = {len(output)}",
                )
            finally:
                if cleanup is not None:
                    cleanup()


@pytest.mark.xfail()  # Fails intermittently, which is why it is marked xfail
class PipeReaderArrayTestCase(PipeReaderTestCase):
    """
    Run the inherited PipeReaderTestCase tests with _use_array enabled.

    Upstream reports that appear related to the intermittent failure:
    https://bugs.python.org/issue5380
    https://bugs.pypy.org/issue956
    """

    _use_array = True
    # sleep allows reliable triggering of the failure mode on fast computers
    _echo_cmd = "sleep 0.1 ; echo -n '%s'"