# Copyright 2010-2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import dummy_threading import fcntl import sys try: import threading except ImportError: import dummy_threading as threading import portage from portage import os from portage.exception import TryAgain from portage.locks import lockfile, unlockfile from _emerge.AbstractPollTask import AbstractPollTask from _emerge.AsynchronousTask import AsynchronousTask from _emerge.PollConstants import PollConstants from _emerge.SpawnProcess import SpawnProcess class AsynchronousLock(AsynchronousTask): """ This uses the portage.locks module to acquire a lock asynchronously, using either a thread (if available) or a subprocess. The default behavior is to use a process instead of a thread, since there is currently no way to interrupt a thread that is waiting for a lock (notably, SIGINT doesn't work because python delivers all signals to the main thread). """ __slots__ = ('path', 'scheduler',) + \ ('_imp', '_force_async', '_force_dummy', '_force_process', \ '_force_thread', '_waiting') _use_process_by_default = True def _start(self): if not self._force_async: try: self._imp = lockfile(self.path, wantnewlockfile=True, flags=os.O_NONBLOCK) except TryAgain: pass else: self.returncode = os.EX_OK self.wait() return if self._force_process or \ (not self._force_thread and \ (self._use_process_by_default or threading is dummy_threading)): self._imp = _LockProcess(path=self.path, scheduler=self.scheduler) else: self._imp = _LockThread(path=self.path, scheduler=self.scheduler, _force_dummy=self._force_dummy) self._imp.addExitListener(self._imp_exit) self._imp.start() def _imp_exit(self, imp): # call exit listeners if not self._waiting: self.wait() def _cancel(self): if self._imp is not None: self._imp.cancel() def _wait(self): if self.returncode is not None: return self.returncode self._waiting = True self.returncode = self._imp.wait() self._waiting = False return self.returncode def unlock(self): if self._imp is None: raise AssertionError('not locked') if isinstance(self._imp, (_LockProcess, _LockThread)): self._imp.unlock() else: unlockfile(self._imp) self._imp = None class _LockThread(AbstractPollTask): """ This uses the portage.locks module to acquire a lock asynchronously, using a background thread. After the lock is acquired, the thread writes to a pipe in order to notify a poll loop running in the main thread. If the threading module is unavailable then the dummy_threading module will be used, and the lock will be acquired synchronously (before the start() method returns). """ __slots__ = ('path',) + \ ('_files', '_force_dummy', '_lock_obj', '_thread', '_reg_id',) def _start(self): pr, pw = os.pipe() self._files = {} self._files['pipe_read'] = os.fdopen(pr, 'rb', 0) self._files['pipe_write'] = os.fdopen(pw, 'wb', 0) for k, f in self._files.items(): fcntl.fcntl(f.fileno(), fcntl.F_SETFL, fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) self._reg_id = self.scheduler.register(self._files['pipe_read'].fileno(), PollConstants.POLLIN, self._output_handler) self._registered = True threading_mod = threading if self._force_dummy: threading_mod = dummy_threading self._thread = threading_mod.Thread(target=self._run_lock) self._thread.start() def _run_lock(self): self._lock_obj = lockfile(self.path, wantnewlockfile=True) self._files['pipe_write'].write(b'\0') def _output_handler(self, f, event): buf = self._read_buf(self._files['pipe_read'], event) if buf: self._unregister() self.returncode = os.EX_OK self.wait() def _cancel(self): # There's currently no way to force thread termination. pass def _wait(self): if self.returncode is not None: return self.returncode if self._registered: self.scheduler.schedule(self._reg_id) return self.returncode def unlock(self): if self._lock_obj is None: raise AssertionError('not locked') if self.returncode is None: raise AssertionError('lock not acquired yet') unlockfile(self._lock_obj) self._lock_obj = None def _unregister(self): self._registered = False if self._thread is not None: self._thread.join() self._thread = None if self._reg_id is not None: self.scheduler.unregister(self._reg_id) self._reg_id = None if self._files is not None: for f in self._files.values(): f.close() self._files = None class _LockProcess(AbstractPollTask): """ This uses the portage.locks module to acquire a lock asynchronously, using a subprocess. After the lock is acquired, the process writes to a pipe in order to notify a poll loop running in the main process. The unlock() method notifies the subprocess to release the lock and exit. """ __slots__ = ('path', 'scheduler',) + \ ('_proc', '_files', '_reg_id') def _start(self): in_pr, in_pw = os.pipe() out_pr, out_pw = os.pipe() self._files = {} self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0) self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0) fcntl.fcntl(in_pr, fcntl.F_SETFL, fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK) self._reg_id = self.scheduler.register(in_pr, PollConstants.POLLIN, self._output_handler) self._registered = True self._proc = SpawnProcess( args=[portage._python_interpreter, os.path.join(portage._bin_path, 'lock-helper.py'), self.path], env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path), fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()}, scheduler=self.scheduler) self._proc.addExitListener(self._proc_exit) self._proc.start() os.close(out_pr) os.close(in_pw) def _proc_exit(self, proc): if proc.returncode != os.EX_OK: # There's no good reason for locks to fail. raise AssertionError('lock process failed with returncode %s' \ % (proc.returncode,)) def _cancel(self): if self._proc is not None: self._proc.cancel() def _wait(self): if self.returncode is not None: return self.returncode if self._registered: self.scheduler.schedule(self._reg_id) return self.returncode def _output_handler(self, f, event): buf = self._read_buf(self._files['pipe_in'], event) if buf: self._unregister() self.returncode = os.EX_OK self.wait() def _unregister(self): self._registered = False if self._reg_id is not None: self.scheduler.unregister(self._reg_id) self._reg_id = None if self._files is not None: try: pipe_in = self._files.pop('pipe_in') except KeyError: pass else: pipe_in.close() def unlock(self): if self._proc is None: raise AssertionError('not locked') if self.returncode is None: raise AssertionError('lock not acquired yet') self._files['pipe_out'].write(b'\0') self._files['pipe_out'].close() self._files = None self._proc.wait() self._proc = None