1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# Copyright 2020-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import sys
import portage
from portage import os
from portage.tests import TestCase
from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio
from portage.util.futures._asyncio.streams import _writer
from portage.util.futures.unix_events import _set_nonblocking
class AsyncFunctionTestCase(TestCase):
@staticmethod
def _read_from_stdin(pw):
os.close(pw)
return "".join(sys.stdin)
async def _testAsyncFunctionStdin(self, loop):
test_string = "1\n2\n3\n"
pr, pw = os.pipe()
fd_pipes = {0: pr}
reader = AsyncFunction(
scheduler=loop, fd_pipes=fd_pipes, target=self._read_from_stdin, args=(pw,)
)
reader.start()
os.close(pr)
_set_nonblocking(pw)
with open(pw, mode="wb", buffering=0) as pipe_write:
await _writer(pipe_write, test_string.encode("utf_8"))
self.assertEqual((await reader.async_wait()), os.EX_OK)
self.assertEqual(reader.result, test_string)
def testAsyncFunctionStdin(self):
loop = asyncio._wrap_loop()
loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop))
def _test_getpid_fork(self):
"""
Verify that portage.getpid() cache is updated in a forked child process.
"""
loop = asyncio._wrap_loop()
proc = AsyncFunction(scheduler=loop, target=portage.getpid)
proc.start()
proc.wait()
self.assertEqual(proc.pid, proc.result)
def test_getpid_fork(self):
self._test_getpid_fork()
def test_getpid_double_fork(self):
"""
Verify that portage.getpid() cache is updated correctly after
two forks.
"""
loop = asyncio._wrap_loop()
proc = AsyncFunction(scheduler=loop, target=self._test_getpid_fork)
proc.start()
self.assertEqual(proc.wait(), 0)
|