From 3b0c52b9ef364dc8e69208ec5341255ac94d41d8 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Sat, 26 May 2018 03:44:38 -0700 Subject: PipeReaderTestCase: cover sockets and named pipes --- pym/portage/tests/process/test_poll.py | 74 ++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py index 596ea3088..d71c9b59c 100644 --- a/pym/portage/tests/process/test_poll.py +++ b/pym/portage/tests/process/test_poll.py @@ -1,11 +1,16 @@ -# Copyright 1998-2013 Gentoo Foundation +# Copyright 1998-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +import functools +import pty +import shutil +import socket +import sys import subprocess +import tempfile from portage import os from portage.tests import TestCase -from portage.util._pty import _create_pty_or_pipe from portage.util._async.PopenProcess import PopenProcess from portage.util._eventloop.global_event_loop import global_event_loop from _emerge.PipeReader import PipeReader @@ -13,28 +18,47 @@ from _emerge.PipeReader import PipeReader class PipeReaderTestCase(TestCase): _use_array = False - _use_pty = False _echo_cmd = "echo -n '%s'" - def _testPipeReader(self, test_string): + def test_pipe(self): + def make_pipes(): + return os.pipe(), None + self._do_test(make_pipes) + + def test_pty_device(self): + def make_pipes(): + try: + return pty.openpty(), None + except EnvironmentError: + self.skipTest('pty not available') + self._do_test(make_pipes) + + def test_domain_socket(self): + def make_pipes(): + if sys.version_info >= (3, 2): + read_end, write_end = socket.socketpair() + return (read_end.detach(), write_end.detach()), None + else: + self.skipTest('socket detach not supported') + self._do_test(make_pipes) + + def test_named_pipe(self): + def make_pipes(): + tempdir = tempfile.mkdtemp() + fifo_path = os.path.join(tempdir, 'fifo') + os.mkfifo(fifo_path) + return ((os.open(fifo_path, os.O_NONBLOCK|os.O_RDONLY), + os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY)), + functools.partial(shutil.rmtree, tempdir)) + self._do_test(make_pipes) + + def _testPipeReader(self, master_fd, slave_fd, test_string): """ Use a poll loop to read data from a pipe and assert that the data written to the pipe is identical to the data read from the pipe. """ - if self._use_pty: - got_pty, master_fd, slave_fd = _create_pty_or_pipe() - if not got_pty: - os.close(slave_fd) - os.close(master_fd) - skip_reason = "pty not acquired" - self.portage_skip = skip_reason - self.fail(skip_reason) - return - else: - master_fd, slave_fd = os.pipe() - # WARNING: It is very important to use unbuffered mode here, # in order to avoid issue 5380 with python3. master_file = os.fdopen(master_fd, 'rb', 0) @@ -60,15 +84,18 @@ class PipeReaderTestCase(TestCase): return consumer.getvalue().decode('ascii', 'replace') - def testPipeReader(self): + def _do_test(self, make_pipes): for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): test_string = x * "a" - output = self._testPipeReader(test_string) - self.assertEqual(test_string, output, - "x = %s, len(output) = %s" % (x, len(output))) + (read_end, write_end), cleanup = make_pipes() + try: + output = self._testPipeReader(read_end, write_end, test_string) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) + finally: + if cleanup is not None: + cleanup() -class PipeReaderPtyTestCase(PipeReaderTestCase): - _use_pty = True class PipeReaderArrayTestCase(PipeReaderTestCase): @@ -81,6 +108,3 @@ class PipeReaderArrayTestCase(PipeReaderTestCase): # https://bugs.python.org/issue5380 # https://bugs.pypy.org/issue956 self.todo = True - -class PipeReaderPtyArrayTestCase(PipeReaderArrayTestCase): - _use_pty = True -- cgit v1.2.3-65-gdbad