aboutsummaryrefslogtreecommitdiff
blob: 50d561df61140db61de1ec4f8329a41433cf2f22 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# Copyright 2018-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import errno
import os
import pty
import shutil
import socket
import tempfile

from portage.tests import TestCase
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures import asyncio
from portage.util.futures.unix_events import (
    DefaultEventLoopPolicy,
    _set_nonblocking,
)


class _PipeClosedTestCase:
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self._do_test(read_end, write_end)

    def test_pty_device(self):
        try:
            read_end, write_end = pty.openpty()
        except OSError:
            self.skipTest("pty not available")
        self._do_test(read_end, write_end)

    def test_domain_socket(self):
        read_end, write_end = socket.socketpair()
        self._do_test(read_end.detach(), write_end.detach())

    def test_named_pipe(self):
        tempdir = tempfile.mkdtemp()
        try:
            fifo_path = os.path.join(tempdir, "fifo")
            os.mkfifo(fifo_path)
            self._do_test(
                os.open(fifo_path, os.O_NONBLOCK | os.O_RDONLY),
                os.open(fifo_path, os.O_NONBLOCK | os.O_WRONLY),
            )
        finally:
            shutil.rmtree(tempdir)


class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase):
    """
    Test that a reader callback is called after the other end of
    the pipe has been closed.
    """

    def _do_test(self, read_end, write_end):
        initial_policy = asyncio.get_event_loop_policy()
        if not isinstance(initial_policy, DefaultEventLoopPolicy):
            asyncio.set_event_loop_policy(DefaultEventLoopPolicy())

        loop = asyncio._wrap_loop()
        read_end = os.fdopen(read_end, "rb", 0)
        write_end = os.fdopen(write_end, "wb", 0)
        try:

            def reader_callback():
                if not reader_callback.called.done():
                    reader_callback.called.set_result(None)

            reader_callback.called = loop.create_future()
            loop.add_reader(read_end.fileno(), reader_callback)

            # Allow the loop to check for IO events, and assert
            # that our future is still not done.
            loop.run_until_complete(asyncio.sleep(0, loop=loop))
            self.assertFalse(reader_callback.called.done())

            # Demonstrate that the callback is called after the
            # other end of the pipe has been closed.
            write_end.close()
            loop.run_until_complete(reader_callback.called)
        finally:
            loop.remove_reader(read_end.fileno())
            write_end.close()
            read_end.close()
            asyncio.set_event_loop_policy(initial_policy)
            if loop not in (None, global_event_loop()):
                loop.close()
                self.assertFalse(global_event_loop().is_closed())


class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase):
    """
    Test that a writer callback is called after the other end of
    the pipe has been closed.
    """

    def _do_test(self, read_end, write_end):
        initial_policy = asyncio.get_event_loop_policy()
        if not isinstance(initial_policy, DefaultEventLoopPolicy):
            asyncio.set_event_loop_policy(DefaultEventLoopPolicy())

        loop = asyncio._wrap_loop()
        read_end = os.fdopen(read_end, "rb", 0)
        write_end = os.fdopen(write_end, "wb", 0)
        try:

            def writer_callback():
                if not writer_callback.called.done():
                    writer_callback.called.set_result(None)

            writer_callback.called = loop.create_future()
            _set_nonblocking(write_end.fileno())
            loop.add_writer(write_end.fileno(), writer_callback)

            # With pypy we've seen intermittent spurious writer callbacks
            # here, so retry until the correct state is achieved.
            tries = 10
            while tries:
                tries -= 1

                # Fill up the pipe, so that no writer callbacks should be
                # received until the state has changed.
                while True:
                    try:
                        os.write(write_end.fileno(), 512 * b"0")
                    except OSError as e:
                        if e.errno != errno.EAGAIN:
                            raise
                        break

                # Allow the loop to check for IO events, and assert
                # that our future is still not done.
                loop.run_until_complete(asyncio.sleep(0, loop=loop))
                if writer_callback.called.done():
                    writer_callback.called = loop.create_future()
                else:
                    break

            self.assertFalse(writer_callback.called.done())

            # Demonstrate that the callback is called after the
            # other end of the pipe has been closed.
            read_end.close()
            loop.run_until_complete(writer_callback.called)
        finally:
            loop.remove_writer(write_end.fileno())
            write_end.close()
            read_end.close()
            asyncio.set_event_loop_policy(initial_policy)
            if loop not in (None, global_event_loop()):
                loop.close()
                self.assertFalse(global_event_loop().is_closed())