aboutsummaryrefslogtreecommitdiff
blob: 650a16491ff23f80dc93227e98e7a769fb9504c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import errno
import os

import portage
portage.proxy.lazyimport.lazyimport(globals(),
	'_emerge.PipeReader:PipeReader',
	'portage.util.futures:asyncio',
	'portage.util.futures.unix_events:_set_nonblocking',
)
from portage.util.futures.compat_coroutine import coroutine


def _reader(input_file, loop=None):
	"""
	Start an asynchronous read of a binary input file and hand back
	a future for its content. The file is closed once EOF has been
	reached.

	@param input_file: binary input file descriptor
	@type input_file: file or int
	@param loop: asyncio.AbstractEventLoop (or compatible)
	@type loop: event loop
	@return: bytes
	@rtype: asyncio.Future (or compatible)
	"""
	event_loop = asyncio._wrap_loop(loop)
	result = event_loop.create_future()
	# The _Reader instance is intentionally not kept here: its
	# constructor registers its own bound methods as callbacks on
	# the future and on the underlying PipeReader.
	_Reader(result, input_file, event_loop)
	return result


class _Reader(object):
	"""
	Couple a PipeReader to a future: when the PipeReader exits, the
	future receives the value returned by its getvalue() method, and
	when the future is cancelled, the PipeReader is cancelled too.
	"""

	def __init__(self, future, input_file, loop):
		"""
		Create the PipeReader for input_file and start it immediately.

		@param future: future that receives the content that was read
		@type future: asyncio.Future (or compatible)
		@param input_file: binary input file descriptor
		@type input_file: file or int
		@param loop: event loop, passed to PipeReader as its scheduler
		@type loop: event loop
		"""
		self._future = future
		self._pipe_reader = PipeReader(
			input_files={'input_file':input_file}, scheduler=loop)

		self._future.add_done_callback(self._cancel_callback)
		self._pipe_reader.addExitListener(self._eof)
		self._pipe_reader.start()

	def _cancel_callback(self, future):
		"""
		Done callback of the future: propagate a cancellation of the
		future to the PipeReader. Normal completion needs no action.
		"""
		if future.cancelled():
			self._cancel()

	def _eof(self, pipe_reader):
		"""
		Exit listener of the PipeReader: publish the collected content
		as the result of the future.
		"""
		self._pipe_reader = None
		# _cancel() only detaches this listener while poll() is None,
		# so this method can still be invoked for a future that has
		# already been cancelled. Calling set_result() on it would
		# raise InvalidStateError, so the result is dropped instead.
		if not self._future.cancelled():
			self._future.set_result(pipe_reader.getvalue())

	def _cancel(self):
		"""
		Cancel the PipeReader if it is still running, detaching the
		exit listener first so that _eof is not triggered by the
		cancellation.
		"""
		if self._pipe_reader is not None and self._pipe_reader.poll() is None:
			self._pipe_reader.removeExitListener(self._eof)
			self._pipe_reader.cancel()
			self._pipe_reader = None


@coroutine
def _writer(output_file, content, loop=None):
	"""
	Asynchronously write bytes to output file, and close it when
	done. If an EnvironmentError other than EAGAIN (or EWOULDBLOCK)
	is encountered, which typically indicates that the other end of
	the pipe has closed, the error is raised. This function is a
	coroutine.

	@param output_file: output file descriptor
	@type output_file: file or int
	@param content: content to write
	@type content: bytes
	@param loop: asyncio.AbstractEventLoop (or compatible)
	@type loop: event loop
	"""
	try:
		# Setup happens inside the try block so that output_file is
		# closed even if one of these preparatory steps fails.
		fd = output_file if isinstance(output_file, int) else output_file.fileno()
		_set_nonblocking(fd)
		loop = asyncio._wrap_loop(loop)
		while content:
			waiter = loop.create_future()
			# The writer callback can fire again (or after the waiter
			# has been cancelled) before remove_writer is reached, so
			# only set the result while the waiter is still pending.
			# The default argument binds this iteration's waiter.
			loop.add_writer(fd,
				lambda waiter=waiter: waiter.done() or waiter.set_result(None))
			try:
				yield waiter
				# The fd is writable: push as much as it accepts, and go
				# back to waiting once the write would block.
				while content:
					try:
						content = content[os.write(fd, content):]
					except EnvironmentError as e:
						if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
							break
						raise
			finally:
				loop.remove_writer(fd)
	finally:
		if isinstance(output_file, int):
			os.close(output_file)
		else:
			output_file.close()