[virt-tools-list] [virt-bootstrap] [PATCH v6 03/26] Add regression tests

Radostin Stoyanov rstoyanov1 at gmail.com
Thu Aug 17 09:39:41 UTC 2017


These tests aim to verify the output of virt-bootstrap in more
abstract manner by creating tar files calling bootstrap() with
them to check the result when output format is set to "dir" or
"qcow2".
---
 tests/__init__.py      | 436 ++++++++++++++++++++++++++++++++++++++++
 tests/docker_source.py | 535 +++++++++++++++++++++++++++++++++++++++++++++++++
 tests/file_source.py   | 184 +++++++++++++++++
 tests/test_utils.py    | 143 +++++++++++++
 4 files changed, 1298 insertions(+)
 create mode 100644 tests/__init__.py
 create mode 100644 tests/docker_source.py
 create mode 100644 tests/file_source.py
 create mode 100644 tests/test_utils.py

diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..7a53c38
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,436 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Regression tests for virt-bootstrap
+"""
+
+import hashlib
+import io
+import os
+import shutil
+import sys
+import tarfile
+import unittest
+import passlib.hosts
+
+try:
+    import mock
+except ImportError:
+    import unittest.mock as mock
+
+sys.path.insert(0, '../src')  # noqa: E402
+
+# pylint: disable=import-error, wrong-import-position
+from virtBootstrap import virt_bootstrap
+from virtBootstrap import sources
+from virtBootstrap import progress
+from virtBootstrap import utils
+
+__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils']
+
+
+# pylint: disable=invalid-name,too-many-instance-attributes,too-many-arguments
+class BuildTarFiles(unittest.TestCase):
+    """
+    Create dummy tar files used for testing.
+    """
+
+    def setUp(self):
+        """
+        Create dummy rootfs tar files
+        """
+        self.tar_dir = os.path.realpath('tests/tarfiles')
+        self.dest_dir = os.path.realpath('tests/filesystems')
+        self.default_permissions = 0o755
+        self.shadow_file_permissions = 0o600
+        self.shadow_file_content = "root:*:"
+        self.tmp_folders = [self.tar_dir, self.dest_dir]
+        self.tar_file = None
+        self.uid_map = []
+        self.gid_map = []
+        self.root_password = ''
+        self.checked_members = set()
+        self.rootfs_tree = {
+            'root': {
+                'uid': 0,
+                'gid': 0,
+                'dirs': [
+                    'bin',
+                    'etc',
+                    'home',
+                    'lib',
+                    'opt',
+                    'root',
+                    'run',
+                    'sbin',
+                    'srv',
+                    'tmp',
+                    'usr',
+                    'var',
+                ],
+                'files': [
+                    'etc/hosts',
+                    (
+                        'etc/shadow',
+                        self.shadow_file_permissions,
+                        self.shadow_file_content
+                    )
+                ]
+            },
+            'user1': {
+                'uid': 500,
+                'gid': 500,
+                'dirs': ['home/user1'],
+                'files': [
+                    ('home/user1/test_file', 0o644, 'test data')
+                ]
+            },
+
+            'user2': {
+                'uid': 1000,
+                'gid': 1000,
+                'dirs': [
+                    'home/user2',
+                    'home/user2/test_dir'
+                ],
+                'files': [
+                    'home/user2/test_dir/test_file'
+                ]
+            }
+        }
+
+        # Create test folders
+        for test_dir in self.tmp_folders:
+            if not os.path.exists(test_dir):
+                os.makedirs(test_dir)
+
+        self.create_tar_file()
+
+    def create_tar_file(self):
+        """
+        Use temporary name to create uncomressed tarball with dummy file
+        system. Then get checksum of the content and rename the tarfile to
+        <checksum>.tar In this way we can easily generate Manifest of Docker
+        registry and pass it to virt-bootstrap.
+        """
+        filepath = os.path.join(self.tar_dir, 'tmp_file.tar')
+        with tarfile.open(filepath, 'w') as tar:
+            self.create_user_dirs(tar)
+        # Get sha256 checksum of the archive
+        with open(filepath, 'rb') as file_handle:
+            file_hash = hashlib.sha256(file_handle.read()).hexdigest()
+        # Rename the archive to <checksum>.tar
+        new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash)
+        os.rename(filepath, new_filepath)
+        self.tar_file = new_filepath
+
+    def tearDown(self):
+        """
+        Clean up.
+        """
+        for test_dir in self.tmp_folders:
+            shutil.rmtree(test_dir)
+
+    def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0,
+                           permissions=None):
+        """
+        Add members to tar file.
+        """
+        if permissions is None:
+            permissions = self.default_permissions
+
+        for name in members:
+            data = ''
+            if isinstance(name, tuple):
+                name, permissions, data = name
+            data_encoded = data.encode('utf-8')
+
+            t_info = tarfile.TarInfo(name)
+            t_info.type = m_type
+            t_info.mode = permissions
+            t_info.uid = uid
+            t_info.gid = gid
+            t_info.size = len(data_encoded)
+
+            tar_handle.addfile(t_info, io.BytesIO(data_encoded))
+
+    def create_user_dirs(self, tar_handle):
+        """
+        Create root file system tree in tar archive.
+        """
+        tar_members = [
+            ['dirs', tarfile.DIRTYPE],
+            ['files', tarfile.REGTYPE],
+        ]
+
+        for user in self.rootfs_tree:
+            for members, tar_type in tar_members:
+                self.create_tar_members(
+                    tar_handle,
+                    self.rootfs_tree[user][members],
+                    tar_type,
+                    uid=self.rootfs_tree[user]['uid'],
+                    gid=self.rootfs_tree[user]['gid']
+                )
+
+    def apply_mapping(self):
+        """
+        This method applies UID/GID mapping to all users defined in
+        self.rootfs_tree.
+        """
+
+        for user in self.rootfs_tree:
+            user_uid = self.rootfs_tree[user]['uid']
+            user_gid = self.rootfs_tree[user]['gid']
+
+            if self.uid_map:
+                for start, tartget, rng in self.uid_map:
+                    if user_uid >= start and user_uid <= start + rng:
+                        diff = user_uid - start
+                        self.rootfs_tree[user]['uid'] = tartget + diff
+
+            if self.gid_map:
+                for start, tartget, rng in self.gid_map:
+                    if user_gid >= start and user_gid <= start + rng:
+                        diff = user_gid - start
+                        self.rootfs_tree[user]['gid'] = tartget + diff
+
+    def check_rootfs(self, skip_ownership=False):
+        """
+        Check if the root file system was extracted correctly.
+        """
+        for user in self.rootfs_tree:
+            self.check_extracted_members(
+                user, 'files', os.path.isfile, skip_ownership
+            )
+            self.check_extracted_members(
+                user, 'dirs', os.path.isdir, skip_ownership
+            )
+
+    def check_extracted_members(self, user, members, check_existance,
+                                skip_ownership=False):
+        """
+        Check permissions, ownership and content of
+        extracted files or directories.
+
+        @param user: user name defined in self.rootfs_tree
+        @param members: The string 'dirs' or 'files'
+        @param check_existance: Function used to confirm the existnace of
+            member. (E.g. os.path.isdir or os.path.isfile)
+        @param skip_ownership: Whther to skip verification of ownership. Useful
+            when members are extracted with unprivileged user.
+        """
+        permissions = self.default_permissions
+        user_uid = self.rootfs_tree[user]['uid']
+        user_gid = self.rootfs_tree[user]['gid']
+
+        for member_name in self.rootfs_tree[user][members]:
+            # If unpack member if it is tuple. Allow us to specify permissions
+            # and data per file.
+            member_data = ''
+            if isinstance(member_name, tuple):
+                if len(member_name) == 3:
+                    member_data = member_name[2]
+                member_name, permissions = member_name[:2]
+
+            # Skip already checked members. E.g. when multiple layers were
+            # extracted we want to check only the latest version of file.
+            if member_name in self.checked_members:
+                continue
+            else:
+                self.checked_members.add(member_name)
+
+            #########################
+            # Assertion functions
+            #########################
+            member_path = os.path.join(self.dest_dir, member_name)
+            self.assertTrue(
+                check_existance(member_path),
+                'Member was not extracted: %s' % member_path
+            )
+            stat = os.stat(member_path)
+            self.assertEqual(
+                stat.st_mode & 0o777, permissions,
+                'Incorrect permissions: %s' % member_path
+            )
+            if not skip_ownership:
+                self.assertEqual(
+                    stat.st_uid, user_uid,
+                    'Incorrect UID: %s' % member_path
+                )
+                self.assertEqual(
+                    stat.st_gid, user_gid,
+                    'Incorrect GID: %s' % member_path
+                )
+
+            if member_data:
+                with open(member_path, 'r') as content:
+                    file_content = content.read()
+                self.assertEqual(
+                    member_data, file_content,
+                    'Incorrect file content: %s\n'
+                    'Found: %s\n'
+                    'Expected: %s' % (member_path, file_content, member_data)
+                )
+
+    def validate_shadow_file(self, shadow_path,
+                             skip_hash=False, skip_ownership=False):
+        """
+        Ensure that extracted /etc/shadow file has correct ownership,
+        permissions and hashed root password.
+        """
+        self.assertTrue(
+            os.path.isfile(shadow_path),
+            'Does not exist: %s' % shadow_path
+        )
+        stat = os.stat(shadow_path)
+        self.assertEqual(
+            stat.st_mode & 0o777,
+            self.shadow_file_permissions,
+            'Shadow file has incorrect permissions: %s' % shadow_path
+        )
+        if not skip_ownership:
+            self.assertEqual(
+                stat.st_uid,
+                self.rootfs_tree['root']['uid'],
+                'Shadow file has incorrect UID: %s' % shadow_path
+            )
+            self.assertEqual(
+                stat.st_gid,
+                self.rootfs_tree['root']['gid'],
+                'Shadow file has incorrect GID: %s' % shadow_path
+            )
+
+        if not skip_hash:
+            # Note: For simplicity we assume that the first line of the file
+            # contains the root entry.
+            with open(shadow_path, 'r') as content:
+                shadow_content = content.readlines()
+
+            if not shadow_content:
+                raise Exception("File is empty: %s" % shadow_path)
+
+            self.assertTrue(
+                passlib.hosts.linux_context.verify(
+                    self.root_password,
+                    shadow_content[0].split(':')[1]
+                ),
+                "Root password hash is invalid."
+            )
+
+    def validate_shadow_file_in_image(self, g):
+        """
+        Validate permission, ownership and root password hash for /etc/shadow
+        file stored in qcow2 image.
+        """
+        self.assertTrue(
+            g.is_file('/etc/shadow'),
+            "Shadow file does not exist"
+        )
+
+        stat = g.stat('/etc/shadow')
+        self.assertEqual(
+            stat['mode'] & 0o777,
+            self.shadow_file_permissions,
+            'Shadow file has incorrect permissions'
+        )
+        self.assertEqual(
+            stat['uid'],
+            self.rootfs_tree['root']['uid'],
+            'Shadow file has incorrect UID'
+        )
+        self.assertEqual(
+            stat['gid'],
+            self.rootfs_tree['root']['gid'],
+            'Shadow file has incorrect GID'
+        )
+
+        # Note: For simplicity we assume that the first line of the file
+        # contains the root entry.
+        shadow_content = g.cat('/etc/shadow').split('\n')
+        self.assertTrue(
+            passlib.hosts.linux_context.verify(
+                self.root_password,
+                shadow_content[0].split(':')[1]
+            ),
+            "Root password hash is invalid."
+        )
+
+    def check_image_content(self, g, user, members, check_existance):
+        """
+        Verify the existance, permissions, ownership of members in qcow2 image.
+
+        @param g: guestfs handle
+        @param user: User defined in self.rootfs_tree
+        @param members: The string 'dirs' or 'files'
+        @param check_existance: Function used to confirm the existnace of
+            member. (E.g. os.path.isdir or os.path.isfile)
+        """
+        permissions = self.default_permissions
+        user_uid = self.rootfs_tree[user]['uid']
+        user_gid = self.rootfs_tree[user]['gid']
+
+        for member_name in self.rootfs_tree[user][members]:
+            # Get specified permissions of file.
+            if isinstance(member_name, tuple):
+                member_name, permissions = member_name[:2]
+
+            # Skip already checked files.
+            if member_name in self.checked_members:
+                continue
+            else:
+                self.checked_members.add(member_name)
+
+            # When using guestfs all names should start with '/'
+            if not member_name.startswith('/'):
+                member_name = '/' + member_name
+
+            self.assertTrue(
+                check_existance(member_name),
+                "Member was not found: %s" % member_name
+            )
+            stat = g.stat(member_name)
+            self.assertEqual(
+                stat['mode'] & 0o777, permissions,
+                'Incorrect permissions: %s' % member_name
+            )
+            self.assertEqual(
+                stat['uid'],
+                user_uid,
+                "Incorrect UID: %s\n"
+                "Found: %s\n"
+                "Expected: %s" % (member_name, stat['uid'], user_uid)
+            )
+            self.assertEqual(
+                stat['gid'],
+                user_gid,
+                "Incorrect GID: %s\n"
+                "Found: %s\n"
+                "Expected: %s" % (member_name, stat['gid'], user_gid)
+            )
+
+    def check_image(self, g):
+        """
+        Check the presence of files and folders in qcow2 image.
+        """
+        for user in self.rootfs_tree:
+            # Check folders
+            self.check_image_content(g, user, 'dirs', g.is_dir)
+            # Check files
+            self.check_image_content(g, user, 'files', g.is_file)
diff --git a/tests/docker_source.py b/tests/docker_source.py
new file mode 100644
index 0000000..cf2b2d2
--- /dev/null
+++ b/tests/docker_source.py
@@ -0,0 +1,535 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Regression tests which aim to excercise creation of root file system
+with DockerSource.
+
+To avoid fetching network resources we mock out the functions:
+- utils.get_image_details(): Returns manifest content
+- utils.get_image_dir(): Returns the directory which contains the tar files
+
+Brief description of this tests:
+1. Create 3 dummy tar files named <checksum>.tar used as image layers.
+2. Generate manifest content.
+3. Mock out get_image_details() and get_image_dir().
+4. Call bootstrap().
+5. Check the result.
+"""
+
+import copy
+import os
+import subprocess
+import unittest
+import guestfs
+
+from . import mock
+from . import sources
+from . import virt_bootstrap
+from . import BuildTarFiles
+
+
+# pylint: disable=invalid-name
+class CreateLayers(BuildTarFiles):
+    """
+    Create tarfiles to used as image layers and generate manifest for them.
+    """
+    def check_result(self, layers_rootfs, dest):
+        """
+        This method is used to check extracted root file system.
+        """
+        pass
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        This method is used to call vurtBootstrap.bootstrap()
+        """
+        pass
+
+    def generate_manifest(self, layers, layers_rootfs):
+        """
+        Generate Manifest content and then call self.call_bootstrap() and
+        self.check_result()
+        """
+        manifest = {
+            "schemaVersion": 2,
+            "layers": [
+                {
+                    "digest":
+                    "sha256:" + os.path.basename(layer).split('.')[0]
+                }
+                for layer in layers
+            ]
+        }
+        self.call_bootstrap(manifest, self.dest_dir)
+        self.check_result(layers_rootfs, self.dest_dir)
+
+    def main(self):
+        """
+        Create dummy tar files used to simulate layers of image
+        then call generate_manifest().
+
+        The variable:
+        - "layers" store a lists of paths to created archives.
+
+        - "layers_rootfs" store the value of self.rootfs_tree used to generate
+        tarball.
+        """
+        # Store base layer info
+        layers = [self.tar_file]
+        layers_rootfs = [copy.deepcopy(self.rootfs_tree)]
+
+        # Create Layer 1
+        self.rootfs_tree['root']['files'] = [
+            ('etc/foo/bar', 0o644, "This should be overwritten")
+        ]
+        self.create_tar_file()
+        # Store layer 1 info
+        layers.append(self.tar_file)
+        layers_rootfs.append(copy.deepcopy(self.rootfs_tree))
+
+        # Create Layer 2
+        self.rootfs_tree['root']['files'] = [
+            ('etc/foo/bar', 0o644, "Content of etc/foo/bar"),
+            ('bin/foobar', 0o755, "My executable script")
+        ]
+        self.create_tar_file()
+        # Store layer 2 info
+        layers.append(self.tar_file)
+        layers_rootfs.append(copy.deepcopy(self.rootfs_tree))
+
+        self.generate_manifest(layers, layers_rootfs)
+
+
+class DirExtractRootFS(CreateLayers):
+    """
+    Ensures that all layers extracted correctly in destination folder.
+    """
+    def check_result(self, layers_rootfs, dest):
+        """
+        Iterates trough values of layers_rootfs in reverse order (from the last
+        layer to first) and calls check_extracted_files().
+        """
+        for rootfs_tree in layers_rootfs[::-1]:
+            self.rootfs_tree = rootfs_tree
+            self.check_rootfs(skip_ownership=(os.geteuid != 0))
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        Mock get_image_details() and get_image_dir() then call the function
+        virt-bootstra.bootstrap().
+        """
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+
+            virt_bootstrap.bootstrap(
+                uri='docker://foobar',
+                dest=dest,
+                fmt='dir',
+                progress_cb=mock.Mock()
+            )
+
+    def runTest(self):
+        """
+        Execute this test.
+        """
+        self.main()
+
+
+ at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirOwnershipMapping(CreateLayers):
+    """
+    Ensures that the mapping of ownership works as expected.
+    """
+
+    def check_result(self, layers_rootfs, dest):
+        """
+        Iterate trough all values of layers_rootfs in reverse order apply the
+        mapping self.rootfs_tree used to create the test archive and verify
+        extracted files.
+        """
+        for rootfs_tree in layers_rootfs[::-1]:
+            self.rootfs_tree = rootfs_tree
+            self.apply_mapping()
+            self.check_rootfs()
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        Mock get_image_details() and get_image_dir() and call the function
+        virt_bootstrap.bootstrap() with UID/GID mapping values.
+        """
+        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+            virt_bootstrap.bootstrap(
+                progress_cb=mock.Mock(),
+                uri='docker://foo',
+                dest=dest,
+                fmt='dir',
+                uid_map=self.uid_map,
+                gid_map=self.gid_map
+            )
+
+    def runTest(self):
+        """
+        Execute this test.
+        """
+        self.main()
+
+
+ at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirSettingRootPassword(CreateLayers):
+    """
+    Ensures that the root password is set correctly.
+    """
+    def check_result(self, layers_rootfs, dest):
+        """
+        Validate shadow file
+        """
+        self.validate_shadow_file(os.path.join(dest, 'etc/shadow'))
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        Mock get_image_details() and get_image_dir() and call the function
+        virt_bootstrap.bootstrap() with root_password value.
+        """
+        self.root_password = "My secret root password"
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+
+            virt_bootstrap.bootstrap(
+                progress_cb=mock.Mock(),
+                uri='docker://foo',
+                dest=dest,
+                fmt='dir',
+                root_password=self.root_password
+            )
+
+    def runTest(self):
+        """
+        Execute this test.
+        """
+        self.main()
+
+
+class Qcow2BuildImage(CreateLayers):
+    """
+    Ensures that the convertion of tar files to qcow2 image with backing chains
+    works as expected.
+    """
+    def get_image_info(self, image_path):
+        """
+        Returns information about the disk image using "qemu-img".
+        """
+        cmd = ['qemu-img', 'info', image_path]
+        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        output, _ignore = proc.communicate()
+        return output.decode('utf-8').split('\n')
+
+    def check_result(self, layers_rootfs, dest):
+        """
+        Verify that layers were converted correctly to qcow2 images.
+        """
+        ###################
+        # Check base layer
+        ###################
+        base_layer_path = os.path.join(dest, "layer-0.qcow2")
+        base_info = self.get_image_info(base_layer_path)
+        self.assertEqual(base_info[1], 'file format: qcow2')
+        images = [base_layer_path]
+        ###########################
+        # Check backing chains
+        ###########################
+        for i in range(1, len(layers_rootfs)):
+            img_path = os.path.join(dest, "layer-%d.qcow2" % i)
+            img_info = self.get_image_info(img_path)
+            self.assertEqual(
+                img_info[1],
+                'file format: qcow2',
+                'Invalid qcow2 disk image: %s' % img_path
+            )
+            backing_file = os.path.join(dest, "layer-%d.qcow2" % (i - 1))
+            self.assertEqual(
+                img_info[5],
+                'backing file: %s' % backing_file,
+                "Incorrect backing file for: %s\n"
+                "Expected: %s\n"
+                "Found: %s" % (img_info, backing_file, img_info[5])
+            )
+            images.append(img_path)
+        ###############################
+        # Check extracted files/folders
+        ###############################
+        g = guestfs.GuestFS(python_return_dict=True)
+        for path in images:
+            g.add_drive_opts(path, readonly=True)
+        g.launch()
+        devices = g.list_filesystems()
+        for dev, rootfs in zip(sorted(devices), layers_rootfs):
+            self.rootfs_tree = rootfs
+            g.mount(dev, '/')
+            self.check_image(g)
+            g.umount('/')
+        g.shutdown()
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        Mock get_image_details() and get_image_dir() and call the function
+        virt_bootstrap.bootstrap() for qcow2 format.
+        """
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+
+            virt_bootstrap.bootstrap(
+                uri='docker://foobar',
+                dest=dest,
+                fmt='qcow2',
+                progress_cb=mock.Mock()
+            )
+
+    def runTest(self):
+        """
+        Execute this test.
+        """
+        self.main()
+
+
+ at unittest.skip("Need fix for this test to pass")
+class Qcow2OwnershipMapping(Qcow2BuildImage):
+    """
+    Ensures that UID/GID mapping works correctly for qcow2 conversion.
+    """
+    def check_result(self, layers_rootfs, dest):
+        """
+        Iterate through values of layers_rootfs in reverse order and apply the
+        mapping values to self.rootfs_tree. Then verify the ownership of
+        files/folders created in the last backing chain of qcow2 image.
+        """
+        g = guestfs.GuestFS(python_return_dict=True)
+        image_path = os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs))
+        g.add_drive_opts(image_path, readonly=True)
+
+        g.launch()
+        for rootfs in layers_rootfs[::-1]:
+            self.rootfs_tree = rootfs
+            self.apply_mapping()
+            g.mount('/dev/sda', '/')
+            self.check_image(g)
+            g.umount('/')
+        g.shutdown()
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        Mock the functions get_image_details() and get_image_dir() then call
+        virtbootstrap.bootstrap() with UID/GID mapping values and fmt="qcow2".
+        """
+        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+            virt_bootstrap.bootstrap(
+                uri='docker://foobar',
+                progress_cb=mock.Mock(),
+                fmt='qcow2',
+                dest=dest,
+                uid_map=self.uid_map,
+                gid_map=self.gid_map
+            )
+
+
+ at unittest.skip("Need fix for this test to pass")
+class Qcow2SettingRootPassword(Qcow2BuildImage):
+    """
+    Ensures that the root password is set correctly in the last backing chain
+    of image qcow2 image.
+    """
+    def check_result(self, layers_rootfs, dest):
+        """
+        Load last backing chain and validate shadow file.
+        """
+        g = guestfs.GuestFS(python_return_dict=True)
+        g.add_drive_opts(
+            os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs)),
+            readonly=True
+        )
+        g.launch()
+        g.mount('/dev/sda', '/')
+        self.validate_shadow_file_in_image(g)
+        g.umount('/')
+        g.shutdown()
+
+    def call_bootstrap(self, manifest, dest):
+        """
+        Mock the functions get_image_details() and get_image_dir() then
+        call virt_bootstrap.bootstrap() with root password value and qcow2
+        output format.
+        """
+        self.root_password = "My secret password"
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as mocked:
+            mocked['get_image_details'].return_value = manifest
+            mocked['get_image_dir'].return_value = self.tar_dir
+            virt_bootstrap.bootstrap(
+                uri='docker://foobar',
+                dest=dest,
+                fmt='qcow2',
+                progress_cb=mock.Mock(),
+                root_password=self.root_password
+            )
+
+
+class TestDockerSource(unittest.TestCase):
+    """
+    Unit tests for DockerSource
+    """
+    ###################################
+    # Tests for: retrieve_layers_info()
+    ###################################
+    def _mock_retrieve_layers_info(self, manifest, kwargs):
+        """
+        This method is gather common test pattern used in the following
+        two test cases which aim to return an instance of the class
+        DockerSource with some util functions being mocked.
+        """
+        with mock.patch.multiple('virtBootstrap.utils',
+                                 get_image_details=mock.DEFAULT,
+                                 get_image_dir=mock.DEFAULT) as m_utils:
+
+            m_utils['get_image_details'].return_value = manifest
+            m_utils['get_image_dir'].return_value = '/images_path'
+
+            patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri'
+            with mock.patch(patch_method) as m_uri:
+                src_instance = sources.DockerSource(**kwargs)
+        return (src_instance, m_uri, m_utils)
+
+    def test_retrieve_layers_info_pass_arguments_to_get_image_details(self):
+        """
+        Ensures that retrieve_layers_info() calls get_image_details()
+        with all passed arguments.
+        """
+        src_kwargs = {
+            'uri': '',
+            'progress': mock.Mock()
+        }
+
+        manifest = {'schemaVersion': 2, 'layers': []}
+        (src_instance,
+         m_uri, m_utils) = self._mock_retrieve_layers_info(manifest,
+                                                           src_kwargs)
+
+        kwargs = {
+            'insecure': src_instance.insecure,
+            'username': src_instance.username,
+            'password': src_instance.password,
+            'raw': True
+        }
+        m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs)
+
+    def test_retrieve_layers_info_schema_version_1(self):
+        """
+        Ensures that retrieve_layers_info() extracts the layers' information
+        from manifest with schema version 1 a list with format:
+            ["digest", "sum_type", "file_path", "size"].
+        """
+        kwargs = {
+            'uri': '',
+            'progress': mock.Mock()
+        }
+
+        manifest = {
+            'schemaVersion': 1,
+            'fsLayers': [
+                {'blobSum': 'sha256:75c416ea'},
+                {'blobSum': 'sha256:c6ff40b6'},
+                {'blobSum': 'sha256:a7050fc1'}
+            ]
+        }
+
+        expected_result = [
+            ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None],
+            ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None],
+            ['sha256', '75c416ea', '/images_path/75c416ea.tar', None]
+        ]
+
+        src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
+        self.assertEqual(src_instance.layers, expected_result)
+
+    def test_retrieve_layers_info_schema_version_2(self):
+        """
+        Ensures that retrieve_layers_info() extracts the layers' information
+        from manifest with schema version 2 a list with format:
+            ["digest", "sum_type", "file_path", "size"].
+        """
+        kwargs = {
+            'uri': '',
+            'progress': mock.Mock()
+        }
+
+        manifest = {
+            'schemaVersion': 2,
+            "layers": [
+                {"size": 47103294, "digest": "sha256:75c416ea"},
+                {"size": 814, "digest": "sha256:c6ff40b6"},
+                {"size": 513, "digest": "sha256:a7050fc1"}
+            ]
+        }
+
+        expected_result = [
+            ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294],
+            ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814],
+            ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513]
+        ]
+
+        src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
+        self.assertEqual(src_instance.layers, expected_result)
+
+    def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self):
+        """
+        Ensures that retrieve_layers_info() calls get_image_details()
+        with all passed arguments.
+        """
+        kwargs = {
+            'uri': '',
+            'progress': mock.Mock()
+        }
+
+        manifest = {'schemaVersion': 3}
+        with self.assertRaises(ValueError):
+            self._mock_retrieve_layers_info(manifest, kwargs)
diff --git a/tests/file_source.py b/tests/file_source.py
new file mode 100644
index 0000000..dd7fd00
--- /dev/null
+++ b/tests/file_source.py
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Regression tests which aim to excercise the creation of root file system
+with FileSource.
+"""
+
+import os
+import unittest
+import guestfs
+
+from . import virt_bootstrap
+from . import BuildTarFiles
+from . import mock
+
+
+# pylint: disable=invalid-name
+class Qcow2BuildImage(BuildTarFiles):
+    """
+    Ensures that building qcow2 image from tarball with root file system
+    works as expected.
+    """
+
+    def check_qcow2_images(self, image_path):
+        """
+        Ensures that qcow2 images contain all files.
+        """
+        g = guestfs.GuestFS(python_return_dict=True)
+        g.add_drive_opts(image_path, readonly=True)
+        g.launch()
+        g.mount('/dev/sda', '/')
+        self.check_image(g)
+        g.umount('/')
+        g.shutdown()
+
+    def get_image_path(self):
+        """
+        Returns the path where the qcow2 image will be stored.
+        """
+        return os.path.join(
+            self.dest_dir,
+            "%s.qcow2" % os.path.basename(self.tar_file)
+        )
+
+    def runTest(self):
+        """
+        Create qcow2 image from each dummy tarfile.
+        """
+        virt_bootstrap.bootstrap(
+            uri=self.tar_file,
+            dest=self.dest_dir,
+            fmt='qcow2',
+            progress_cb=mock.Mock()
+        )
+        self.check_qcow2_images(self.get_image_path())
+
+
+ at unittest.skip("Not implemented")
+class Qcow2OwnershipMapping(Qcow2BuildImage):
+    """
+    Ensures that UID/GID mapping works correctly for qcow2 conversion.
+    """
+    def runTest(self):
+        """
+        Create qcow2 image from each dummy tarfile.
+        """
+        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        virt_bootstrap.bootstrap(
+            progress_cb=mock.Mock(),
+            uri=self.tar_file,
+            dest=self.dest_dir,
+            fmt='qcow2',
+            uid_map=self.uid_map,
+            gid_map=self.gid_map
+        )
+        self.apply_mapping()
+        self.check_qcow2_images(self.get_image_path())
+
+
+ at unittest.skip("Not implemented")
+class Qcow2SettingRootPassword(Qcow2BuildImage):
+    """
+    Ensures that the root password is set correctly in the backing file
+    of image qcow2 image.
+    """
+    def runTest(self):
+        """
+        Create qcow2 image from each dummy tarfile.
+        """
+        self.root_password = "My secret password"
+        virt_bootstrap.bootstrap(
+            progress_cb=mock.Mock(),
+            uri=self.tar_file,
+            dest=self.dest_dir,
+            fmt='qcow2',
+            root_password=self.root_password
+        )
+        self.check_image = self.validate_shadow_file_in_image
+        self.check_qcow2_images(self.get_image_path())
+
+
+ at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirExtractRootFS(BuildTarFiles):
+    """
+    Ensures that files from rootfs tarball are extracted correctly.
+    """
+    def runTest(self):
+        """
+        Extract rootfs from each dummy tarfile.
+        """
+        dest = self.dest_dir
+        virt_bootstrap.bootstrap(
+            uri=self.tar_file,
+            dest=dest,
+            fmt='dir',
+            progress_cb=mock.Mock()
+        )
+        self.check_rootfs()
+
+
+ at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirOwnershipMapping(DirExtractRootFS):
+    """
+    Ensures that UID/GID mapping for extracted root file system are applied
+    correctly.
+    """
+    def runTest(self):
+        """
+        Extract the dummy tarfiles using FileSource and apply UID/GID mappings.
+        """
+        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+        # Create qcow2 images
+        dest = self.dest_dir
+        virt_bootstrap.bootstrap(
+            uri=self.tar_file,
+            dest=dest,
+            fmt='dir',
+            progress_cb=mock.Mock(),
+            uid_map=self.uid_map,
+            gid_map=self.gid_map
+        )
+        self.apply_mapping()
+        self.check_rootfs()
+
+
+ at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirSettingRootPassword(DirExtractRootFS):
+    """
+    Ensures that the root password is set correctly when FileSource is used
+    with fmt='dir'.
+    """
+
+    def runTest(self):
+        """
+        Extract rootfs from each dummy tarfile and set root password.
+        """
+        self.root_password = 'my secret root password'
+        virt_bootstrap.bootstrap(
+            uri=self.tar_file,
+            dest=self.dest_dir,
+            fmt='dir',
+            progress_cb=mock.Mock(),
+            root_password=self.root_password
+        )
+        self.validate_shadow_file(os.path.join(self.dest_dir, 'etc/shadow'))
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..c2f55b5
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Unit tests for functions defined in virtBootstrap.utils
+"""
+import unittest
+from . import utils
+
+
+# pylint: disable=invalid-name
+class TestUtils(unittest.TestCase):
+    """
+    Ensures that functions defined in the utils module of virtBootstrap
+    work as expected.
+    """
+    ###################################
+    # Tests for: bytes_to_size()
+    ###################################
+    def test_utils_bytes_to_size(self):
+        """
+        Validates the output of bytes_to_size() for some test cases.
+        """
+        test_values = {
+            0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB',
+            4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB'
+        }
+        for value in test_values:
+            self.assertEqual(utils.bytes_to_size(value), test_values[value])
+
+    ###################################
+    # Tests for: size_to_bytes()
+    ###################################
+    def test_utils_size_to_bytes(self):
+        """
+        Validates the output of size_to_bytes() for some test cases.
+        """
+        test_values = [1, '0']
+        test_formats = ['TB', 'GB', 'MB', 'KB', 'B']
+        expected_output = [1099511627776, 1073741824, 1048576, 1024, 1,
+                           0, 0, 0, 0, 0]
+        i = 0
+        for value in test_values:
+            for fmt in test_formats:
+                self.assertEqual(utils.size_to_bytes(value, fmt),
+                                 expected_output[i])
+                i += 1
+
+    ###################################
+    # Tests for: is_new_layer_message()
+    ###################################
+    def test_utils_is_new_layer_message(self):
+        """
+        Ensures that is_new_layer_message() returns True when message
+        from the skopeo's stdout indicates processing of new layer
+        and False otherwise.
+        """
+
+        valid_msgs = [
+            "Copying blob sha256:be232718519c940b04bc57",
+            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2"
+        ]
+
+        invalid_msgs = [
+            'Copying config sha256', 'test', ''
+        ]
+
+        for msg in valid_msgs:
+            self.assertTrue(utils.is_new_layer_message(msg))
+        for msg in invalid_msgs:
+            self.assertFalse(utils.is_new_layer_message(msg))
+
+    ###################################
+    # Tests for: is_layer_config_message()
+    ###################################
+    def test_utils_is_layer_config_message(self):
+        """
+        Ensures that is_layer_config_message() returns True when message
+        from the skopeo's stdout indicates processing of manifest file
+        of container image and False otherwise.
+        """
+        invalid_msgs = [
+            "Copying blob sha256:be232718519c940b04bc57",
+            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2",
+            ''
+        ]
+
+        valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724'
+
+        self.assertTrue(utils.is_layer_config_message(valid_msg))
+        for msg in invalid_msgs:
+            self.assertFalse(utils.is_layer_config_message(msg))
+
+    ###################################
+    # Tests for: make_async()
+    ###################################
+    def test_utils_make_async(self):
+        """
+        Ensures that make_async() sets O_NONBLOCK flag on PIPE.
+        """
+
+        proc = utils.subprocess.Popen(
+            ["echo"],
+            stdout=utils.subprocess.PIPE
+        )
+        pipe = proc.stdout
+
+        fd = pipe.fileno()
+        F_GETFL = utils.fcntl.F_GETFL
+        O_NONBLOCK = utils.os.O_NONBLOCK
+
+        self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
+        utils.make_async(fd)
+        self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
+        proc.wait()
+        pipe.close()
+
+    ###################################
+    # Tests for: str2float()
+    ###################################
+    def test_utils_str2float(self):
+        """
+        Validates the output of str2float() for some test cases.
+        """
+        test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
+        for test in test_values:
+            self.assertEqual(utils.str2float(test), test_values[test])
-- 
2.13.5




More information about the virt-tools-list mailing list