author     Zac Medico <zmedico@gentoo.org>  2012-10-05 13:48:53 -0700
committer  Zac Medico <zmedico@gentoo.org>  2012-10-05 13:48:53 -0700
commit     e4b64dd7dc7c2217055f110990b2496b71976681 (patch)
tree       9cfccd57286d0013556dc470cad9e3a54bde38ad
parent     test_ipc_daemon: implement internal SleepProcess (diff)
TaskScheduler: inherit AsyncScheduler
This allows the QueueScheduler class to be eliminated.
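
As a rough sketch of the API change (the echo command line below is illustrative only, not part of this commit): callers no longer construct a scheduler, add() tasks, and call run(); they pass a task iterator to the constructor and drive it with start() and wait(), and the scheduler hands each task its sched_iface before starting it, which is why the tests below drop their explicit scheduler= arguments.

    # Old callers (the removed _emerge.TaskScheduler wrapper around QueueScheduler):
    #     task_scheduler = TaskScheduler(max_jobs=2)
    #     task_scheduler.add(task)
    #     task_scheduler.run()
    #
    # New callers (portage.util._async.TaskScheduler, an AsyncScheduler);
    # the echo task is a placeholder for illustration.
    import os

    from portage.util._async.TaskScheduler import TaskScheduler
    from _emerge.SpawnProcess import SpawnProcess

    proc = SpawnProcess(args=["echo", "hello"], env=os.environ)
    task_scheduler = TaskScheduler(iter([proc]), max_jobs=1)
    task_scheduler.start()
    task_scheduler.wait()
    assert proc.returncode == os.EX_OK
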
-rw-r--r--  pym/_emerge/FifoIpcDaemon.py                 |   2
-rw-r--r--  pym/_emerge/QueueScheduler.py                | 105
-rw-r--r--  pym/_emerge/TaskScheduler.py                 |  26
-rw-r--r--  pym/portage/tests/ebuild/test_ipc_daemon.py  |  58
-rw-r--r--  pym/portage/tests/process/test_poll.py       |  17
-rw-r--r--  pym/portage/util/_async/TaskScheduler.py     |  22
6 files changed, 68 insertions, 162 deletions
diff --git a/pym/_emerge/FifoIpcDaemon.py b/pym/_emerge/FifoIpcDaemon.py
index fcc4ab4b9..de9dc67b1 100644
--- a/pym/_emerge/FifoIpcDaemon.py
+++ b/pym/_emerge/FifoIpcDaemon.py
@@ -47,6 +47,8 @@ class FifoIpcDaemon(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = 1
 		self._unregister()
+		# notify exit listeners
+		self.wait()
 
 	def _wait(self):
 		if self.returncode is not None:
diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py
deleted file mode 100644
index 206087c7a..000000000
--- a/pym/_emerge/QueueScheduler.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 1999-2012 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-from _emerge.PollScheduler import PollScheduler
-
-class QueueScheduler(PollScheduler):
-
-	"""
-	Add instances of SequentialTaskQueue and then call run(). The
-	run() method returns when no tasks remain.
-	"""
-
-	def __init__(self, main=True, max_jobs=None, max_load=None):
-		PollScheduler.__init__(self, main=main)
-
-		if max_jobs is None:
-			max_jobs = 1
-
-		self._max_jobs = max_jobs
-		self._max_load = max_load
-
-		self._queues = []
-		self._schedule_listeners = []
-
-	def add(self, q):
-		self._queues.append(q)
-
-	def remove(self, q):
-		self._queues.remove(q)
-
-	def clear(self):
-		for q in self._queues:
-			q.clear()
-
-	def run(self, timeout=None):
-
-		timeout_callback = None
-		if timeout is not None:
-			def timeout_callback():
-				timeout_callback.timed_out = True
-				return False
-			timeout_callback.timed_out = False
-			timeout_callback.timeout_id = self.sched_iface.timeout_add(
-				timeout, timeout_callback)
-
-		term_check_id = self.sched_iface.idle_add(self._termination_check)
-		try:
-			while not (timeout_callback is not None and
-				timeout_callback.timed_out):
-				# We don't have any callbacks to trigger _schedule(),
-				# so we have to call it explicitly here.
-				self._schedule()
-				if self._keep_scheduling():
-					self.sched_iface.iteration()
-				else:
-					break
-
-			while self._is_work_scheduled() and \
-				not (timeout_callback is not None and
-				timeout_callback.timed_out):
-				self.sched_iface.iteration()
-		finally:
-			self.sched_iface.source_remove(term_check_id)
-			if timeout_callback is not None:
-				self.sched_iface.unregister(timeout_callback.timeout_id)
-
-	def _schedule_tasks(self):
-		"""
-		@rtype: bool
-		@return: True if there may be remaining tasks to schedule,
-			False otherwise.
-		"""
-		if self._terminated_tasks:
-			return
-
-		while self._can_add_job():
-			n = self._max_jobs - self._running_job_count()
-			if n < 1:
-				break
-
-			if not self._start_next_job(n):
-				return
-
-	def _keep_scheduling(self):
-		return not self._terminated_tasks and any(self._queues)
-
-	def _running_job_count(self):
-		job_count = 0
-		for q in self._queues:
-			job_count += len(q.running_tasks)
-		self._jobs = job_count
-		return job_count
-
-	def _start_next_job(self, n=1):
-		started_count = 0
-		for q in self._queues:
-			initial_job_count = len(q.running_tasks)
-			q.schedule()
-			final_job_count = len(q.running_tasks)
-			if final_job_count > initial_job_count:
-				started_count += (final_job_count - initial_job_count)
-			if started_count >= n:
-				break
-		return started_count
-
diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py
deleted file mode 100644
index 583bfe323..000000000
--- a/pym/_emerge/TaskScheduler.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Copyright 1999-2012 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-from _emerge.QueueScheduler import QueueScheduler
-from _emerge.SequentialTaskQueue import SequentialTaskQueue
-
-class TaskScheduler(object):
-
-	"""
-	A simple way to handle scheduling of AsynchrousTask instances. Simply
-	add tasks and call run(). The run() method returns when no tasks remain.
-	"""
-
-	def __init__(self, main=True, max_jobs=None, max_load=None):
-		self._queue = SequentialTaskQueue(max_jobs=max_jobs)
-		self._scheduler = QueueScheduler(main=main,
-			max_jobs=max_jobs, max_load=max_load)
-		self.sched_iface = self._scheduler.sched_iface
-		self.run = self._scheduler.run
-		self.clear = self._scheduler.clear
-		self.wait = self._queue.wait
-		self._scheduler.add(self._queue)
-
-	def add(self, task):
-		self._queue.add(task)
-
diff --git a/pym/portage/tests/ebuild/test_ipc_daemon.py b/pym/portage/tests/ebuild/test_ipc_daemon.py
index 77277fe8e..d4328a1d6 100644
--- a/pym/portage/tests/ebuild/test_ipc_daemon.py
+++ b/pym/portage/tests/ebuild/test_ipc_daemon.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Gentoo Foundation
+# Copyright 2010-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import tempfile
@@ -14,10 +14,12 @@ from portage.locks import hardlock_cleanup
 from portage.package.ebuild._ipc.ExitCommand import ExitCommand
 from portage.util import ensure_dirs
 from portage.util._async.ForkProcess import ForkProcess
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from _emerge.PollScheduler import PollScheduler
 from _emerge.SpawnProcess import SpawnProcess
 from _emerge.EbuildBuildDir import EbuildBuildDir
 from _emerge.EbuildIpcDaemon import EbuildIpcDaemon
-from _emerge.TaskScheduler import TaskScheduler
 
 class SleepProcess(ForkProcess):
 	"""
@@ -33,6 +35,7 @@ class IpcDaemonTestCase(TestCase):
 	_SCHEDULE_TIMEOUT = 40000 # 40 seconds
 
 	def testIpcDaemon(self):
+		event_loop = global_event_loop()
 		tmpdir = tempfile.mkdtemp()
 		build_dir = None
 		try:
@@ -54,9 +57,8 @@ class IpcDaemonTestCase(TestCase):
env["__PORTAGE_TEST_HARDLINK_LOCKS"] = \
os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"]
- task_scheduler = TaskScheduler(max_jobs=2)
build_dir = EbuildBuildDir(
- scheduler=task_scheduler.sched_iface,
+ scheduler=PollScheduler(event_loop=event_loop).sched_iface,
settings=env)
build_dir.lock()
ensure_dirs(env['PORTAGE_BUILDDIR'])
@@ -71,26 +73,23 @@ class IpcDaemonTestCase(TestCase):
 			commands = {'exit' : exit_command}
 			daemon = EbuildIpcDaemon(commands=commands,
 				input_fifo=input_fifo,
-				output_fifo=output_fifo,
-				scheduler=task_scheduler.sched_iface)
+				output_fifo=output_fifo)
 			proc = SpawnProcess(
 				args=[BASH_BINARY, "-c",
 					'"$PORTAGE_BIN_PATH"/ebuild-ipc exit %d' % exitcode],
-				env=env, scheduler=task_scheduler.sched_iface)
+				env=env)
+			task_scheduler = TaskScheduler(iter([daemon, proc]),
+				max_jobs=2, event_loop=event_loop)
 
 			self.received_command = False
 			def exit_command_callback():
 				self.received_command = True
-				task_scheduler.clear()
-				task_scheduler.wait()
+				task_scheduler.cancel()
 
 			exit_command.reply_hook = exit_command_callback
 			start_time = time.time()
-			task_scheduler.add(daemon)
-			task_scheduler.add(proc)
-			task_scheduler.run(timeout=self._SCHEDULE_TIMEOUT)
-			task_scheduler.clear()
-			task_scheduler.wait()
+			self._run(event_loop, task_scheduler, self._SCHEDULE_TIMEOUT)
+
 			hardlock_cleanup(env['PORTAGE_BUILDDIR'],
 				remove_all_locks=True)
@@ -101,7 +100,7 @@ class IpcDaemonTestCase(TestCase):
 			self.assertEqual(daemon.isAlive(), False)
 			self.assertEqual(exit_command.exitcode, exitcode)
 
-			# Intentionally short timeout test for QueueScheduler.run()
+			# Intentionally short timeout test for EventLoop/AsyncScheduler.
 			# Use a ridiculously long sleep_time_s in case the user's
 			# system is heavily loaded (see bug #436334).
 			sleep_time_s = 600 #600.000 seconds
@@ -116,20 +115,18 @@ class IpcDaemonTestCase(TestCase):
 				scheduler=task_scheduler.sched_iface)
 			proc = SleepProcess(seconds=sleep_time_s,
 				scheduler=task_scheduler.sched_iface)
+			task_scheduler = TaskScheduler(iter([daemon, proc]),
+				max_jobs=2, event_loop=event_loop)
 
 			self.received_command = False
 			def exit_command_callback():
 				self.received_command = True
-				task_scheduler.clear()
-				task_scheduler.wait()
+				task_scheduler.cancel()
 
 			exit_command.reply_hook = exit_command_callback
 			start_time = time.time()
-			task_scheduler.add(daemon)
-			task_scheduler.add(proc)
-			task_scheduler.run(timeout=short_timeout_ms)
-			task_scheduler.clear()
-			task_scheduler.wait()
+			self._run(event_loop, task_scheduler, short_timeout_ms)
+
 			hardlock_cleanup(env['PORTAGE_BUILDDIR'],
 				remove_all_locks=True)
@@ -144,3 +141,20 @@ class IpcDaemonTestCase(TestCase):
 			if build_dir is not None:
 				build_dir.unlock()
 			shutil.rmtree(tmpdir)
+
+	def _timeout_callback(self):
+		self._timed_out = True
+
+	def _run(self, event_loop, task_scheduler, timeout):
+		self._timed_out = False
+		timeout_id = event_loop.timeout_add(timeout, self._timeout_callback)
+
+		try:
+			task_scheduler.start()
+			while not self._timed_out and task_scheduler.poll() is None:
+				event_loop.iteration()
+			if self._timed_out:
+				task_scheduler.cancel()
+			task_scheduler.wait()
+		finally:
+			event_loop.source_remove(timeout_id)
diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
index d6667b4e0..3772d797f 100644
--- a/pym/portage/tests/process/test_poll.py
+++ b/pym/portage/tests/process/test_poll.py
@@ -4,7 +4,7 @@
 from portage import os
 from portage.tests import TestCase
 from portage.util._pty import _create_pty_or_pipe
-from _emerge.TaskScheduler import TaskScheduler
+from portage.util._async.TaskScheduler import TaskScheduler
 from _emerge.PipeReader import PipeReader
 from _emerge.SpawnProcess import SpawnProcess
@@ -37,25 +37,24 @@ class PipeReaderTestCase(TestCase):
 		# in order to avoid issue 5380 with python3.
 		master_file = os.fdopen(master_fd, 'rb', 0)
 		slave_file = os.fdopen(slave_fd, 'wb', 0)
-		task_scheduler = TaskScheduler(max_jobs=2)
 		producer = SpawnProcess(
 			args=["bash", "-c", self._echo_cmd % test_string],
-			env=os.environ, fd_pipes={1:slave_fd},
-			scheduler=task_scheduler.sched_iface)
-		task_scheduler.add(producer)
-		slave_file.close()
+			env=os.environ, fd_pipes={1:slave_fd})
 
 		consumer = PipeReader(
 			input_files={"producer" : master_file},
-			scheduler=task_scheduler.sched_iface, _use_array=self._use_array)
+			_use_array=self._use_array)
 
-		task_scheduler.add(consumer)
+		task_scheduler = TaskScheduler(iter([producer, consumer]), max_jobs=2)
 
 		# This will ensure that both tasks have exited, which
 		# is necessary to avoid "ResourceWarning: unclosed file"
 		# warnings since Python 3.2 (and also ensures that we
 		# don't leave any zombie child processes).
-		task_scheduler.run()
+		task_scheduler.start()
+		slave_file.close()
+		task_scheduler.wait()
+
 		self.assertEqual(producer.returncode, os.EX_OK)
 		self.assertEqual(consumer.returncode, os.EX_OK)
diff --git a/pym/portage/util/_async/TaskScheduler.py b/pym/portage/util/_async/TaskScheduler.py
new file mode 100644
index 000000000..b0ec7af09
--- /dev/null
+++ b/pym/portage/util/_async/TaskScheduler.py
@@ -0,0 +1,22 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from .AsyncScheduler import AsyncScheduler
+
+class TaskScheduler(AsyncScheduler):
+
+	__slots__ = ('_task_iter',)
+
+	"""
+	A simple way to handle scheduling of AbstractPollTask instances. Simply
+	pass a task iterator into the constructor and call start(). Use the
+	poll, wait, or addExitListener methods to be notified when all of the
+	tasks have completed.
+	"""
+
+	def __init__(self, task_iter, **kwargs):
+		AsyncScheduler.__init__(self, **kwargs)
+		self._task_iter = task_iter
+
+	def _next_task(self):
+		return next(self._task_iter)
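
For reference, a hedged usage sketch of the new class (GreetingProcess and the surrounding script are illustrative only, not part of this commit): because tasks are pulled from an iterator, a generator can create them lazily as job slots open, and completion can be observed through wait() or addExitListener() as the docstring describes.

    # Hypothetical example, not shipped with this commit.
    from portage.util._async.ForkProcess import ForkProcess
    from portage.util._async.TaskScheduler import TaskScheduler

    class GreetingProcess(ForkProcess):
        # Runs in a forked child; the value returned by _run() is used
        # as the child's exit status (0 = success).
        __slots__ = ('message',)

        def _run(self):
            print(self.message)
            return 0

    def task_iter():
        # Tasks are instantiated lazily, one per _next_task() call,
        # as job slots become available (max_jobs=2 below).
        for i in range(5):
            yield GreetingProcess(message="hello %d" % i)

    def on_exit(scheduler):
        print("all tasks finished, returncode=%s" % scheduler.returncode)

    scheduler = TaskScheduler(task_iter(), max_jobs=2)
    scheduler.addExitListener(on_exit)
    scheduler.start()
    scheduler.wait()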