aboutsummaryrefslogtreecommitdiff
blob: 0c33429443e1e14fc8d396b484aa61a00d984741 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

__all__ = ("ForkExecutor",)

import collections
import functools
import os
import sys
import traceback

from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio
from portage.util.cpuinfo import get_cpu_count


class ForkExecutor:
    """
    An implementation of concurrent.futures.Executor that forks a
    new process for each task, with support for cancellation of tasks.

    This is entirely driven by an event loop.

    Submitted tasks wait in a FIFO queue until a worker slot is free.
    Once shutdown() has been called, no further queued tasks are
    started; futures of tasks that are still queued at that point are
    left pending.
    """

    def __init__(self, max_workers=None, loop=None):
        """
        Args:
                max_workers: maximum number of tasks running at the same
                        time. A falsy value selects get_cpu_count(), and
                        the value True removes the limit.
                loop: event loop to use, passed through
                        asyncio._wrap_loop().
        """
        # NOTE(review): the trailing "or 1" assumes get_cpu_count() may
        # yield a falsy value when the count is unknown -- confirm. It
        # keeps the "<" comparison in _schedule well-defined in that case.
        self._max_workers = max_workers or get_cpu_count() or 1
        self._loop = asyncio._wrap_loop(loop)
        # (future, proc) pairs that have not been started yet.
        self._submit_queue = collections.deque()
        # Started procs that have not exited yet, keyed by id(proc).
        self._running_tasks = {}
        self._shutdown = False
        # Resolved once shutdown was requested and nothing is running.
        self._shutdown_future = self._loop.create_future()

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
                A Future representing the given call.
        """
        future = self._loop.create_future()
        proc = AsyncFunction(
            target=functools.partial(self._guarded_fn_call, fn, args, kwargs)
        )
        self._submit_queue.append((future, proc))
        self._schedule()
        return future

    def _schedule(self):
        """Start queued tasks while worker slots are available.

        Does nothing after shutdown. Futures that were cancelled while
        still waiting in the queue are dropped without starting a
        process for them.
        """
        while (
            not self._shutdown
            and self._submit_queue
            and (
                self._max_workers is True
                or len(self._running_tasks) < self._max_workers
            )
        ):
            future, proc = self._submit_queue.popleft()
            if future.cancelled():
                # Nothing has been started for this task yet, so there
                # is no point in forking a process only to cancel it
                # again right away.
                continue
            future.add_done_callback(functools.partial(self._cancel_cb, proc))
            proc.addExitListener(functools.partial(self._proc_exit, future))
            proc.scheduler = self._loop
            proc.start()
            self._running_tasks[id(proc)] = proc

    def _cancel_cb(self, proc, future):
        """Done callback of a task future: cancel proc if the future
        was cancelled."""
        if future.cancelled():
            # async, handle the rest in _proc_exit
            proc.cancel()

    @staticmethod
    def _guarded_fn_call(fn, args, kwargs):
        """Call fn(*args, **kwargs) and capture its outcome.

        Returns:
                A (result, exception) tuple. Exactly one element is
                meaningful: on success exception is None, otherwise
                result is None and exception is the raised exception
                wrapped by _ExceptionWithTraceback.
        """
        try:
            result = fn(*args, **kwargs)
            exception = None
        except Exception as e:
            result = None
            exception = _ExceptionWithTraceback(e)

        return result, exception

    def _proc_exit(self, future, proc):
        """Exit listener of a started proc.

        Transfers the outcome of proc to future, releases the worker
        slot, starts further queued tasks, and resolves the shutdown
        future once shutdown was requested and nothing is running.
        """
        # A future that is already done (typically because it was
        # cancelled) must not be resolved a second time; the
        # bookkeeping below has to run regardless.
        if not future.done():
            if proc.returncode == os.EX_OK:
                result, exception = proc.result
                if exception is not None:
                    future.set_exception(exception)
                else:
                    future.set_result(result)
            else:
                # TODO: add special exception class for this, maybe
                # distinguish between kill and crash
                future.set_exception(
                    Exception(
                        "pid {} crashed or killed, exitcode {}".format(
                            proc.pid, proc.returncode
                        )
                    )
                )

        del self._running_tasks[id(proc)]
        self._schedule()
        if (
            self._shutdown
            and not self._running_tasks
            and not self._shutdown_future.done()
        ):
            self._shutdown_future.set_result(None)

    def shutdown(self, wait=True):
        """Stop starting new tasks.

        Args:
                wait: if true, run the event loop until every task that
                        is currently running has exited.
        """
        self._shutdown = True
        if not self._running_tasks and not self._shutdown_future.done():
            self._shutdown_future.set_result(None)
        if wait:
            self._loop.run_until_complete(self._shutdown_future)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the with block.
        return False


class _ExceptionWithTraceback:
    """
    Wrapper that pairs an exception with the text of its traceback.

    The traceback is rendered to a string when the wrapper is created.
    Pickling the wrapper stores a call to _rebuild_exc with the
    exception and that string as arguments.
    """

    def __init__(self, exc):
        formatted = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        self.exc = exc
        # Traceback text enclosed in triple-quote marker lines.
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return _rebuild_exc, rebuild_args


class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text."""

    def __str__(self):
        # Show the stored traceback text verbatim.
        return self.tb

    def __init__(self, tb):
        # tb: the traceback text returned by __str__.
        self.tb = tb


def _rebuild_exc(exc, tb):
    """
    Attach the traceback text tb to exc as its cause and return exc.

    The text is wrapped in a _RemoteTraceback instance which is stored
    in exc.__cause__; exc itself is modified in place.
    """
    remote_cause = _RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc


if sys.version_info < (3,):

    def _ExceptionWithTraceback(exc):
        """
        Python 2 stand-in that returns the exception unchanged.

        Python 2 does not support exception chaining, so
        don't bother to preserve the traceback.
        """
        return exc