diff options
author | 2020-02-24 00:06:11 -0800 | |
---|---|---|
committer | 2020-02-24 02:26:33 -0800 | |
commit | 27712651aa7014a960b012dc89457df09677edc1 (patch) | |
tree | a2472871c8ebbfc4d538b31ffc3f62c3cf32270f /lib/portage/util/_async | |
parent | SpawnProcess: use async_start method (bug 709746) (diff) | |
download | portage-27712651aa7014a960b012dc89457df09677edc1.tar.gz portage-27712651aa7014a960b012dc89457df09677edc1.tar.bz2 portage-27712651aa7014a960b012dc89457df09677edc1.zip |
PipeLogger: non-blocking write to pipe (bug 709746)
Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.
Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib/portage/util/_async')
-rw-r--r-- | lib/portage/util/_async/PipeLogger.py | 67 |
1 files changed, 54 insertions, 13 deletions
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index a4258f350..6b03988a1 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,6 +8,9 @@ import sys import portage from portage import os, _encodings, _unicode_encode +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine +from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_log_file", "_log_file_real") + ("_io_loop_task", "_log_file", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if log_file_path is not None: - + if hasattr(log_file_path, 'write'): + self._log_file = log_file_path + _set_nonblocking(self._log_file.fileno()) + elif log_file_path is not None: self._log_file = open(_unicode_encode(log_file_path, encoding=_encodings['fs'], errors='strict'), mode='ab') if log_file_path.endswith('.gz'): @@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask): fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) - self.scheduler.add_reader(fd, self._output_handler, fd) + self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler) + self._io_loop_task.add_done_callback(self._io_loop_done) self._registered = True def _cancel(self): @@ -65,8 +71,8 @@ class PipeLogger(AbstractPollTask): if self.returncode is None: self.returncode = self._cancelled_returncode - def _output_handler(self, fd): - + @coroutine + def _io_loop(self, fd): background = self.background stdout_fd = self.stdout_fd log_file = self._log_file @@ -76,14 +82,18 @@ class PipeLogger(AbstractPollTask): if buf is None: # not a POLLIN event, EAGAIN, etc... - break + future = self.scheduler.create_future() + self.scheduler.add_reader(fd, future.set_result, None) + try: + yield future + finally: + self.scheduler.remove_reader(fd) + future.done() or future.cancel() + continue if not buf: # EOF - self._unregister() - self.returncode = self.returncode or os.EX_OK - self._async_wait() - break + return else: if not background and stdout_fd is not None: @@ -120,8 +130,34 @@ class PipeLogger(AbstractPollTask): fcntl.F_GETFL) ^ os.O_NONBLOCK) if log_file is not None: - log_file.write(buf) - log_file.flush() + write_buf = buf + while True: + try: + if write_buf is not None: + log_file.write(write_buf) + write_buf = None + log_file.flush() + except EnvironmentError as e: + if e.errno != errno.EAGAIN: + raise + future = self.scheduler.create_future() + self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None) + try: + yield future + finally: + self.scheduler.remove_writer(self._log_file.fileno()) + future.done() or future.cancel() + else: + break + + def _io_loop_done(self, future): + try: + future.result() + except asyncio.CancelledError: + self.cancel() + self._was_cancelled() + self.returncode = self.returncode or os.EX_OK + self._async_wait() def _unregister(self): if self.input_fd is not None: @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask): self.input_fd.close() self.input_fd = None + if self._io_loop_task is not None: + self._io_loop_task.done() or self._io_loop_task.cancel() + self._io_loop_task = None + if self.stdout_fd is not None: os.close(self.stdout_fd) self.stdout_fd = None if self._log_file is not None: + self.scheduler.remove_writer(self._log_file.fileno()) self._log_file.close() self._log_file = None |