aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2017-03-20 23:56:55 -0700
committerZac Medico <zmedico@gentoo.org>2017-03-24 13:32:25 -0700
commit86400e9f864e86f8f677ccda9ce4103d6d02ef87 (patch)
treeaf1c822f829477276cf8356f83a5d66599fef17a
parentEventLoop: implement call_soon for asyncio compat (bug 591760) (diff)
downloadportage-86400e9f864e86f8f677ccda9ce4103d6d02ef87.tar.gz
portage-86400e9f864e86f8f677ccda9ce4103d6d02ef87.tar.bz2
portage-86400e9f864e86f8f677ccda9ce4103d6d02ef87.zip
PollScheduler: terminate via call_soon for asyncio compat
Use call_soon to schedule the _termination_check callback when needed. The previous idle_add usage was relatively inefficient, because it scheduled the _termination_check callback to be called in every iteration of the event loop. Add a _cleanup method to handle cleanup of callbacks registered with the global event loop. Since the terminate method is thread safe and it interacts with self._term_callback_handle, use this variable only while holding a lock.
-rw-r--r--pym/_emerge/PollScheduler.py57
-rw-r--r--pym/_emerge/Scheduler.py7
-rw-r--r--pym/portage/util/_async/AsyncScheduler.py16
3 files changed, 54 insertions, 26 deletions
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index b118ac157..569879b36 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -25,8 +25,10 @@ class PollScheduler(object):
a non-main thread)
@type main: bool
"""
+ self._term_rlock = threading.RLock()
self._terminated = threading.Event()
self._terminated_tasks = False
+ self._term_check_handle = None
self._max_jobs = 1
self._max_load = None
self._scheduling = False
@@ -44,6 +46,21 @@ class PollScheduler(object):
def _is_background(self):
return self._background
+ def _cleanup(self):
+ """
+ Cleanup any callbacks that have been registered with the global
+ event loop.
+ """
+ # The self._term_check_handle attribute requires locking
+ # since it's modified by the thread safe terminate method.
+ with self._term_rlock:
+ if self._term_check_handle not in (None, False):
+ self._term_check_handle.cancel()
+ # This prevents the terminate method from scheduling
+ # any more callbacks (since _cleanup must eliminate all
+ # callbacks in order to ensure complete cleanup).
+ self._term_check_handle = False
+
def terminate(self):
"""
Schedules asynchronous, graceful termination of the scheduler
@@ -51,26 +68,36 @@ class PollScheduler(object):
This method is thread-safe (and safe for signal handlers).
"""
- self._terminated.set()
+ with self._term_rlock:
+ if self._term_check_handle is None:
+ self._terminated.set()
+ self._term_check_handle = self._event_loop.call_soon_threadsafe(
+ self._termination_check, True)
- def _termination_check(self):
+ def _termination_check(self, retry=False):
"""
Calls _terminate_tasks() if appropriate. It's guaranteed not to
- call it while _schedule_tasks() is being called. The check should
- be executed for each iteration of the event loop, for response to
- termination signals at the earliest opportunity. It always returns
- True, for continuous scheduling via idle_add.
+ call it while _schedule_tasks() is being called. This method must
+ only be called via the event loop thread.
+
+ @param retry: If True then reschedule if scheduling state prevents
+ immediate termination.
+ @type retry: bool
"""
- if not self._scheduling and \
- self._terminated.is_set() and \
+ if self._terminated.is_set() and \
not self._terminated_tasks:
- self._scheduling = True
- try:
- self._terminated_tasks = True
- self._terminate_tasks()
- finally:
- self._scheduling = False
- return True
+ if not self._scheduling:
+ self._scheduling = True
+ try:
+ self._terminated_tasks = True
+ self._terminate_tasks()
+ finally:
+ self._scheduling = False
+
+ elif retry:
+ with self._term_rlock:
+ self._term_check_handle = self._event_loop.call_soon(
+ self._termination_check, True)
def _terminate_tasks(self):
"""
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 71fe75f62..58ff97139 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
else:
signal.signal(signal.SIGCONT, signal.SIG_DFL)
+ self._termination_check()
if received_signal:
sys.exit(received_signal[0])
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
if isinstance(x, Package) and x.operation == "merge"])
self._status_display.maxval = self._pkg_count.maxval
+ # Cleanup any callbacks that have been registered with the global
+ # event loop by calls to the terminate method.
+ self._cleanup()
+
self._logger.log(" *** Finished. Cleaning up...")
if failed_pkgs:
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
blocker_db.discardBlocker(pkg)
def _main_loop(self):
- term_check_id = self._event_loop.idle_add(self._termination_check)
loadavg_check_id = None
if self._max_load is not None and \
self._loadavg_latency is not None and \
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
while self._is_work_scheduled():
self._event_loop.iteration()
finally:
- self._event_loop.source_remove(term_check_id)
if loadavg_check_id is not None:
self._event_loop.source_remove(loadavg_check_id)
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 9b96c6f36..3deb6cb04 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._error_count = 0
self._running_tasks = set()
self._remaining_tasks = True
- self._term_check_id = None
self._loadavg_check_id = None
def _poll(self):
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._schedule()
def _start(self):
- self._term_check_id = self._event_loop.idle_add(self._termination_check)
if self._max_load is not None and \
self._loadavg_latency is not None and \
(self._max_jobs is True or self._max_jobs > 1):
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._loadavg_latency, self._schedule)
self._schedule()
+ def _cleanup(self):
+ super(AsyncScheduler, self)._cleanup()
+ if self._loadavg_check_id is not None:
+ self._event_loop.source_remove(self._loadavg_check_id)
+ self._loadavg_check_id = None
+
def _wait(self):
# Loop while there are jobs to be scheduled.
while self._keep_scheduling():
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
while self._is_work_scheduled():
self._event_loop.iteration()
- if self._term_check_id is not None:
- self._event_loop.source_remove(self._term_check_id)
- self._term_check_id = None
-
- if self._loadavg_check_id is not None:
- self._event_loop.source_remove(self._loadavg_check_id)
- self._loadavg_check_id = None
+ self._cleanup()
if self._error_count > 0:
self.returncode = 1