aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'portage_with_autodep/pym/_emerge/AsynchronousLock.py')
-rw-r--r--portage_with_autodep/pym/_emerge/AsynchronousLock.py288
1 files changed, 288 insertions, 0 deletions
diff --git a/portage_with_autodep/pym/_emerge/AsynchronousLock.py b/portage_with_autodep/pym/_emerge/AsynchronousLock.py
new file mode 100644
index 0000000..637ba73
--- /dev/null
+++ b/portage_with_autodep/pym/_emerge/AsynchronousLock.py
@@ -0,0 +1,288 @@
+# Copyright 2010-2011 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import dummy_threading
+import fcntl
+import logging
+import sys
+
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
+import portage
+from portage import os
+from portage.exception import TryAgain
+from portage.localization import _
+from portage.locks import lockfile, unlockfile
+from portage.util import writemsg_level
+from _emerge.AbstractPollTask import AbstractPollTask
+from _emerge.AsynchronousTask import AsynchronousTask
+from _emerge.PollConstants import PollConstants
+from _emerge.SpawnProcess import SpawnProcess
+
+class AsynchronousLock(AsynchronousTask):
+ """
+ This uses the portage.locks module to acquire a lock asynchronously,
+ using either a thread (if available) or a subprocess.
+
+ The default behavior is to use a process instead of a thread, since
+ there is currently no way to interrupt a thread that is waiting for
+ a lock (notably, SIGINT doesn't work because python delivers all
+ signals to the main thread).
+ """
+
+ __slots__ = ('path', 'scheduler',) + \
+ ('_imp', '_force_async', '_force_dummy', '_force_process', \
+ '_force_thread', '_waiting')
+
+ _use_process_by_default = True
+
+ def _start(self):
+
+ if not self._force_async:
+ try:
+ self._imp = lockfile(self.path,
+ wantnewlockfile=True, flags=os.O_NONBLOCK)
+ except TryAgain:
+ pass
+ else:
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+
+ if self._force_process or \
+ (not self._force_thread and \
+ (self._use_process_by_default or threading is dummy_threading)):
+ self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
+ else:
+ self._imp = _LockThread(path=self.path,
+ scheduler=self.scheduler,
+ _force_dummy=self._force_dummy)
+
+ self._imp.addExitListener(self._imp_exit)
+ self._imp.start()
+
+ def _imp_exit(self, imp):
+ # call exit listeners
+ if not self._waiting:
+ self.wait()
+
+ def _cancel(self):
+ if isinstance(self._imp, AsynchronousTask):
+ self._imp.cancel()
+
+ def _poll(self):
+ if isinstance(self._imp, AsynchronousTask):
+ self._imp.poll()
+ return self.returncode
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ self._waiting = True
+ self.returncode = self._imp.wait()
+ self._waiting = False
+ return self.returncode
+
+ def unlock(self):
+ if self._imp is None:
+ raise AssertionError('not locked')
+ if isinstance(self._imp, (_LockProcess, _LockThread)):
+ self._imp.unlock()
+ else:
+ unlockfile(self._imp)
+ self._imp = None
+
+class _LockThread(AbstractPollTask):
+ """
+ This uses the portage.locks module to acquire a lock asynchronously,
+ using a background thread. After the lock is acquired, the thread
+ writes to a pipe in order to notify a poll loop running in the main
+ thread.
+
+ If the threading module is unavailable then the dummy_threading
+ module will be used, and the lock will be acquired synchronously
+ (before the start() method returns).
+ """
+
+ __slots__ = ('path',) + \
+ ('_files', '_force_dummy', '_lock_obj',
+ '_thread', '_reg_id',)
+
+ def _start(self):
+ pr, pw = os.pipe()
+ self._files = {}
+ self._files['pipe_read'] = os.fdopen(pr, 'rb', 0)
+ self._files['pipe_write'] = os.fdopen(pw, 'wb', 0)
+ for k, f in self._files.items():
+ fcntl.fcntl(f.fileno(), fcntl.F_SETFL,
+ fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
+ self._reg_id = self.scheduler.register(self._files['pipe_read'].fileno(),
+ PollConstants.POLLIN, self._output_handler)
+ self._registered = True
+ threading_mod = threading
+ if self._force_dummy:
+ threading_mod = dummy_threading
+ self._thread = threading_mod.Thread(target=self._run_lock)
+ self._thread.start()
+
+ def _run_lock(self):
+ self._lock_obj = lockfile(self.path, wantnewlockfile=True)
+ self._files['pipe_write'].write(b'\0')
+
+ def _output_handler(self, f, event):
+ buf = self._read_buf(self._files['pipe_read'], event)
+ if buf:
+ self._unregister()
+ self.returncode = os.EX_OK
+ self.wait()
+
+ def _cancel(self):
+ # There's currently no way to force thread termination.
+ pass
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ if self._registered:
+ self.scheduler.schedule(self._reg_id)
+ return self.returncode
+
+ def unlock(self):
+ if self._lock_obj is None:
+ raise AssertionError('not locked')
+ if self.returncode is None:
+ raise AssertionError('lock not acquired yet')
+ unlockfile(self._lock_obj)
+ self._lock_obj = None
+
+ def _unregister(self):
+ self._registered = False
+
+ if self._thread is not None:
+ self._thread.join()
+ self._thread = None
+
+ if self._reg_id is not None:
+ self.scheduler.unregister(self._reg_id)
+ self._reg_id = None
+
+ if self._files is not None:
+ for f in self._files.values():
+ f.close()
+ self._files = None
+
+class _LockProcess(AbstractPollTask):
+ """
+ This uses the portage.locks module to acquire a lock asynchronously,
+ using a subprocess. After the lock is acquired, the process
+ writes to a pipe in order to notify a poll loop running in the main
+ process. The unlock() method notifies the subprocess to release the
+ lock and exit.
+ """
+
+ __slots__ = ('path',) + \
+ ('_acquired', '_kill_test', '_proc', '_files', '_reg_id', '_unlocked')
+
+ def _start(self):
+ in_pr, in_pw = os.pipe()
+ out_pr, out_pw = os.pipe()
+ self._files = {}
+ self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0)
+ self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0)
+ fcntl.fcntl(in_pr, fcntl.F_SETFL,
+ fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
+ self._reg_id = self.scheduler.register(in_pr,
+ PollConstants.POLLIN, self._output_handler)
+ self._registered = True
+ self._proc = SpawnProcess(
+ args=[portage._python_interpreter,
+ os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
+ env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
+ fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
+ scheduler=self.scheduler)
+ self._proc.addExitListener(self._proc_exit)
+ self._proc.start()
+ os.close(out_pr)
+ os.close(in_pw)
+
+ def _proc_exit(self, proc):
+ if proc.returncode != os.EX_OK:
+ # Typically, this will happen due to the
+ # process being killed by a signal.
+ if not self._acquired:
+ # If the lock hasn't been aquired yet, the
+ # caller can check the returncode and handle
+ # this failure appropriately.
+ if not (self.cancelled or self._kill_test):
+ writemsg_level("_LockProcess: %s\n" % \
+ _("failed to acquire lock on '%s'") % (self.path,),
+ level=logging.ERROR, noiselevel=-1)
+ self._unregister()
+ self.returncode = proc.returncode
+ self.wait()
+ return
+
+ if not self.cancelled and \
+ not self._unlocked:
+ # We don't want lost locks going unnoticed, so it's
+ # only safe to ignore if either the cancel() or
+ # unlock() methods have been previously called.
+ raise AssertionError("lock process failed with returncode %s" \
+ % (proc.returncode,))
+
+ def _cancel(self):
+ if self._proc is not None:
+ self._proc.cancel()
+
+ def _poll(self):
+ if self._proc is not None:
+ self._proc.poll()
+ return self.returncode
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ if self._registered:
+ self.scheduler.schedule(self._reg_id)
+ return self.returncode
+
+ def _output_handler(self, f, event):
+ buf = self._read_buf(self._files['pipe_in'], event)
+ if buf:
+ self._acquired = True
+ self._unregister()
+ self.returncode = os.EX_OK
+ self.wait()
+
+ def _unregister(self):
+ self._registered = False
+
+ if self._reg_id is not None:
+ self.scheduler.unregister(self._reg_id)
+ self._reg_id = None
+
+ if self._files is not None:
+ try:
+ pipe_in = self._files.pop('pipe_in')
+ except KeyError:
+ pass
+ else:
+ pipe_in.close()
+
+ def unlock(self):
+ if self._proc is None:
+ raise AssertionError('not locked')
+ if self.returncode is None:
+ raise AssertionError('lock not acquired yet')
+ if self.returncode != os.EX_OK:
+ raise AssertionError("lock process failed with returncode %s" \
+ % (self.returncode,))
+ self._unlocked = True
+ self._files['pipe_out'].write(b'\0')
+ self._files['pipe_out'].close()
+ self._files = None
+ self._proc.wait()
+ self._proc = None