aboutsummaryrefslogtreecommitdiff
blob: cc746bf52cbd179e13ca5f2b7c2a6625ca422ec6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright 2008-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import fcntl
import errno
import gzip
import sys

import portage
from portage import os, _encodings, _unicode_encode
from portage.util.futures import asyncio
from portage.util.futures._asyncio.streams import _writer
from portage.util.futures.compat_coroutine import coroutine
from portage.util.futures.unix_events import _set_nonblocking
from _emerge.AbstractPollTask import AbstractPollTask

class PipeLogger(AbstractPollTask):

	"""
	This can be used for logging output of a child process,
	optionally outputting to log_file_path and/or stdout_fd.  It can
	also monitor for EOF on input_fd, which may be used to detect
	termination of a child process. If log_file_path ends with
	'.gz' then the log file is written with compression.

	Attributes supplied by the caller:
		input_fd: an int file descriptor or a binary file object to read
			from. An int is wrapped in an unbuffered binary file object by
			_start, and the underlying descriptor is put into
			non-blocking mode.
		log_file_path: None, a path, or an object with a 'write' method.
			A path is opened in append mode (gzip-compressed when it ends
			with '.gz'). A file object has its descriptor made
			non-blocking and is written through _writer.
		stdout_fd: optional int file descriptor that input data is copied
			to unless the task runs in the background. It is closed by
			_unregister.
	"""

	# Public attributes supplied by the caller, followed by private state
	# that is managed by _start and _unregister.
	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
		("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")

	def _start(self):
		"""
		Open the log file (if any), make input_fd non-blocking, and
		schedule the _io_loop coroutine on self.scheduler.
		"""
		log_file_path = self.log_file_path
		if hasattr(log_file_path, 'write'):
			# The caller supplied an already-open file object. _io_loop
			# writes to it with _writer (os.write based), which needs a
			# non-blocking descriptor.
			self._log_file_nb = True
			self._log_file = log_file_path
			_set_nonblocking(self._log_file.fileno())
		elif log_file_path is not None:
			self._log_file = open(_unicode_encode(log_file_path,
				encoding=_encodings['fs'], errors='strict'), mode='ab')
			if log_file_path.endswith('.gz'):
				# Keep the raw file so _unregister can close it explicitly:
				# closing a GzipFile does not close a fileobj passed to it.
				self._log_file_real = self._log_file
				self._log_file = gzip.GzipFile(filename='', mode='ab',
					fileobj=self._log_file)

			portage.util.apply_secpass_permissions(log_file_path,
				uid=portage.portage_uid, gid=portage.portage_gid,
				mode=0o660)

		if isinstance(self.input_fd, int):
			self.input_fd = os.fdopen(self.input_fd, 'rb', 0)

		fd = self.input_fd.fileno()

		# _io_loop waits for readability via add_reader whenever a read
		# yields no data, so reads on this descriptor must never block.
		fcntl.fcntl(fd, fcntl.F_SETFL,
			fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

		# FD_CLOEXEC is enabled by default in Python >=3.4.
		if sys.hexversion < 0x3040000:
			try:
				fcntl.FD_CLOEXEC
			except AttributeError:
				pass
			else:
				fcntl.fcntl(fd, fcntl.F_SETFD,
					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

		self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
		self._io_loop_task.add_done_callback(self._io_loop_done)
		self._registered = True

	def _cancel(self):
		"""
		Release resources and, if no returncode is set yet, record the
		cancelled returncode.
		"""
		self._unregister()
		if self.returncode is None:
			self.returncode = self._cancelled_returncode

	@coroutine
	def _io_loop(self, input_file):
		"""
		Copy data read from input_file to stdout_fd (unless running in the
		background) and to the log file, until EOF is reached.

		@param input_file: binary file object wrapping the input descriptor
		"""
		background = self.background
		stdout_fd = self.stdout_fd
		log_file = self._log_file
		fd = input_file.fileno()

		while True:
			buf = self._read_buf(fd)

			if buf is None:
				# not a POLLIN event, EAGAIN, etc...
				# Wait until fd becomes readable, then retry the read.
				future = self.scheduler.create_future()
				self.scheduler.add_reader(fd, future.set_result, None)
				try:
					yield future
				finally:
					# The loop and input file may have been closed.
					if not self.scheduler.is_closed():
						future.done() or future.cancel()
						# Do not call remove_reader in cases where fd has
						# been closed and then re-allocated to a concurrent
						# coroutine as in bug 716636.
						if not input_file.closed:
							self.scheduler.remove_reader(fd)
				continue

			if not buf:
				# EOF
				return

			if not background and stdout_fd is not None:
				failures = 0
				stdout_buf = buf
				while stdout_buf:
					try:
						stdout_buf = \
							stdout_buf[os.write(stdout_fd, stdout_buf):]
					except OSError as e:
						if e.errno != errno.EAGAIN:
							raise
						del e
						failures += 1
						if failures > 50:
							# Avoid a potentially infinite loop. In
							# most cases, the failure count is zero
							# and it's unlikely to exceed 1.
							raise

						# This means that a subprocess has put an inherited
						# stdio file descriptor (typically stdin) into
						# O_NONBLOCK mode. This is not acceptable (see bug
						# #264435), so revert it. We need to use a loop
						# here since there's a race condition due to
						# parallel processes being able to change the
						# flags on the inherited file descriptor.
						# Clear the flag with "& ~" rather than toggling it
						# with "^": a parallel process may already have
						# cleared it after our EAGAIN, and a toggle would
						# then set O_NONBLOCK again.
						# TODO: When possible, avoid having child processes
						# inherit stdio file descriptors from portage
						# (maybe it can't be avoided with
						# PROPERTIES=interactive).
						fcntl.fcntl(stdout_fd, fcntl.F_SETFL,
							fcntl.fcntl(stdout_fd,
							fcntl.F_GETFL) & ~os.O_NONBLOCK)

			if log_file is not None:
				if self._log_file_nb:
					# Use the _writer function which uses os.write, since the
					# log_file.write method loses data when an EAGAIN occurs.
					yield _writer(log_file, buf, loop=self.scheduler)
				else:
					# For gzip.GzipFile instances, the above _writer function
					# will not work because data written directly to the file
					# descriptor bypasses compression.
					log_file.write(buf)
					log_file.flush()

	def _io_loop_done(self, future):
		"""
		Done callback for the _io_loop task.

		If the coroutine was cancelled, cancel this task and record the
		cancellation. If no returncode has been set by then, os.EX_OK is
		used. Finally _async_wait (inherited) is called. Any exception
		other than CancelledError is re-raised by future.result() and
		propagates out of this callback.
		"""
		try:
			future.result()
		except asyncio.CancelledError:
			self.cancel()
			self._was_cancelled()
		self.returncode = self.returncode or os.EX_OK
		self._async_wait()

	def _unregister(self):
		"""
		Release all resources. This closes input_fd (removing its reader
		callback first), cancels a pending _io_loop task, and closes
		stdout_fd and the log file(s). Each attribute is reset to None
		after it is released, so repeated calls are harmless.
		"""
		if self.input_fd is not None:
			if isinstance(self.input_fd, int):
				os.close(self.input_fd)
			elif not self.input_fd.closed:
				self.scheduler.remove_reader(self.input_fd.fileno())
				self.input_fd.close()
			self.input_fd = None

		if self._io_loop_task is not None:
			if not self.scheduler.is_closed():
				self._io_loop_task.done() or self._io_loop_task.cancel()
			self._io_loop_task = None

		if self.stdout_fd is not None:
			os.close(self.stdout_fd)
			self.stdout_fd = None

		if self._log_file is not None:
			if not self._log_file.closed:
				self.scheduler.remove_writer(self._log_file.fileno())
				self._log_file.close()
			self._log_file = None

		if self._log_file_real is not None:
			# Avoid "ResourceWarning: unclosed file" since python 3.2.
			# (Closing the GzipFile above does not close this raw file.)
			self._log_file_real.close()
			self._log_file_real = None

		self._registered = False