aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2018-04-28 20:39:46 -0700
committerZac Medico <zmedico@gentoo.org>2018-04-28 20:39:46 -0700
commit88ce275deabd4b260be916572639de800591a288 (patch)
tree8c5213ad0568935d7413cba6a71f64362fcba491
parentAbstractPollTask._wait_loop: asyncio compat (bug 653856) (diff)
downloadportage-88ce275d.tar.gz
portage-88ce275d.tar.bz2
portage-88ce275d.zip
Scheduler._main_loop: asyncio compat (bug 653856)
Use create_future, call_later, and run_until_complete for asyncio compatibility. Bug: https://bugs.gentoo.org/653856
-rw-r--r--pym/_emerge/Scheduler.py44
1 files changed, 21 insertions, 23 deletions
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 6778708bb..4c1ea5078 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -71,8 +71,8 @@ FAILURE = 1
class Scheduler(PollScheduler):
- # max time between loadavg checks (milliseconds)
- _loadavg_latency = 30000
+ # max time between loadavg checks (seconds)
+ _loadavg_latency = 30
# max time between display status updates (milliseconds)
_max_display_latency = 3000
@@ -239,6 +239,8 @@ class Scheduler(PollScheduler):
self._jobs = 0
self._running_tasks = {}
self._completed_tasks = set()
+ self._main_exit = None
+ self._main_loadavg_handle = None
self._failed_pkgs = []
self._failed_pkgs_all = []
@@ -1373,34 +1375,18 @@ class Scheduler(PollScheduler):
blocker_db.discardBlocker(pkg)
def _main_loop(self):
- loadavg_check_id = None
+ self._main_exit = self._event_loop.create_future()
+
if self._max_load is not None and \
self._loadavg_latency is not None and \
(self._max_jobs is True or self._max_jobs > 1):
# We have to schedule periodically, in case the load
# average has changed since the last call.
- loadavg_check_id = self._event_loop.timeout_add(
+ self._main_loadavg_handle = self._event_loop.call_later(
self._loadavg_latency, self._schedule)
- try:
- # Populate initial event sources. Unless we're scheduling
- # based on load average, we only need to do this once
- # here, since it can be called during the loop from within
- # event handlers.
- self._schedule()
-
- # Loop while there are jobs to be scheduled.
- while self._keep_scheduling():
- self._event_loop.iteration()
-
- # Clean shutdown of previously scheduled jobs. In the
- # case of termination, this allows for basic cleanup
- # such as flushing of buffered output to logs.
- while self._is_work_scheduled():
- self._event_loop.iteration()
- finally:
- if loadavg_check_id is not None:
- self._event_loop.source_remove(loadavg_check_id)
+ self._schedule()
+ self._event_loop.run_until_complete(self._main_exit)
def _merge(self):
@@ -1441,6 +1427,10 @@ class Scheduler(PollScheduler):
self._digraph = None
self._task_queues.fetch.clear()
self._prefetchers.clear()
+ self._main_exit = None
+ if self._main_loadavg_handle is not None:
+ self._main_loadavg_handle.cancel()
+ self._main_loadavg_handle = None
def _choose_pkg(self):
"""
@@ -1606,6 +1596,14 @@ class Scheduler(PollScheduler):
not self._task_queues.merge)):
break
+ if not (self._is_work_scheduled() or
+ self._keep_scheduling() or self._main_exit.done()):
+ self._main_exit.set_result(None)
+ elif self._main_loadavg_handle is not None:
+ self._main_loadavg_handle.cancel()
+ self._main_loadavg_handle = self._event_loop.call_later(
+ self._loadavg_latency, self._schedule)
+
def _sigcont_handler(self, signum, frame):
self._sigcont_time = time.time()