From e9d1125f6730c85c4b384a580da55da68338acf1 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Tue, 7 Feb 2012 11:11:50 -0800 Subject: PollScheduler: add timeout_add like glib's This will be useful as a substitute for recursion, in order to avoid hitting the recursion limit for bug #402335. --- pym/_emerge/PollScheduler.py | 89 ++++++++++++++++++++++++++++++++++++++++++-- pym/_emerge/Scheduler.py | 8 ++-- 2 files changed, 89 insertions(+), 8 deletions(-) diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index fd9dfc0af..fd573592a 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -24,7 +24,12 @@ from _emerge.PollSelectAdapter import PollSelectAdapter class PollScheduler(object): class _sched_iface_class(SlotObject): - __slots__ = ("output", "register", "schedule", "unregister") + __slots__ = ("output", "register", "schedule", + "source_remove", "timeout_add", "unregister") + + class _timeout_handler_class(SlotObject): + __slots__ = ("args", "function", "interval", "source_id", + "timestamp") def __init__(self): self._terminated = threading.Event() @@ -37,13 +42,17 @@ class PollScheduler(object): self._poll_event_handler_ids = {} # Increment id for each new handler. self._event_handler_id = 0 + self._timeout_handlers = {} self._poll_obj = create_poll_instance() + self._polling = False self._scheduling = False self._background = False self.sched_iface = self._sched_iface_class( output=self._task_output, register=self._register, schedule=self._schedule_wait, + source_remove=self._unregister, + timeout_add=self._timeout_add, unregister=self._unregister) def terminate(self): @@ -82,7 +91,7 @@ class PollScheduler(object): should return False immediately (since there's no need to schedule anything after _terminate_tasks() has been called). """ - raise NotImplementedError() + pass def _schedule(self): """ @@ -140,6 +149,23 @@ class PollScheduler(object): StopIteration if timeout is None and there are no file descriptors to poll. """ + if self._polling: + return + self._polling = True + try: + self._do_poll(timeout=timeout) + finally: + self._polling = False + + def _do_poll(self, timeout=None): + """ + All poll() calls pass through here. The poll events + are added directly to self._poll_event_queue. + In order to avoid endless blocking, this raises + StopIteration if timeout is None and there are + no file descriptors to poll. + """ + self._run_timeouts() if not self._poll_event_handlers: self._schedule() if timeout is None and \ @@ -226,6 +252,51 @@ class PollScheduler(object): return bool(events_handled) + def _timeout_add(self, interval, function, *args): + """ + Like glib.timeout_add(), interval argument is the number of + milliseconds between calls to your function, and your function + should return False to stop being called, or True to continue + being called. Any additional positional arguments given here + are passed to your function when it's called. + + NOTE: Timeouts registered by this function currently do not + keep the main loop running when there are no remaining callbacks + registered for IO events. This is not an issue if the purpose of + the timeout is to place an upper limit on the time allowed for + a particular IO event to occur, since the handler associated with + the IO event will serve to keep the main loop running. + """ + self._event_handler_id += 1 + source_id = self._event_handler_id + self._timeout_handlers[source_id] = \ + self._timeout_handler_class( + interval=interval, function=function, args=args, + source_id=source_id, timestamp=time.time()) + return source_id + + def _run_timeouts(self): + ready_timeouts = [] + current_time = time.time() + for x in self._timeout_handlers.values(): + elapsed_seconds = current_time - x.timestamp + # elapsed_seconds < 0 means the system clock has been adjusted + if elapsed_seconds < 0 or \ + (x.interval - 1000 * elapsed_seconds) <= 0: + ready_timeouts.append(x) + + # Iterate of our local list, since self._timeout_handlers can be + # modified during the exection of these callbacks. + for x in ready_timeouts: + if x.source_id not in self._timeout_handlers: + # it got cancelled while executing another timeout + continue + x.timestamp = time.time() + if not x.function(*x.args): + self._unregister(x.source_id) + + return bool(ready_timeouts) + def _register(self, f, eventmask, handler): """ @rtype: Integer @@ -242,7 +313,17 @@ class PollScheduler(object): return reg_id def _unregister(self, reg_id): - f = self._poll_event_handler_ids[reg_id] + """ + Like glib.source_remove(), this returns True if the given reg_id + is found and removed, and False if the reg_id is invalid or has + already been removed. + """ + timeout_handler = self._timeout_handlers.pop(reg_id, None) + if timeout_handler is not None: + return True + f = self._poll_event_handler_ids.pop(reg_id, None) + if f is None: + return False self._poll_obj.unregister(f) if self._poll_event_queue: # Discard any unhandled events that belong to this file, @@ -262,7 +343,7 @@ class PollScheduler(object): self._poll_event_queue[:] = remaining_events del self._poll_event_handlers[f] - del self._poll_event_handler_ids[reg_id] + return True def _schedule_wait(self, wait_ids=None, timeout=None, condition=None): """ diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py index d09b474e1..5b56650f3 100644 --- a/pym/_emerge/Scheduler.py +++ b/pym/_emerge/Scheduler.py @@ -79,11 +79,9 @@ class Scheduler(PollScheduler): _opts_no_self_update = frozenset(["--buildpkgonly", "--fetchonly", "--fetch-all-uri", "--pretend"]) - class _iface_class(SlotObject): + class _iface_class(PollScheduler._sched_iface_class): __slots__ = ("fetch", - "output", "register", "schedule", - "scheduleSetup", "scheduleUnpack", "scheduleYield", - "unregister") + "scheduleSetup", "scheduleUnpack", "scheduleYield") class _fetch_iface_class(SlotObject): __slots__ = ("log_file", "schedule") @@ -223,6 +221,8 @@ class Scheduler(PollScheduler): scheduleSetup=self._schedule_setup, scheduleUnpack=self._schedule_unpack, scheduleYield=self._schedule_yield, + source_remove=self._unregister, + timeout_add=self._timeout_add, unregister=self._unregister) self._prefetchers = weakref.WeakValueDictionary() -- cgit v1.2.3-65-gdbad