aboutsummaryrefslogtreecommitdiff
blob: 9cfeeeadaf854c4cea06edb7e5cbeee92e71bd22 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Copyright 2015-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import pickle
import traceback

from portage import os
from portage.util._async.ForkProcess import ForkProcess
from portage.util.futures.compat_coroutine import coroutine
from _emerge.PipeReader import PipeReader

class AsyncFunction(ForkProcess):
	"""
	Execute a function call in a fork, and retrieve the function
	return value via pickling/unpickling, accessible as the
	"result" attribute after the forked process has exited.
	"""

	# NOTE: This class overrides the meaning of the SpawnProcess 'args'
	# attribute, and uses it to hold the positional arguments for the
	# 'target' function.
	__slots__ = ('kwargs', 'result', 'target',
		'_async_func_reader', '_async_func_reader_pw')

	def _start(self):
		self.scheduler.run_until_complete(self._async_start())

	@coroutine
	def _async_start(self):
		pr, pw = os.pipe()
		self.fd_pipes = {}
		self.fd_pipes[pw] = pw
		self._async_func_reader_pw = pw
		self._async_func_reader = PipeReader(
			input_files={"input":pr},
			scheduler=self.scheduler)
		self._async_func_reader.addExitListener(self._async_func_reader_exit)
		self._async_func_reader.start()
		yield ForkProcess._async_start(self)
		os.close(pw)

	def _run(self):
		try:
			result = self.target(*(self.args or []), **(self.kwargs or {}))
			os.write(self._async_func_reader_pw, pickle.dumps(result))
		except Exception:
			traceback.print_exc()
			return 1

		return os.EX_OK

	def _pipe_logger_exit(self, pipe_logger):
		# Ignore this event, since we want to ensure that we exit
		# only after _async_func_reader_exit has reached EOF.
		self._pipe_logger = None

	def _async_func_reader_exit(self, pipe_reader):
		try:
			self.result = pickle.loads(pipe_reader.getvalue())
		except Exception:
			# The child process will have printed a traceback in this case,
			# and returned an unsuccessful returncode.
			pass
		self._async_func_reader = None
		if self.returncode is None:
			self._async_waitpid()
		else:
			self._unregister()
			self._async_wait()

	def _unregister(self):
		ForkProcess._unregister(self)

		pipe_reader = self._async_func_reader
		if pipe_reader is not None:
			self._async_func_reader = None
			pipe_reader.removeExitListener(self._async_func_reader_exit)
			pipe_reader.cancel()