aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/portage/tests/process/test_PopenProcess.py41
-rw-r--r--lib/portage/util/_async/PipeLogger.py67
2 files changed, 94 insertions, 14 deletions
diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py
index ed506b814..d4e97f210 100644
--- a/lib/portage/tests/process/test_PopenProcess.py
+++ b/lib/portage/tests/process/test_PopenProcess.py
@@ -9,6 +9,8 @@ from portage.tests import TestCase
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures._asyncio.streams import _reader
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from _emerge.PipeReader import PipeReader
class PopenPipeTestCase(TestCase):
@@ -73,8 +75,41 @@ class PopenPipeTestCase(TestCase):
return content.decode('ascii', 'replace')
+ @coroutine
+ def _testPipeLoggerToPipe(self, test_string, loop=None):
+ """
+ Test PipeLogger writing to a pipe connected to a PipeReader.
+ This verifies that PipeLogger does not deadlock when writing
+ to a pipe that's drained by a PipeReader running in the same
+ process (requires non-blocking write).
+ """
+
+ producer = PopenProcess(proc=subprocess.Popen(
+ ["bash", "-c", self._echo_cmd % test_string],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+ scheduler=loop)
+
+ pr, pw = os.pipe()
+
+ consumer = producer.pipe_reader = PipeLogger(background=True,
+ input_fd=producer.proc.stdout,
+ log_file_path=os.fdopen(pw, 'wb', 0))
+
+ reader = _reader(pr, loop=loop)
+ yield producer.async_start()
+ content = yield reader
+ yield producer.async_wait()
+ yield consumer.async_wait()
+
+ self.assertEqual(producer.returncode, os.EX_OK)
+ self.assertEqual(consumer.returncode, os.EX_OK)
+
+ coroutine_return(content.decode('ascii', 'replace'))
+
def testPopenPipe(self):
- for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+ loop = global_event_loop()
+
+ for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16):
test_string = x * "a"
output = self._testPipeReader(test_string)
self.assertEqual(test_string, output,
@@ -83,3 +118,7 @@ class PopenPipeTestCase(TestCase):
output = self._testPipeLogger(test_string)
self.assertEqual(test_string, output,
"x = %s, len(output) = %s" % (x, len(output)))
+
+ output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+ self.assertEqual(test_string, output,
+ "x = %s, len(output) = %s" % (x, len(output)))
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index a4258f350..6b03988a1 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,9 @@ import sys
import portage
from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
from _emerge.AbstractPollTask import AbstractPollTask
class PipeLogger(AbstractPollTask):
@@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
"""
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
- ("_log_file", "_log_file_real")
+ ("_io_loop_task", "_log_file", "_log_file_real")
def _start(self):
log_file_path = self.log_file_path
- if log_file_path is not None:
-
+ if hasattr(log_file_path, 'write'):
+ self._log_file = log_file_path
+ _set_nonblocking(self._log_file.fileno())
+ elif log_file_path is not None:
self._log_file = open(_unicode_encode(log_file_path,
encoding=_encodings['fs'], errors='strict'), mode='ab')
if log_file_path.endswith('.gz'):
@@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask):
fcntl.fcntl(fd, fcntl.F_SETFD,
fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
- self.scheduler.add_reader(fd, self._output_handler, fd)
+ self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
+ self._io_loop_task.add_done_callback(self._io_loop_done)
self._registered = True
def _cancel(self):
@@ -65,8 +71,8 @@ class PipeLogger(AbstractPollTask):
if self.returncode is None:
self.returncode = self._cancelled_returncode
- def _output_handler(self, fd):
-
+ @coroutine
+ def _io_loop(self, fd):
background = self.background
stdout_fd = self.stdout_fd
log_file = self._log_file
@@ -76,14 +82,18 @@ class PipeLogger(AbstractPollTask):
if buf is None:
# not a POLLIN event, EAGAIN, etc...
- break
+ future = self.scheduler.create_future()
+ self.scheduler.add_reader(fd, future.set_result, None)
+ try:
+ yield future
+ finally:
+ self.scheduler.remove_reader(fd)
+ future.done() or future.cancel()
+ continue
if not buf:
# EOF
- self._unregister()
- self.returncode = self.returncode or os.EX_OK
- self._async_wait()
- break
+ return
else:
if not background and stdout_fd is not None:
@@ -120,8 +130,34 @@ class PipeLogger(AbstractPollTask):
fcntl.F_GETFL) ^ os.O_NONBLOCK)
if log_file is not None:
- log_file.write(buf)
- log_file.flush()
+ write_buf = buf
+ while True:
+ try:
+ if write_buf is not None:
+ log_file.write(write_buf)
+ write_buf = None
+ log_file.flush()
+ except EnvironmentError as e:
+ if e.errno != errno.EAGAIN:
+ raise
+ future = self.scheduler.create_future()
+ self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
+ try:
+ yield future
+ finally:
+ self.scheduler.remove_writer(self._log_file.fileno())
+ future.done() or future.cancel()
+ else:
+ break
+
+ def _io_loop_done(self, future):
+ try:
+ future.result()
+ except asyncio.CancelledError:
+ self.cancel()
+ self._was_cancelled()
+ self.returncode = self.returncode or os.EX_OK
+ self._async_wait()
def _unregister(self):
if self.input_fd is not None:
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
self.input_fd.close()
self.input_fd = None
+ if self._io_loop_task is not None:
+ self._io_loop_task.done() or self._io_loop_task.cancel()
+ self._io_loop_task = None
+
if self.stdout_fd is not None:
os.close(self.stdout_fd)
self.stdout_fd = None
if self._log_file is not None:
+ self.scheduler.remove_writer(self._log_file.fileno())
self._log_file.close()
self._log_file = None