aboutsummaryrefslogtreecommitdiff
blob: 72844403c8b27180322e3fe6c995ef0258fd5623 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

# Names exported by "from <module> import *": only the executor class.
__all__ = (
	'ForkExecutor',
)

import collections
import functools
import multiprocessing
import os
import sys
import traceback

from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio


class ForkExecutor(object):
	"""
	An implementation of concurrent.futures.Executor that forks a
	new process for each task, with support for cancellation of tasks.

	This is entirely driven by an event loop.
	"""
	def __init__(self, max_workers=None, loop=None):
		"""
		Args:
			max_workers: Maximum number of tasks allowed to run at the
				same time. A false value (None, 0) selects
				multiprocessing.cpu_count().
			loop: Event loop, passed through asyncio._wrap_loop().
		"""
		self._max_workers = max_workers or multiprocessing.cpu_count()
		self._loop = asyncio._wrap_loop(loop)
		# FIFO of (future, proc) pairs that have not been started yet.
		self._submit_queue = collections.deque()
		# Started tasks, keyed by id(proc).
		self._running_tasks = {}
		self._shutdown = False
		# Completed once shutdown has been requested and nothing is running.
		self._shutdown_future = self._loop.create_future()

	def submit(self, fn, *args, **kwargs):
		"""Submits a callable to be executed with the given arguments.

		Schedules the callable to be executed as fn(*args, **kwargs) and returns
		a Future instance representing the execution of the callable.

		Returns:
			A Future representing the given call.
		"""
		future = self._loop.create_future()
		proc = AsyncFunction(target=functools.partial(
			self._guarded_fn_call, fn, args, kwargs))
		self._submit_queue.append((future, proc))
		self._schedule()
		return future

	def _schedule(self):
		"""
		Start queued tasks until the queue is empty, max_workers tasks
		are running, or the executor has been shut down.
		"""
		while (not self._shutdown and self._submit_queue and
			len(self._running_tasks) < self._max_workers):
			future, proc = self._submit_queue.popleft()
			if future.cancelled():
				# Cancelled while still waiting in the queue. Starting
				# the process just to cancel it afterwards would run
				# part of an unwanted task and occupy a worker slot,
				# so drop the entry and look at the next one.
				continue
			future.add_done_callback(functools.partial(self._cancel_cb, proc))
			proc.addExitListener(functools.partial(self._proc_exit, future))
			proc.scheduler = self._loop
			proc.start()
			self._running_tasks[id(proc)] = proc

	def _cancel_cb(self, proc, future):
		"""
		Done callback of a started task's future: cancel the process
		when the future has been cancelled.
		"""
		if future.cancelled():
			# async, handle the rest in _proc_exit
			proc.cancel()

	@staticmethod
	def _guarded_fn_call(fn, args, kwargs):
		"""
		Call fn(*args, **kwargs) and report the outcome as a value
		instead of raising, so that it can be read back from proc.result.

		Returns:
			A (result, exception) tuple. Exactly one of the two is
			meaningful: exception is None on success, otherwise result
			is None and exception wraps the raised Exception.
		"""
		try:
			result = fn(*args, **kwargs)
			exception = None
		except Exception as e:
			result = None
			exception = _ExceptionWithTraceback(e)

		return result, exception

	def _proc_exit(self, future, proc):
		"""
		Exit listener of a started task: transfer the outcome of proc
		to future (unless it was cancelled), release the worker slot
		and start further queued tasks.
		"""
		if not future.cancelled():
			if proc.returncode == os.EX_OK:
				result, exception = proc.result
				if exception is not None:
					future.set_exception(exception)
				else:
					future.set_result(result)
			else:
				# TODO: add special exception class for this, maybe
				# distinguish between kill and crash
				future.set_exception(
					Exception('pid {} crashed or killed, exitcode {}'.format(
						proc.pid, proc.returncode)))

		del self._running_tasks[id(proc)]
		self._schedule()
		# Same done() guard as in shutdown(), so that the future can
		# never be completed twice.
		if (self._shutdown and not self._running_tasks and
			not self._shutdown_future.done()):
			self._shutdown_future.set_result(None)

	def shutdown(self, wait=True):
		"""
		Stop starting new tasks. Tasks which are still waiting in the
		submit queue are not started anymore after this call.

		Args:
			wait: If true, run the event loop until all tasks that
				are currently running have exited.
		"""
		self._shutdown = True
		if not self._running_tasks and not self._shutdown_future.done():
			self._shutdown_future.set_result(None)
		if wait:
			self._loop.run_until_complete(self._shutdown_future)

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_val, exc_tb):
		# Wait for running tasks; never suppress the exception (if any).
		self.shutdown(wait=True)
		return False


class _ExceptionWithTraceback:
	"""
	Pairs an exception with the text of its formatted traceback.

	When pickled, the instance reduces to a call of
	_rebuild_exc(exc, tb), so unpickling yields whatever that
	function returns rather than an instance of this class.
	"""
	def __init__(self, exc):
		formatted = ''.join(
			traceback.format_exception(type(exc), exc, exc.__traceback__))
		self.exc = exc
		# The traceback text is kept wrapped in triple quotes.
		self.tb = '\n"""\n%s"""' % formatted

	def __reduce__(self):
		return (_rebuild_exc, (self.exc, self.tb))


class _RemoteTraceback(Exception):
	"""
	Exception whose string form is the traceback text it was given.
	"""
	def __init__(self, tb):
		# tb is stored verbatim and returned unchanged by str().
		self.tb = tb

	def __str__(self):
		return self.tb


def _rebuild_exc(exc, tb):
	"""
	Attach the traceback text tb to exc as its __cause__, wrapped in a
	_RemoteTraceback, and return the same exc object.
	"""
	cause = _RemoteTraceback(tb)
	exc.__cause__ = cause
	return exc


if sys.version_info < (3,):
	# Python 2 does not support exception chaining, so
	# don't bother to preserve the traceback.
	# The name is rebound to an identity function, which makes
	# _ExceptionWithTraceback(e) return e itself on Python 2.
	_ExceptionWithTraceback = lambda exc: exc