aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2018-04-15 03:11:21 -0700
committerZac Medico <zmedico@gentoo.org>2018-04-16 19:19:57 -0700
commit72914c0dbae1dfab7565a627451b616e330b8889 (patch)
treee1d9a379a9e87c0db033fb952522553e8a704429
parentestrip, install-qa-check.d/10ignored-flags: fix bug 653352 (diff)
downloadportage-72914c0d.tar.gz
portage-72914c0d.tar.bz2
portage-72914c0d.zip
Implement AbstractEventLoop.connect_write_pipe (bug 649588)
In python versions that support asyncio, this allows API consumers to use subprocess.PIPE for asyncio.create_subprocess_exec() stdin parameters. Bug: https://bugs.gentoo.org/649588
-rw-r--r--pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py34
-rw-r--r--pym/portage/util/futures/transports.py90
-rw-r--r--pym/portage/util/futures/unix_events.py259
3 files changed, 373 insertions, 10 deletions
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 94984fc93..8c8c395ca 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase):
self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
self._run_test(test)
+
+ def testWriteTransport(self):
+ """
+ Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE) which
+ requires an AbstractEventLoop.connect_write_pipe implementation
+ (and a WriteTransport implementation for it to return).
+ """
+ if not hasattr(asyncio, 'create_subprocess_exec'):
+ self.skipTest('create_subprocess_exec not implemented for python2')
+
+ stdin_data = b'hello world'
+ cat_binary = find_binary("cat")
+ self.assertNotEqual(cat_binary, None)
+ cat_binary = cat_binary.encode()
+
+ def test(loop):
+ proc = loop.run_until_complete(
+ asyncio.create_subprocess_exec(
+ cat_binary,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+
+ # This buffers data when necessary to avoid blocking.
+ proc.stdin.write(stdin_data)
+ # Any buffered data is written asynchronously after the
+ # close method is called.
+ proc.stdin.close()
+
+ self.assertEqual(
+ loop.run_until_complete(proc.stdout.read()),
+ stdin_data)
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+
+ self._run_test(test)
diff --git a/pym/portage/util/futures/transports.py b/pym/portage/util/futures/transports.py
new file mode 100644
index 000000000..60ea93073
--- /dev/null
+++ b/pym/portage/util/futures/transports.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+ from asyncio.transports import Transport as _Transport
+except ImportError:
+ _Transport = object
+
+
+class _FlowControlMixin(_Transport):
+ """
+ This is identical to the standard library's private
+ asyncio.transports._FlowControlMixin class.
+
+ All the logic for (write) flow control in a mix-in base class.
+
+ The subclass must implement get_write_buffer_size(). It must call
+ _maybe_pause_protocol() whenever the write buffer size increases,
+ and _maybe_resume_protocol() whenever it decreases. It may also
+ override set_write_buffer_limits() (e.g. to specify different
+ defaults).
+
+ The subclass constructor must call super().__init__(extra). This
+ will call set_write_buffer_limits().
+
+ The user may call set_write_buffer_limits() and
+ get_write_buffer_size(), and their protocol's pause_writing() and
+ resume_writing() may be called.
+ """
+
+ def __init__(self, extra=None, loop=None):
+ super().__init__(extra)
+ assert loop is not None
+ self._loop = loop
+ self._protocol_paused = False
+ self._set_write_buffer_limits()
+
+ def _maybe_pause_protocol(self):
+ size = self.get_write_buffer_size()
+ if size <= self._high_water:
+ return
+ if not self._protocol_paused:
+ self._protocol_paused = True
+ try:
+ self._protocol.pause_writing()
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.pause_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+
+ def _maybe_resume_protocol(self):
+ if (self._protocol_paused and
+ self.get_write_buffer_size() <= self._low_water):
+ self._protocol_paused = False
+ try:
+ self._protocol.resume_writing()
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.resume_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+
+ def get_write_buffer_limits(self):
+ return (self._low_water, self._high_water)
+
+ def _set_write_buffer_limits(self, high=None, low=None):
+ if high is None:
+ if low is None:
+ high = 64*1024
+ else:
+ high = 4*low
+ if low is None:
+ low = high // 4
+ if not high >= low >= 0:
+ raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+ (high, low))
+ self._high_water = high
+ self._low_water = low
+
+ def set_write_buffer_limits(self, high=None, low=None):
+ self._set_write_buffer_limits(high=high, low=low)
+ self._maybe_pause_protocol()
+
+ def get_write_buffer_size(self):
+ raise NotImplementedError
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index d69b13718..1a86ed439 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,19 +9,25 @@ __all__ = (
try:
from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
- from asyncio.transports import ReadTransport as _ReadTransport
+ from asyncio.transports import (
+ ReadTransport as _ReadTransport,
+ WriteTransport as _WriteTransport,
+ )
except ImportError:
_AbstractChildWatcher = object
_BaseSubprocessTransport = object
_ReadTransport = object
+ _WriteTransport = object
import errno
import fcntl
import functools
import logging
import os
+import socket
import stat
import subprocess
+import sys
from portage.util._eventloop.global_event_loop import (
global_event_loop as _global_event_loop,
@@ -31,6 +37,8 @@ from portage.util.futures import (
events,
)
+from portage.util.futures.transports import _FlowControlMixin
+
class _PortageEventLoop(events.AbstractEventLoop):
"""
@@ -104,6 +112,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
waiter.add_done_callback(waiter_callback)
return result
+ def connect_write_pipe(self, protocol_factory, pipe):
+ """
+ Register write pipe in event loop. Set the pipe to non-blocking mode.
+
+ @type protocol_factory: callable
+ @param protocol_factory: must instantiate object with Protocol interface
+ @type pipe: file
+ @param pipe: a pipe to write to
+ @rtype: asyncio.Future
+ @return: Return pair (transport, protocol), where transport supports the
+ WriteTransport interface.
+ """
+ protocol = protocol_factory()
+ result = self.create_future()
+ waiter = self.create_future()
+ transport = self._make_write_pipe_transport(pipe, protocol, waiter)
+
+ def waiter_callback(waiter):
+ try:
+ waiter.result()
+ except Exception as e:
+ transport.close()
+ result.set_exception(e)
+ else:
+ result.set_result((transport, protocol))
+
+ waiter.add_done_callback(waiter_callback)
+ return result
+
def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
"""
Run subprocesses asynchronously using the subprocess module.
@@ -127,11 +164,6 @@ class _PortageEventLoop(events.AbstractEventLoop):
stdout = kwargs.pop('stdout', subprocess.PIPE)
stderr = kwargs.pop('stderr', subprocess.PIPE)
- if stdin == subprocess.PIPE:
- # Requires connect_write_pipe implementation, for example
- # see asyncio.unix_events._UnixWritePipeTransport.
- raise NotImplementedError()
-
universal_newlines = kwargs.pop('universal_newlines', False)
shell = kwargs.pop('shell', False)
bufsize = kwargs.pop('bufsize', 0)
@@ -158,6 +190,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
extra=None):
return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
+ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
+
def _make_subprocess_transport(self, result, protocol, args, shell,
stdin, stdout, stderr, bufsize, extra=None, **kwargs):
waiter = self.create_future()
@@ -301,18 +337,221 @@ class _UnixReadPipeTransport(_ReadTransport):
self._loop = None
+class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport):
+ """
+ This is identical to the standard library's private
+ asyncio.unix_events._UnixWritePipeTransport class, except that it
+ only calls public AbstractEventLoop methods.
+ """
+
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+ super().__init__(extra, loop)
+ self._extra['pipe'] = pipe
+ self._pipe = pipe
+ self._fileno = pipe.fileno()
+ self._protocol = protocol
+ self._buffer = bytearray()
+ self._conn_lost = 0
+ self._closing = False # Set when close() or write_eof() called.
+
+ mode = os.fstat(self._fileno).st_mode
+ is_char = stat.S_ISCHR(mode)
+ is_fifo = stat.S_ISFIFO(mode)
+ is_socket = stat.S_ISSOCK(mode)
+ if not (is_char or is_fifo or is_socket):
+ self._pipe = None
+ self._fileno = None
+ self._protocol = None
+ raise ValueError("Pipe transport is only for "
+ "pipes, sockets and character devices")
+
+ _set_nonblocking(self._fileno)
+ self._loop.call_soon(self._protocol.connection_made, self)
+
+ # On AIX, the reader trick (to be notified when the read end of the
+ # socket is closed) only works for sockets. On other platforms it
+ # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
+ if is_socket or (is_fifo and not sys.platform.startswith("aix")):
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop.add_reader,
+ self._fileno, self._read_ready)
+
+ if waiter is not None:
+ # only wake up the waiter when connection_made() has been called
+ self._loop.call_soon(
+ lambda: None if waiter.cancelled() else waiter.set_result(None))
+
+ def get_write_buffer_size(self):
+ return len(self._buffer)
+
+ def _read_ready(self):
+ # Pipe was closed by peer.
+ if self._loop.get_debug():
+ logging.info("%r was closed by peer", self)
+ if self._buffer:
+ self._close(BrokenPipeError())
+ else:
+ self._close()
+
+ def write(self, data):
+ assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
+ if isinstance(data, bytearray):
+ data = memoryview(data)
+ if not data:
+ return
+
+ if self._conn_lost or self._closing:
+ self._conn_lost += 1
+ return
+
+ if not self._buffer:
+ # Attempt to send it right away first.
+ try:
+ n = os.write(self._fileno, data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
+ except Exception as exc:
+ self._conn_lost += 1
+ self._fatal_error(exc, 'Fatal write error on pipe transport')
+ return
+ if n == len(data):
+ return
+ elif n > 0:
+ data = memoryview(data)[n:]
+ self._loop.add_writer(self._fileno, self._write_ready)
+
+ self._buffer += data
+ self._maybe_pause_protocol()
+
+ def _write_ready(self):
+ assert self._buffer, 'Data should not be empty'
+
+ try:
+ n = os.write(self._fileno, self._buffer)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except Exception as exc:
+ self._buffer.clear()
+ self._conn_lost += 1
+ # Remove writer here, _fatal_error() doesn't it
+ # because _buffer is empty.
+ self._loop.remove_writer(self._fileno)
+ self._fatal_error(exc, 'Fatal write error on pipe transport')
+ else:
+ if n == len(self._buffer):
+ self._buffer.clear()
+ self._loop.remove_writer(self._fileno)
+ self._maybe_resume_protocol() # May append to buffer.
+ if self._closing:
+ self._loop.remove_reader(self._fileno)
+ self._call_connection_lost(None)
+ return
+ elif n > 0:
+ del self._buffer[:n]
+
+ def can_write_eof(self):
+ return True
+
+ def write_eof(self):
+ if self._closing:
+ return
+ assert self._pipe
+ self._closing = True
+ if not self._buffer:
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
+ def is_closing(self):
+ return self._closing
+
+ def close(self):
+ if self._pipe is not None and not self._closing:
+ # write_eof is all what we needed to close the write pipe
+ self.write_eof()
+
+ def abort(self):
+ self._close(None)
+
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+ # should be called by exception handler only
+ if isinstance(exc,
+ (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)):
+ if self._loop.get_debug():
+ logging.debug("%r: %s", self, message, exc_info=True)
+ else:
+ self._loop.call_exception_handler({
+ 'message': message,
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+ self._close(exc)
+
+ def _close(self, exc=None):
+ self._closing = True
+ if self._buffer:
+ self._loop.remove_writer(self._fileno)
+ self._buffer.clear()
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._pipe.close()
+ self._pipe = None
+ self._protocol = None
+ self._loop = None
+
+
+if hasattr(os, 'set_inheritable'):
+ # Python 3.4 and newer
+ _set_inheritable = os.set_inheritable
+else:
+ def _set_inheritable(fd, inheritable):
+ cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
+
+ old = fcntl.fcntl(fd, fcntl.F_GETFD)
+ if not inheritable:
+ fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
+ else:
+ fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
+
+
class _UnixSubprocessTransport(_BaseSubprocessTransport):
"""
This is identical to the standard library's private
- asyncio.unix_events._UnixSubprocessTransport class, except that
- subprocess.PIPE is not implemented for stdin, since that would
- require connect_write_pipe support in the event loop. For example,
- see the asyncio.unix_events._UnixWritePipeTransport class.
+ asyncio.unix_events._UnixSubprocessTransport class, except that it
+ only calls public AbstractEventLoop methods.
"""
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+ stdin_w = None
+ if stdin == subprocess.PIPE:
+ # Use a socket pair for stdin, since not all platforms
+ # support selecting read events on the write end of a
+ # socket (which we use in order to detect closing of the
+ # other end). Notably this is needed on AIX, and works
+ # just fine on other platforms.
+ stdin, stdin_w = socket.socketpair()
+
+ # Mark the write end of the stdin pipe as non-inheritable,
+ # needed by close_fds=False on Python 3.3 and older
+ # (Python 3.4 implements the PEP 446, socketpair returns
+ # non-inheritable sockets)
+ _set_inheritable(stdin_w.fileno(), False)
self._proc = subprocess.Popen(
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
universal_newlines=False, bufsize=bufsize, **kwargs)
+ if stdin_w is not None:
+ stdin.close()
+ self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize)
class AbstractChildWatcher(_AbstractChildWatcher):