From 88ce275deabd4b260be916572639de800591a288 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Sat, 28 Apr 2018 20:39:46 -0700 Subject: Scheduler._main_loop: asyncio compat (bug 653856) Use create_future, call_later, and run_until_complete for asyncio compatibility. Bug: https://bugs.gentoo.org/653856 --- pym/_emerge/Scheduler.py | 44 +++++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py index 6778708bb..4c1ea5078 100644 --- a/pym/_emerge/Scheduler.py +++ b/pym/_emerge/Scheduler.py @@ -71,8 +71,8 @@ FAILURE = 1 class Scheduler(PollScheduler): - # max time between loadavg checks (milliseconds) - _loadavg_latency = 30000 + # max time between loadavg checks (seconds) + _loadavg_latency = 30 # max time between display status updates (milliseconds) _max_display_latency = 3000 @@ -239,6 +239,8 @@ class Scheduler(PollScheduler): self._jobs = 0 self._running_tasks = {} self._completed_tasks = set() + self._main_exit = None + self._main_loadavg_handle = None self._failed_pkgs = [] self._failed_pkgs_all = [] @@ -1373,34 +1375,18 @@ class Scheduler(PollScheduler): blocker_db.discardBlocker(pkg) def _main_loop(self): - loadavg_check_id = None + self._main_exit = self._event_loop.create_future() + if self._max_load is not None and \ self._loadavg_latency is not None and \ (self._max_jobs is True or self._max_jobs > 1): # We have to schedule periodically, in case the load # average has changed since the last call. - loadavg_check_id = self._event_loop.timeout_add( + self._main_loadavg_handle = self._event_loop.call_later( self._loadavg_latency, self._schedule) - try: - # Populate initial event sources. Unless we're scheduling - # based on load average, we only need to do this once - # here, since it can be called during the loop from within - # event handlers. - self._schedule() - - # Loop while there are jobs to be scheduled. - while self._keep_scheduling(): - self._event_loop.iteration() - - # Clean shutdown of previously scheduled jobs. In the - # case of termination, this allows for basic cleanup - # such as flushing of buffered output to logs. - while self._is_work_scheduled(): - self._event_loop.iteration() - finally: - if loadavg_check_id is not None: - self._event_loop.source_remove(loadavg_check_id) + self._schedule() + self._event_loop.run_until_complete(self._main_exit) def _merge(self): @@ -1441,6 +1427,10 @@ class Scheduler(PollScheduler): self._digraph = None self._task_queues.fetch.clear() self._prefetchers.clear() + self._main_exit = None + if self._main_loadavg_handle is not None: + self._main_loadavg_handle.cancel() + self._main_loadavg_handle = None def _choose_pkg(self): """ @@ -1606,6 +1596,14 @@ class Scheduler(PollScheduler): not self._task_queues.merge)): break + if not (self._is_work_scheduled() or + self._keep_scheduling() or self._main_exit.done()): + self._main_exit.set_result(None) + elif self._main_loadavg_handle is not None: + self._main_loadavg_handle.cancel() + self._main_loadavg_handle = self._event_loop.call_later( + self._loadavg_latency, self._schedule) + def _sigcont_handler(self, signum, frame): self._sigcont_time = time.time() -- cgit v1.2.3-65-gdbad