diff options
author | Zac Medico <zmedico@gentoo.org> | 2017-03-20 23:56:55 -0700 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2017-03-24 13:32:25 -0700 |
commit | 86400e9f864e86f8f677ccda9ce4103d6d02ef87 (patch) | |
tree | af1c822f829477276cf8356f83a5d66599fef17a | |
parent | EventLoop: implement call_soon for asyncio compat (bug 591760) (diff) | |
download | portage-86400e9f864e86f8f677ccda9ce4103d6d02ef87.tar.gz portage-86400e9f864e86f8f677ccda9ce4103d6d02ef87.tar.bz2 portage-86400e9f864e86f8f677ccda9ce4103d6d02ef87.zip |
PollScheduler: terminate via call_soon for asyncio compat
Use call_soon to schedule the _termination_check callback when needed.
The previous idle_add usage was relatively inefficient, because it
scheduled the _termination_check callback to be called in every
iteration of the event loop.
Add a _cleanup method to handle cleanup of callbacks registered with
the global event loop. Since the terminate method is thread safe and it
interacts with self._term_callback_handle, use this variable only while
holding a lock.
-rw-r--r-- | pym/_emerge/PollScheduler.py | 57 | ||||
-rw-r--r-- | pym/_emerge/Scheduler.py | 7 | ||||
-rw-r--r-- | pym/portage/util/_async/AsyncScheduler.py | 16 |
3 files changed, 54 insertions, 26 deletions
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index b118ac157..569879b36 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -25,8 +25,10 @@ class PollScheduler(object): a non-main thread) @type main: bool """ + self._term_rlock = threading.RLock() self._terminated = threading.Event() self._terminated_tasks = False + self._term_check_handle = None self._max_jobs = 1 self._max_load = None self._scheduling = False @@ -44,6 +46,21 @@ class PollScheduler(object): def _is_background(self): return self._background + def _cleanup(self): + """ + Cleanup any callbacks that have been registered with the global + event loop. + """ + # The self._term_check_handle attribute requires locking + # since it's modified by the thread safe terminate method. + with self._term_rlock: + if self._term_check_handle not in (None, False): + self._term_check_handle.cancel() + # This prevents the terminate method from scheduling + # any more callbacks (since _cleanup must eliminate all + # callbacks in order to ensure complete cleanup). + self._term_check_handle = False + def terminate(self): """ Schedules asynchronous, graceful termination of the scheduler @@ -51,26 +68,36 @@ class PollScheduler(object): This method is thread-safe (and safe for signal handlers). """ - self._terminated.set() + with self._term_rlock: + if self._term_check_handle is None: + self._terminated.set() + self._term_check_handle = self._event_loop.call_soon_threadsafe( + self._termination_check, True) - def _termination_check(self): + def _termination_check(self, retry=False): """ Calls _terminate_tasks() if appropriate. It's guaranteed not to - call it while _schedule_tasks() is being called. The check should - be executed for each iteration of the event loop, for response to - termination signals at the earliest opportunity. It always returns - True, for continuous scheduling via idle_add. + call it while _schedule_tasks() is being called. This method must + only be called via the event loop thread. + + @param retry: If True then reschedule if scheduling state prevents + immediate termination. + @type retry: bool """ - if not self._scheduling and \ - self._terminated.is_set() and \ + if self._terminated.is_set() and \ not self._terminated_tasks: - self._scheduling = True - try: - self._terminated_tasks = True - self._terminate_tasks() - finally: - self._scheduling = False - return True + if not self._scheduling: + self._scheduling = True + try: + self._terminated_tasks = True + self._terminate_tasks() + finally: + self._scheduling = False + + elif retry: + with self._term_rlock: + self._term_check_handle = self._event_loop.call_soon( + self._termination_check, True) def _terminate_tasks(self): """ diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py index 71fe75f62..58ff97139 100644 --- a/pym/_emerge/Scheduler.py +++ b/pym/_emerge/Scheduler.py @@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler): else: signal.signal(signal.SIGCONT, signal.SIG_DFL) + self._termination_check() if received_signal: sys.exit(received_signal[0]) @@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler): if isinstance(x, Package) and x.operation == "merge"]) self._status_display.maxval = self._pkg_count.maxval + # Cleanup any callbacks that have been registered with the global + # event loop by calls to the terminate method. + self._cleanup() + self._logger.log(" *** Finished. Cleaning up...") if failed_pkgs: @@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler): blocker_db.discardBlocker(pkg) def _main_loop(self): - term_check_id = self._event_loop.idle_add(self._termination_check) loadavg_check_id = None if self._max_load is not None and \ self._loadavg_latency is not None and \ @@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler): while self._is_work_scheduled(): self._event_loop.iteration() finally: - self._event_loop.source_remove(term_check_id) if loadavg_check_id is not None: self._event_loop.source_remove(loadavg_check_id) diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py index 9b96c6f36..3deb6cb04 100644 --- a/pym/portage/util/_async/AsyncScheduler.py +++ b/pym/portage/util/_async/AsyncScheduler.py @@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): self._error_count = 0 self._running_tasks = set() self._remaining_tasks = True - self._term_check_id = None self._loadavg_check_id = None def _poll(self): @@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): self._schedule() def _start(self): - self._term_check_id = self._event_loop.idle_add(self._termination_check) if self._max_load is not None and \ self._loadavg_latency is not None and \ (self._max_jobs is True or self._max_jobs > 1): @@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): self._loadavg_latency, self._schedule) self._schedule() + def _cleanup(self): + super(AsyncScheduler, self)._cleanup() + if self._loadavg_check_id is not None: + self._event_loop.source_remove(self._loadavg_check_id) + self._loadavg_check_id = None + def _wait(self): # Loop while there are jobs to be scheduled. while self._keep_scheduling(): @@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): while self._is_work_scheduled(): self._event_loop.iteration() - if self._term_check_id is not None: - self._event_loop.source_remove(self._term_check_id) - self._term_check_id = None - - if self._loadavg_check_id is not None: - self._event_loop.source_remove(self._loadavg_check_id) - self._loadavg_check_id = None + self._cleanup() if self._error_count > 0: self.returncode = 1 |