aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pym/portage/tests/process/test_poll.py74
1 files changed, 49 insertions, 25 deletions
diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
index 596ea3088..d71c9b59c 100644
--- a/pym/portage/tests/process/test_poll.py
+++ b/pym/portage/tests/process/test_poll.py
@@ -1,11 +1,16 @@
-# Copyright 1998-2013 Gentoo Foundation
+# Copyright 1998-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
+import functools
+import pty
+import shutil
+import socket
+import sys
import subprocess
+import tempfile
from portage import os
from portage.tests import TestCase
-from portage.util._pty import _create_pty_or_pipe
from portage.util._async.PopenProcess import PopenProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from _emerge.PipeReader import PipeReader
@@ -13,28 +18,47 @@ from _emerge.PipeReader import PipeReader
class PipeReaderTestCase(TestCase):
_use_array = False
- _use_pty = False
_echo_cmd = "echo -n '%s'"
- def _testPipeReader(self, test_string):
+ def test_pipe(self):
+ def make_pipes():
+ return os.pipe(), None
+ self._do_test(make_pipes)
+
+ def test_pty_device(self):
+ def make_pipes():
+ try:
+ return pty.openpty(), None
+ except EnvironmentError:
+ self.skipTest('pty not available')
+ self._do_test(make_pipes)
+
+ def test_domain_socket(self):
+ def make_pipes():
+ if sys.version_info >= (3, 2):
+ read_end, write_end = socket.socketpair()
+ return (read_end.detach(), write_end.detach()), None
+ else:
+ self.skipTest('socket detach not supported')
+ self._do_test(make_pipes)
+
+ def test_named_pipe(self):
+ def make_pipes():
+ tempdir = tempfile.mkdtemp()
+ fifo_path = os.path.join(tempdir, 'fifo')
+ os.mkfifo(fifo_path)
+ return ((os.open(fifo_path, os.O_NONBLOCK|os.O_RDONLY),
+ os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY)),
+ functools.partial(shutil.rmtree, tempdir))
+ self._do_test(make_pipes)
+
+ def _testPipeReader(self, master_fd, slave_fd, test_string):
"""
Use a poll loop to read data from a pipe and assert that
the data written to the pipe is identical to the data
read from the pipe.
"""
- if self._use_pty:
- got_pty, master_fd, slave_fd = _create_pty_or_pipe()
- if not got_pty:
- os.close(slave_fd)
- os.close(master_fd)
- skip_reason = "pty not acquired"
- self.portage_skip = skip_reason
- self.fail(skip_reason)
- return
- else:
- master_fd, slave_fd = os.pipe()
-
# WARNING: It is very important to use unbuffered mode here,
# in order to avoid issue 5380 with python3.
master_file = os.fdopen(master_fd, 'rb', 0)
@@ -60,15 +84,18 @@ class PipeReaderTestCase(TestCase):
return consumer.getvalue().decode('ascii', 'replace')
- def testPipeReader(self):
+ def _do_test(self, make_pipes):
for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
test_string = x * "a"
- output = self._testPipeReader(test_string)
- self.assertEqual(test_string, output,
- "x = %s, len(output) = %s" % (x, len(output)))
+ (read_end, write_end), cleanup = make_pipes()
+ try:
+ output = self._testPipeReader(read_end, write_end, test_string)
+ self.assertEqual(test_string, output,
+ "x = %s, len(output) = %s" % (x, len(output)))
+ finally:
+ if cleanup is not None:
+ cleanup()
-class PipeReaderPtyTestCase(PipeReaderTestCase):
- _use_pty = True
class PipeReaderArrayTestCase(PipeReaderTestCase):
@@ -81,6 +108,3 @@ class PipeReaderArrayTestCase(PipeReaderTestCase):
# https://bugs.python.org/issue5380
# https://bugs.pypy.org/issue956
self.todo = True
-
-class PipeReaderPtyArrayTestCase(PipeReaderArrayTestCase):
- _use_pty = True