# cgit navigation: about | summary | refs | log | tree | commit | diff
# blob: 040590e4948aec5011f2258e3db141e36bc81926 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# Copyright Gentoo Foundation 2006-2020
# Portage Unit Testing Functionality

import tempfile
import io
import tarfile
from os import urandom

import portage.gpkg
from portage import os
from portage import shutil
from portage.tests import TestCase
from portage.exception import CompressorOperationFailed


class test_gpkg_stream_case(TestCase):
    """Exercise portage.gpkg.tar_stream_reader and tar_stream_writer.

    Each reader/writer scenario is run twice: once with ``["cat"]`` passed
    as the command argument and once with no command argument at all.
    """

    # Size in bytes (1 MiB) of the random payload pushed through each stream.
    _PAYLOAD_SIZE = 1048576

    def test_gpkg_stream_reader(self):
        # Data read back through a reader given ["cat"] must equal the input.
        # NOTE(review): "cat" is presumably run as a pass-through filter --
        # confirm against portage.gpkg.
        data = urandom(self._PAYLOAD_SIZE)
        # A fresh BytesIO already starts at offset 0; the ``with`` guarantees
        # the buffer is closed even if the reader raises.
        with io.BytesIO(data) as data_io:
            with portage.gpkg.tar_stream_reader(data_io, ["cat"]) as test_reader:
                data2 = test_reader.read()
        self.assertEqual(data, data2)

    def test_gpkg_stream_reader_without_cmd(self):
        # Same round trip as above, but no command argument is supplied.
        data = urandom(self._PAYLOAD_SIZE)
        with io.BytesIO(data) as data_io:
            with portage.gpkg.tar_stream_reader(data_io) as test_reader:
                data2 = test_reader.read()
        self.assertEqual(data, data2)

    def test_gpkg_stream_reader_kill(self):
        # After kill(), the reader's ``proc.poll()`` must no longer be None.
        data = urandom(self._PAYLOAD_SIZE)
        with io.BytesIO(data) as data_io:
            with portage.gpkg.tar_stream_reader(data_io, ["cat"]) as test_reader:
                try:
                    test_reader.kill()
                except CompressorOperationFailed:
                    # Deliberately tolerated: this test only checks the state
                    # of ``proc`` afterwards, not whether kill() reports the
                    # interrupted command as a failure.
                    pass
        self.assertIsNotNone(test_reader.proc.poll())

    def test_gpkg_stream_reader_kill_without_cmd(self):
        # With no command argument, kill() must not raise and ``proc`` must
        # be None.
        data = urandom(self._PAYLOAD_SIZE)
        with io.BytesIO(data) as data_io:
            with portage.gpkg.tar_stream_reader(data_io) as test_reader:
                test_reader.kill()
        self.assertIsNone(test_reader.proc)

    def test_gpkg_stream_writer(self):
        # Write a member through a writer given ["cat"], then reopen the
        # archive and check the stored bytes match what was written.
        tmpdir = tempfile.mkdtemp()
        try:
            gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar")
            data = urandom(self._PAYLOAD_SIZE)
            with tarfile.open(gpkg_file_loc, "w") as test_tar:
                test_tarinfo = tarfile.TarInfo("test")
                with portage.gpkg.tar_stream_writer(
                    test_tarinfo, test_tar, tarfile.USTAR_FORMAT, ["cat"]
                ) as test_writer:
                    test_writer.write(data)

            with tarfile.open(gpkg_file_loc, "r") as test_tar:
                test_tarinfo = test_tar.getmember("test")
                # Close the extracted member handle explicitly instead of
                # leaving it to garbage collection.
                with test_tar.extractfile(test_tarinfo) as member:
                    data2 = member.read()
            self.assertEqual(data, data2)
        finally:
            # Always remove the scratch directory, even when the test fails.
            shutil.rmtree(tmpdir)

    def test_gpkg_stream_writer_without_cmd(self):
        # Same archive round trip as above, but no command argument is
        # supplied to the writer.
        tmpdir = tempfile.mkdtemp()
        try:
            gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar")
            data = urandom(self._PAYLOAD_SIZE)
            with tarfile.open(gpkg_file_loc, "w") as test_tar:
                test_tarinfo = tarfile.TarInfo("test")
                with portage.gpkg.tar_stream_writer(
                    test_tarinfo, test_tar, tarfile.USTAR_FORMAT
                ) as test_writer:
                    test_writer.write(data)

            with tarfile.open(gpkg_file_loc, "r") as test_tar:
                test_tarinfo = test_tar.getmember("test")
                with test_tar.extractfile(test_tarinfo) as member:
                    data2 = member.read()
            self.assertEqual(data, data2)
        finally:
            shutil.rmtree(tmpdir)