aboutsummaryrefslogtreecommitdiff
blob: 1aa5ee3bfeff5cf9b7eefa5cae0d4581709ec1f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Copyright 1999-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import fcntl
import sys

from portage import os
from _emerge.AbstractPollTask import AbstractPollTask

class PipeReader(AbstractPollTask):

	"""
	Collects everything read from one or more input files into an
	in-memory buffer that callers fetch with getvalue(). Reading is
	done by handlers registered through the scheduler's add_reader(),
	so it runs entirely within the current process.

	input_files maps arbitrary keys to either integer file
	descriptors or objects that provide fileno() and close().
	"""

	__slots__ = ("input_files", "_read_data", "_use_array")

	def _start(self):
		"""
		Reset the buffer, make every input file non-blocking and
		register a read handler for it with the scheduler.
		"""
		self._read_data = []

		# FD_CLOEXEC is enabled by default in Python >=3.4, so the flag
		# only has to be set by hand on older interpreters, and only
		# when the fcntl module defines it at all.
		cloexec = None
		if sys.hexversion < 0x3040000:
			cloexec = getattr(fcntl, "FD_CLOEXEC", None)

		for source in self.input_files.values():
			if isinstance(source, int):
				fd = source
			else:
				fd = source.fileno()

			status_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
			fcntl.fcntl(fd, fcntl.F_SETFL, status_flags | os.O_NONBLOCK)

			if cloexec is not None:
				descriptor_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
				fcntl.fcntl(fd, fcntl.F_SETFD, descriptor_flags | cloexec)

			# The array handler is handed the original entry from
			# input_files, the plain handler the raw descriptor.
			if self._use_array:
				handler, handler_arg = self._array_output_handler, source
			else:
				handler, handler_arg = self._output_handler, fd
			self.scheduler.add_reader(fd, handler, handler_arg)

		self._registered = True

	def _cancel(self):
		"""
		Unregister and close the input files; if no returncode has
		been recorded yet, record the cancelled one.
		"""
		self._unregister()
		if self.returncode is not None:
			return
		self.returncode = self._cancelled_returncode

	def getvalue(self):
		"""Return all buffered chunks joined into one bytes object."""
		chunks = self._read_data
		return b''.join(chunks)

	def close(self):
		"""Drop the reference to the buffer so its memory can be freed."""
		self._read_data = None

	def _collect_output(self, read_chunk, source):
		"""
		Shared loop behind both output handlers: call
		read_chunk(source) repeatedly and buffer every non-empty
		result.

		A None result ends the loop and leaves the reader registered
		(presumably no more data is available right now -- confirm
		against _read_buf/_read_array). An empty result is treated as
		the end of the stream: the files are unregistered, returncode
		falls back to os.EX_OK unless a truthy value is already set,
		and _async_wait() is called.
		"""
		while True:
			chunk = read_chunk(source)
			if chunk is None:
				return
			if not chunk:
				break
			self._read_data.append(chunk)

		self._unregister()
		self.returncode = self.returncode or os.EX_OK
		self._async_wait()

	def _output_handler(self, fd):
		"""Reader callback that buffers the results of _read_buf(fd)."""
		self._collect_output(self._read_buf, fd)

	def _array_output_handler(self, f):
		"""
		Reader callback that buffers the results of _read_array(f).
		Unlike _output_handler, it always returns True.
		"""
		self._collect_output(self._read_array, f)
		return True

	def _unregister(self):
		"""
		Unregister from the scheduler and close open files.

		Once input_files has been cleared, further calls only reset
		the _registered flag.
		"""

		self._registered = False

		sources = self.input_files
		if sources is None:
			return

		for source in sources.values():
			if isinstance(source, int):
				# Raw descriptor: closed through os.close().
				self.scheduler.remove_reader(source)
				os.close(source)
			else:
				# File-like object: closed through its own close().
				self.scheduler.remove_reader(source.fileno())
				source.close()
		self.input_files = None