aboutsummaryrefslogtreecommitdiff
blob: 62db37001f671ee571f32ee4630c8d1c85a9ae37 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Copyright 2019 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import errno
import os
import shutil
import stat
import tempfile

from portage.tests import TestCase
from portage.util._async.FileCopier import FileCopier
from portage.util._eventloop.global_event_loop import global_event_loop


class FileCopierTestCase(TestCase):
    def testFileCopier(self):
        loop = global_event_loop()
        tempdir = tempfile.mkdtemp()
        try:
            # regular successful copy
            src_path = os.path.join(tempdir, "src")
            dest_path = os.path.join(tempdir, "dest")
            content = b"foo"
            file_mode = 0o600
            with open(src_path, "wb") as f:
                f.write(content)
            os.chmod(src_path, file_mode)
            copier = FileCopier(src_path=src_path, dest_path=dest_path, scheduler=loop)
            copier.start()
            loop.run_until_complete(copier.async_wait())
            self.assertEqual(copier.returncode, 0)
            copier.future.result()
            with open(dest_path, "rb") as f:
                self.assertEqual(f.read(), content)
            self.assertEqual(file_mode, stat.S_IMODE(os.stat(dest_path).st_mode))

            # failure due to nonexistent src_path
            src_path = os.path.join(tempdir, "does-not-exist")
            copier = FileCopier(src_path=src_path, dest_path=dest_path, scheduler=loop)
            copier.start()
            loop.run_until_complete(copier.async_wait())
            self.assertEqual(copier.returncode, 1)
            self.assertEqual(copier.future.exception().errno, errno.ENOENT)
            self.assertEqual(
                copier.future.exception().filename, src_path.encode("utf8")
            )
        finally:
            shutil.rmtree(tempdir)