aboutsummaryrefslogtreecommitdiff
blob: c2c3e740ea4fe45174dfa73bb1d3764cce9979c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# Copyright 1999-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import signal

from portage import os
from portage.util.futures import asyncio
from portage.util.SlotObject import SlotObject


class AsynchronousTask(SlotObject):
    """
    Subclasses override _wait() and _poll() so that calls
    to public methods can be wrapped for implementing
    hooks such as exit listener notification.

    Sublasses should call self._async_wait() to notify exit listeners after
    the task is complete and self.returncode has been set.
    """

    __slots__ = ("background", "cancelled", "returncode", "scheduler") + (
        "_exit_listener_handles",
        "_exit_listeners",
        "_start_listeners",
    )

    _cancelled_returncode = -signal.SIGINT

    def start(self):
        """
        Start an asynchronous task and then return as soon as possible.
        """
        self._start_hook()
        self._start()

    def async_wait(self):
        """
        Wait for returncode asynchronously. Notification is available
        via the add_done_callback method of the returned Future instance.

        @returns: Future, result is self.returncode
        """
        waiter = self.scheduler.create_future()
        exit_listener = lambda self: waiter.cancelled() or waiter.set_result(
            self.returncode
        )
        self.addExitListener(exit_listener)
        waiter.add_done_callback(
            lambda waiter: self.removeExitListener(exit_listener)
            if waiter.cancelled()
            else None
        )
        if self.returncode is not None:
            # If the returncode is not None, it means the exit event has already
            # happened, so use _async_wait() to guarantee that the exit_listener
            # is called. This does not do any harm because a given exit listener
            # is never called more than once.
            self._async_wait()
        return waiter

    def _start(self):
        self.returncode = os.EX_OK
        self._async_wait()

    def isAlive(self):
        return self.returncode is None

    def poll(self):
        if self.returncode is not None:
            return self.returncode
        self._poll()
        self._wait_hook()
        return self.returncode

    def _poll(self):
        return self.returncode

    def wait(self):
        """
        Wait for the returncode attribute to become ready, and return
        it. If the returncode is not ready and the event loop is already
        running, then the async_wait() method should be used instead of
        wait(), because wait() will raise asyncio.InvalidStateError in
        this case.

        @rtype: int
        @returns: the value of self.returncode
        """
        if self.returncode is None:
            if self.scheduler.is_running():
                raise asyncio.InvalidStateError("Result is not ready for %s" % (self,))
            self.scheduler.run_until_complete(self.async_wait())
        self._wait_hook()
        return self.returncode

    def _async_wait(self):
        """
        Subclasses call this method in order to invoke exit listeners when
        self.returncode is set. Subclasses may override this method in order
        to perform cleanup. The default implementation for this method simply
        calls self.wait(), which will immediately raise an InvalidStateError
        if the event loop is running and self.returncode is None.
        """
        self.wait()

    def cancel(self):
        """
        Cancel the task, but do not wait for exit status. If asynchronous exit
        notification is desired, then use addExitListener to add a listener
        before calling this method.
        NOTE: Synchronous waiting for status is not supported, since it would
        be vulnerable to hitting the recursion limit when a large number of
        tasks need to be terminated simultaneously, like in bug #402335.
        """
        if not self.cancelled:
            self.cancelled = True
            self._cancel()

    def _cancel(self):
        """
        Subclasses should implement this, as a template method
        to be called by AsynchronousTask.cancel().
        """
        pass

    def _was_cancelled(self):
        """
        If cancelled, set returncode if necessary and return True.
        Otherwise, return False.
        """
        if self.cancelled:
            if self.returncode is None:
                self.returncode = self._cancelled_returncode
            return True
        return False

    def addStartListener(self, f):
        """
        The function will be called with one argument, a reference to self.
        """
        if self._start_listeners is None:
            self._start_listeners = []
        self._start_listeners.append(f)

        # Ensure that start listeners are always called.
        if self.returncode is not None:
            self._start_hook()

    def removeStartListener(self, f):
        if self._start_listeners is None:
            return
        self._start_listeners.remove(f)

    def _start_hook(self):
        if self._start_listeners is not None:
            start_listeners = self._start_listeners
            self._start_listeners = None

            for f in start_listeners:
                self.scheduler.call_soon(f, self)

    def addExitListener(self, f):
        """
        The function will be called with one argument, a reference to self.
        """
        if self._exit_listeners is None:
            self._exit_listeners = []
        self._exit_listeners.append(f)
        if self.returncode is not None:
            self._wait_hook()

    def removeExitListener(self, f):
        if self._exit_listeners is not None:
            try:
                self._exit_listeners.remove(f)
            except ValueError:
                pass

        if self._exit_listener_handles is not None:
            handle = self._exit_listener_handles.pop(f, None)
            if handle is not None:
                handle.cancel()

    def _wait_hook(self):
        """
        Call this method after the task completes, just before returning
        the returncode from wait() or poll(). This hook is
        used to trigger exit listeners when the returncode first
        becomes available.
        """
        # Ensure that start listeners are always called.
        if self.returncode is not None:
            self._start_hook()

        if self.returncode is not None and self._exit_listeners is not None:

            listeners = self._exit_listeners
            self._exit_listeners = None
            if self._exit_listener_handles is None:
                self._exit_listener_handles = {}

            for listener in listeners:
                if listener not in self._exit_listener_handles:
                    self._exit_listener_handles[listener] = self.scheduler.call_soon(
                        self._exit_listener_cb, listener
                    )

    def _exit_listener_cb(self, listener):
        del self._exit_listener_handles[listener]
        listener(self)