# Copyright 2008-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

try:
	import fcntl
except ImportError:
	# fcntl is unavailable on some platforms, e.g. Jython
	# (http://bugs.jython.org/issue1074).
	fcntl = None

import errno
import logging
import signal
import sys

from _emerge.SubProcess import SubProcess
import portage
from portage import os
from portage.const import BASH_BINARY
from portage.localization import _
from portage.output import EOutput
from portage.util import writemsg_level
from portage.util._async.BuildLogger import BuildLogger
from portage.util._async.PipeLogger import PipeLogger
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine

class SpawnProcess(SubProcess):

	"""
	Constructor keyword args are passed into portage.process.spawn().
	The required "args" keyword argument will be passed as the first
	spawn() argument.
	"""

	_spawn_kwarg_names = ("env", "opt_name", "fd_pipes",
		"uid", "gid", "groups", "umask", "logfile",
		"path_lookup", "pre_exec", "close_fds", "cgroup",
		"unshare_ipc", "unshare_mount", "unshare_pid", "unshare_net")

	__slots__ = ("args", "log_filter_file") + \
		_spawn_kwarg_names + ("_main_task", "_selinux_type",)

	# Max number of attempts to kill the processes listed in cgroup.procs,
	# given that processes may fork before they can be killed.
	_CGROUP_CLEANUP_RETRY_MAX = 8

	def _start(self):
		self.scheduler.run_until_complete(self._async_start())

	@coroutine
	def _async_start(self):
		if self.fd_pipes is None:
			self.fd_pipes = {}
		else:
			self.fd_pipes = self.fd_pipes.copy()
		fd_pipes = self.fd_pipes

		master_fd, slave_fd = self._pipe(fd_pipes)

		can_log = self._can_log(slave_fd)
		if can_log:
			log_file_path = self.logfile
		else:
			log_file_path = None

		null_input = None
		# Subclasses such as AbstractEbuildProcess may have already passed
		# in a null file descriptor in fd_pipes, so use that when given.
		if self.background and 0 not in fd_pipes:
			# TODO: Use job control functions like tcsetpgrp() to control
			# access to stdin. Until then, use /dev/null so that any
			# attempts to read from stdin will immediately return EOF
			# instead of blocking indefinitely.
			null_input = os.open('/dev/null', os.O_RDWR)
			fd_pipes[0] = null_input

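		# Fall back to the parent process's standard streams for any of
		# the three standard fds that the caller did not map explicitly.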
		fd_pipes.setdefault(0, portage._get_stdin().fileno())
		fd_pipes.setdefault(1, sys.__stdout__.fileno())
		fd_pipes.setdefault(2, sys.__stderr__.fileno())

		# Flush any buffered parent output bound for stdout/stderr so it
		# is not interleaved with the child's output after these fds are
		# inherited.
		std_filenos = (sys.__stdout__.fileno(), sys.__stderr__.fileno())
		for fd in fd_pipes.values():
			if fd in std_filenos:
				sys.__stdout__.flush()
				sys.__stderr__.flush()
				break

		fd_pipes_orig = fd_pipes.copy()

		if log_file_path is not None or self.background:
			fd_pipes[1] = slave_fd
			fd_pipes[2] = slave_fd

		else:
			# Create a dummy pipe that PipeLogger uses to efficiently
			# monitor for process exit by listening for the EOF event.
			# Re-use of the allocated fd number for the key in fd_pipes
			# guarantees that the keys will not collide for similarly
			# allocated pipes which are used by callers such as
			# FileDigester and MergeProcess. See the _setup_pipes
			# docstring for more benefits of this allocation approach.
			self._dummy_pipe_fd = slave_fd
			fd_pipes[slave_fd] = slave_fd

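		# Forward only the spawn() keyword arguments that were explicitly
		# set on this instance.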
		kwargs = {}
		for k in self._spawn_kwarg_names:
			v = getattr(self, k)
			if v is not None:
				kwargs[k] = v

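		# returnpid=True makes portage.process.spawn() return the list of
		# spawned pids instead of waiting for them. The logfile kwarg is
		# dropped here because logging is handled by BuildLogger and
		# PipeLogger below rather than by spawn() itself.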
		kwargs["fd_pipes"] = fd_pipes
		kwargs["returnpid"] = True
		kwargs.pop("logfile", None)

		retval = self._spawn(self.args, **kwargs)

		os.close(slave_fd)
		if null_input is not None:
			os.close(null_input)

		if isinstance(retval, int):
			# spawn failed
			self.returncode = retval
			self._async_wait()
			return

		self.pid = retval[0]

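		# When logging to a file in the foreground, duplicate the original
		# stdout so that PipeLogger can tee the child's output to both the
		# console and the log.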
		stdout_fd = None
		if can_log and not self.background:
			stdout_fd = os.dup(fd_pipes_orig[1])
			# FD_CLOEXEC is enabled by default in Python >=3.4.
			if sys.hexversion < 0x3040000 and fcntl is not None:
				try:
					fcntl.FD_CLOEXEC
				except AttributeError:
					pass
				else:
					fcntl.fcntl(stdout_fd, fcntl.F_SETFD,
						fcntl.fcntl(stdout_fd,
						fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

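		# BuildLogger writes to the log file, optionally piping the
		# content through the log_filter_file command first; its stdin
		# serves as the log sink for PipeLogger below.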
		build_logger = BuildLogger(env=self.env,
			log_path=log_file_path,
			log_filter_file=self.log_filter_file,
			scheduler=self.scheduler)

		self._registered = True
		pipe_logger = None
		try:
			yield build_logger.async_start()

			pipe_logger = PipeLogger(background=self.background,
				scheduler=self.scheduler, input_fd=master_fd,
				log_file_path=build_logger.stdin,
				stdout_fd=stdout_fd)

			yield pipe_logger.async_start()
		except asyncio.CancelledError:
			if pipe_logger is not None and pipe_logger.poll() is None:
				pipe_logger.cancel()
			if build_logger.poll() is None:
				build_logger.cancel()
			raise

		self._main_task = asyncio.ensure_future(
			self._main(pipe_logger, build_logger), loop=self.scheduler)
		self._main_task.add_done_callback(self._main_exit)

	@coroutine
	def _main(self, pipe_logger, build_logger):
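		# Wait until PipeLogger sees EOF on master_fd (the child closed
		# its end of the pipe) and BuildLogger has flushed the log, and
		# propagate cancellation to both loggers.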
		try:
			if pipe_logger.poll() is None:
				yield pipe_logger.async_wait()
			if build_logger.poll() is None:
				yield build_logger.async_wait()
		except asyncio.CancelledError:
			if pipe_logger.poll() is None:
				pipe_logger.cancel()
			if build_logger.poll() is None:
				build_logger.cancel()
			raise

	def _main_exit(self, main_task):
		try:
			main_task.result()
		except asyncio.CancelledError:
			self.cancel()
		self._async_waitpid()

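	# Subclasses may override _can_log() to veto logging for the given
	# slave fd.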
	def _can_log(self, slave_fd):
		return True

	def _pipe(self, fd_pipes):
		"""
		@type fd_pipes: dict
		@param fd_pipes: pipes from which to copy terminal size if desired.
		"""
		return os.pipe()

	def _spawn(self, args, **kwargs):
		spawn_func = portage.process.spawn

		if self._selinux_type is not None:
			spawn_func = portage.selinux.spawn_wrapper(spawn_func,
				self._selinux_type)
			# bash is an allowed entrypoint, while most binaries are not
			if args[0] != BASH_BINARY:
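				# Wrap the command as: bash -c 'exec "$@"' <argv0> <argv>...
				# Bash execs the original argv via "$@", while the extra
				# args[0] only serves as bash's $0.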
				args = [BASH_BINARY, "-c", "exec \"$@\"", args[0]] + args

		return spawn_func(args, **kwargs)

	def _unregister(self):
		SubProcess._unregister(self)
		if self.cgroup is not None:
			self._cgroup_cleanup()
			self.cgroup = None
		if self._main_task is not None and not self._main_task.done():
			self._main_task.cancel()

	def _cancel(self):
		SubProcess._cancel(self)
		if self._main_task is not None and not self._main_task.done():
			self._main_task.cancel()
		self._cgroup_cleanup()

	def _cgroup_cleanup(self):
		if self.cgroup:
			def get_pids(cgroup):
				try:
					with open(os.path.join(cgroup, 'cgroup.procs'), 'r') as f:
						return [int(p) for p in f.read().split()]
				except EnvironmentError:
					# removed by cgroup-release-agent
					return []

			def kill_all(pids, sig):
				for p in pids:
					try:
						os.kill(p, sig)
					except OSError as e:
						if e.errno == errno.EPERM:
							# Reported with hardened kernel (bug #358211).
							writemsg_level(
								"!!! kill: (%i) - Operation not permitted\n" %
								(p,), level=logging.ERROR,
								noiselevel=-1)
						elif e.errno != errno.ESRCH:
							raise

			# step 1: kill all orphans (loop in case of new forks)
			remaining = self._CGROUP_CLEANUP_RETRY_MAX
			while remaining:
				remaining -= 1
				pids = get_pids(self.cgroup)
				if pids:
					kill_all(pids, signal.SIGKILL)
				else:
					break

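			# If pids is non-empty here, the retry limit was reached
			# without the cgroup becoming empty.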
			if pids:
				msg = _("Failed to kill pid(s) in '%(cgroup)s': %(pids)s") % dict(
					cgroup=os.path.join(self.cgroup, 'cgroup.procs'),
					pids=' '.join(str(pid) for pid in pids))

				self._elog('eerror', [msg])

			# step 2: remove the cgroup
			try:
				os.rmdir(self.cgroup)
			except OSError:
				# it may be removed already, or busy
				# we can't do anything good about it
				pass

	def _elog(self, elog_funcname, lines):
		elog_func = getattr(EOutput(), elog_funcname)
		for line in lines:
			elog_func(line)