1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# Copyright 2018-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import errno
import os
import portage
portage.proxy.lazyimport.lazyimport(
globals(),
"_emerge.PipeReader:PipeReader",
"portage.util.futures:asyncio",
)
def _reader(input_file, loop=None):
    """
    Start an asynchronous read of a binary input file and hand back a
    future for its content. The file is closed once EOF is reached.

    @param input_file: binary input file descriptor
    @type input_file: file or int
    @param loop: asyncio.AbstractEventLoop (or compatible)
    @type loop: event loop
    @return: bytes
    @rtype: asyncio.Future (or compatible)
    """
    event_loop = asyncio._wrap_loop(loop)
    result = event_loop.create_future()
    # The _Reader instance is not kept here; its constructor registers
    # its own bound methods as callbacks on the future and on the
    # underlying PipeReader.
    _Reader(future=result, input_file=input_file, loop=event_loop)
    return result
class _Reader:
    """
    Connect a PipeReader to a future: when the PipeReader exits, the
    future receives the value returned by its getvalue() method, and
    when the future is cancelled, the PipeReader is cancelled too.

    @param future: future which receives the content that was read
    @type future: asyncio.Future (or compatible)
    @param input_file: binary input file descriptor
    @type input_file: file or int
    @param loop: event loop, passed to PipeReader as its scheduler
    @type loop: event loop
    """

    def __init__(self, future, input_file, loop):
        self._future = future
        self._pipe_reader = PipeReader(
            input_files={"input_file": input_file}, scheduler=loop
        )

        # Hook up both directions (future -> reader for cancellation,
        # reader -> future for the result) before the reader starts.
        self._future.add_done_callback(self._cancel_callback)
        self._pipe_reader.addExitListener(self._eof)
        self._pipe_reader.start()

    def _cancel_callback(self, future):
        """
        Done callback of the future. Only cancellation is propagated
        to the PipeReader; normal completion needs no action here.
        """
        if future.cancelled():
            self._cancel()

    def _eof(self, pipe_reader):
        """
        Exit listener of the PipeReader: publish the collected content
        through the future, unless the future is already done.
        """
        self._pipe_reader = None
        # The future may already be done (typically cancelled) when this
        # listener fires: _cancel() leaves the listener registered once
        # poll() no longer returns None, and the done callback of an
        # asyncio future is delivered asynchronously. Calling set_result
        # on a done future raises InvalidStateError, so skip it.
        if not self._future.done():
            self._future.set_result(pipe_reader.getvalue())

    def _cancel(self):
        """
        Cancel the PipeReader if it is still running, detaching the
        exit listener first so that _eof is not invoked for it.
        """
        if self._pipe_reader is not None and self._pipe_reader.poll() is None:
            self._pipe_reader.removeExitListener(self._eof)
            self._pipe_reader.cancel()
            self._pipe_reader = None
async def _writer(output_file, content, loop=DeprecationWarning):
    """
    Asynchronously write bytes to output file. The output file is
    assumed to be in non-blocking mode. If an OSError other than
    EAGAIN is encountered, which typically indicates that the other
    end of the pipe has closed, the error is raised.

    This function is a coroutine.

    @param output_file: output file
    @type output_file: file object
    @param content: content to write
    @type content: bytes
    @param loop: deprecated; the value is ignored and the loop returned
        by asyncio.get_event_loop() is always used
    """
    loop = asyncio.get_event_loop()
    fd = output_file.fileno()
    if not content:
        # Nothing to write.
        return

    # Write from a zero-copy byte view, so that a partial write does not
    # copy the whole remaining payload (slicing bytes copies, which is
    # quadratic when a large payload drains through a small pipe buffer).
    remaining = memoryview(content).cast("B")
    while remaining:
        try:
            remaining = remaining[os.write(fd, remaining):]
        except OSError as e:
            if e.errno != errno.EAGAIN:
                raise

            # The fd is not writable right now: wait until the loop
            # reports it as writable, then retry the write.
            waiter = loop.create_future()
            loop.add_writer(fd, lambda: waiter.done() or waiter.set_result(None))
            try:
                await waiter
            finally:
                # The loop and output file may have been closed.
                if not loop.is_closed():
                    waiter.done() or waiter.cancel()
                    # Do not call remove_writer in cases where fd has
                    # been closed and then re-allocated to a concurrent
                    # coroutine as in bug 716636.
                    if not output_file.closed:
                        loop.remove_writer(fd)
|