[virt-tools-list] [virt-bootstrap] [PATCH v6 03/26] Add regression tests

Cedric Bosdonnat cbosdonnat at suse.com
Sat Aug 19 06:37:02 UTC 2017


On Thu, 2017-08-17 at 10:39 +0100, Radostin Stoyanov wrote:
> These tests aim to verify the output of virt-bootstrap in more
> abstract manner by creating tar files calling bootstrap() with
> them to check the result when output format is set to "dir" or
> "qcow2".
> ---
>  tests/__init__.py      | 436 ++++++++++++++++++++++++++++++++++++++++
>  tests/docker_source.py | 535 +++++++++++++++++++++++++++++++++++++++++++++++++
>  tests/file_source.py   | 184 +++++++++++++++++
>  tests/test_utils.py    | 143 +++++++++++++
>  4 files changed, 1298 insertions(+)
>  create mode 100644 tests/__init__.py
>  create mode 100644 tests/docker_source.py
>  create mode 100644 tests/file_source.py
>  create mode 100644 tests/test_utils.py
> 
> diff --git a/tests/__init__.py b/tests/__init__.py
> new file mode 100644
> index 0000000..7a53c38
> --- /dev/null
> +++ b/tests/__init__.py
> @@ -0,0 +1,436 @@
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +
> +"""
> +Regression tests for virt-bootstrap
> +"""
> +
> +import hashlib
> +import io
> +import os
> +import shutil
> +import sys
> +import tarfile
> +import unittest
> +import passlib.hosts
> +
> +try:
> +    import mock
> +except ImportError:
> +    import unittest.mock as mock
> +
> +sys.path.insert(0, '../src')  # noqa: E402
> +sys.path.insert(0, '../src')  # noqa: E402
> +
> +# pylint: disable=import-error, wrong-import-position
> +from virtBootstrap import virt_bootstrap
> +from virtBootstrap import sources
> +from virtBootstrap import progress
> +from virtBootstrap import utils
> +
> +__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils']
> +
> +
> +# pylint: disable=invalid-name,too-many-instance-attributes,too-many-arguments
> +class BuildTarFiles(unittest.TestCase):
> +    """
> +    Create dummy tar files used for testing.
> +    """
> +
> +    def setUp(self):
> +        """
> +        Create dummy rootfs tar files
> +        """
> +        self.tar_dir = os.path.realpath('tests/tarfiles')
> +        self.dest_dir = os.path.realpath('tests/filesystems')

You should add those folders to .gitignore.

> +        self.default_permissions = 0o755
> +        self.shadow_file_permissions = 0o600
> +        self.shadow_file_content = "root:*:"
> +        self.tmp_folders = [self.tar_dir, self.dest_dir]
> +        self.tar_file = None
> +        self.uid_map = []
> +        self.gid_map = []
> +        self.root_password = ''
> +        self.checked_members = set()
> +        self.rootfs_tree = {
> +            'root': {
> +                'uid': 0,
> +                'gid': 0,
> +                'dirs': [
> +                    'bin',
> +                    'etc',
> +                    'home',
> +                    'lib',
> +                    'opt',
> +                    'root',
> +                    'run',
> +                    'sbin',
> +                    'srv',
> +                    'tmp',
> +                    'usr',
> +                    'var',
> +                ],
> +                'files': [
> +                    'etc/hosts',
> +                    (
> +                        'etc/shadow',
> +                        self.shadow_file_permissions,
> +                        self.shadow_file_content
> +                    )
> +                ]
> +            },
> +            'user1': {
> +                'uid': 500,
> +                'gid': 500,
> +                'dirs': ['home/user1'],
> +                'files': [
> +                    ('home/user1/test_file', 0o644, 'test data')
> +                ]
> +            },
> +
> +            'user2': {
> +                'uid': 1000,
> +                'gid': 1000,
> +                'dirs': [
> +                    'home/user2',
> +                    'home/user2/test_dir'
> +                ],
> +                'files': [
> +                    'home/user2/test_dir/test_file'
> +                ]
> +            }
> +        }
> +
> +        # Create test folders
> +        for test_dir in self.tmp_folders:
> +            if not os.path.exists(test_dir):
> +                os.makedirs(test_dir)
> +
> +        self.create_tar_file()
> +
> +    def create_tar_file(self):
> +        """
> +        Use temporary name to create uncomressed tarball with dummy file

typo: 'uncompressed'

> +        system. Then get checksum of the content and rename the tarfile to
> +        <checksum>.tar In this way we can easily generate Manifest of Docker
> +        registry and pass it to virt-bootstrap.
> +        """
> +        filepath = os.path.join(self.tar_dir, 'tmp_file.tar')

I guess this name should get some randomness: imagine the tests are later run
in parallel. Using tempfile.mkstemp() to create this file is surely safer.

> +        with tarfile.open(filepath, 'w') as tar:
> +            self.create_user_dirs(tar)
> +        # Get sha256 checksum of the archive
> +        with open(filepath, 'rb') as file_handle:
> +            file_hash = hashlib.sha256(file_handle.read()).hexdigest()
> +        # Rename the archive to <checksum>.tar
> +        new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash)
> +        os.rename(filepath, new_filepath)
> +        self.tar_file = new_filepath
> +
> +    def tearDown(self):
> +        """
> +        Clean up.
> +        """
> +        for test_dir in self.tmp_folders:
> +            shutil.rmtree(test_dir)

Hmm, if you're removing those folders anyway, then you surely want to create them
with tempfile.mkdtemp().

> +
> +    def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0,
> +                           permissions=None):
> +        """
> +        Add members to tar file.
> +        """
> +        if permissions is None:
> +            permissions = self.default_permissions
> +
> +        for name in members:
> +            data = ''
> +            if isinstance(name, tuple):
> +                name, permissions, data = name
> +            data_encoded = data.encode('utf-8')
> +
> +            t_info = tarfile.TarInfo(name)
> +            t_info.type = m_type
> +            t_info.mode = permissions
> +            t_info.uid = uid
> +            t_info.gid = gid
> +            t_info.size = len(data_encoded)
> +
> +            tar_handle.addfile(t_info, io.BytesIO(data_encoded))
> +
> +    def create_user_dirs(self, tar_handle):
> +        """
> +        Create root file system tree in tar archive.
> +        """
> +        tar_members = [
> +            ['dirs', tarfile.DIRTYPE],
> +            ['files', tarfile.REGTYPE],
> +        ]
> +
> +        for user in self.rootfs_tree:
> +            for members, tar_type in tar_members:
> +                self.create_tar_members(
> +                    tar_handle,
> +                    self.rootfs_tree[user][members],
> +                    tar_type,
> +                    uid=self.rootfs_tree[user]['uid'],
> +                    gid=self.rootfs_tree[user]['gid']
> +                )
> +
> +    def apply_mapping(self):
> +        """
> +        This method applies UID/GID mapping to all users defined in
> +        self.rootfs_tree.
> +        """
> +
> +        for user in self.rootfs_tree:
> +            user_uid = self.rootfs_tree[user]['uid']
> +            user_gid = self.rootfs_tree[user]['gid']
> +
> +            if self.uid_map:
> +                for start, tartget, rng in self.uid_map:
> +                    if user_uid >= start and user_uid <= start + rng:
> +                        diff = user_uid - start
> +                        self.rootfs_tree[user]['uid'] = tartget + diff
> +
> +            if self.gid_map:
> +                for start, tartget, rng in self.gid_map:
> +                    if user_gid >= start and user_gid <= start + rng:
> +                        diff = user_gid - start
> +                        self.rootfs_tree[user]['gid'] = tartget + diff
> +
> +    def check_rootfs(self, skip_ownership=False):
> +        """
> +        Check if the root file system was extracted correctly.
> +        """
> +        for user in self.rootfs_tree:
> +            self.check_extracted_members(
> +                user, 'files', os.path.isfile, skip_ownership
> +            )
> +            self.check_extracted_members(
> +                user, 'dirs', os.path.isdir, skip_ownership
> +            )
> +
> +    def check_extracted_members(self, user, members, check_existance,
> +                                skip_ownership=False):
> +        """
> +        Check permissions, ownership and content of
> +        extracted files or directories.
> +
> +        @param user: user name defined in self.rootfs_tree
> +        @param members: The string 'dirs' or 'files'
> +        @param check_existance: Function used to confirm the existnace of
> +            member. (E.g. os.path.isdir or os.path.isfile)
> +        @param skip_ownership: Whther to skip verification of ownership. Useful
> +            when members are extracted with unprivileged user.
> +        """
> +        permissions = self.default_permissions
> +        user_uid = self.rootfs_tree[user]['uid']
> +        user_gid = self.rootfs_tree[user]['gid']
> +
> +        for member_name in self.rootfs_tree[user][members]:
> +            # If unpack member if it is tuple. Allow us to specify permissions
> +            # and data per file.
> +            member_data = ''
> +            if isinstance(member_name, tuple):
> +                if len(member_name) == 3:
> +                    member_data = member_name[2]
> +                member_name, permissions = member_name[:2]
> +
> +            # Skip already checked members. E.g. when multiple layers were
> +            # extracted we want to check only the latest version of file.
> +            if member_name in self.checked_members:
> +                continue
> +            else:
> +                self.checked_members.add(member_name)
> +
> +            #########################
> +            # Assertion functions
> +            #########################
> +            member_path = os.path.join(self.dest_dir, member_name)
> +            self.assertTrue(
> +                check_existance(member_path),
> +                'Member was not extracted: %s' % member_path
> +            )
> +            stat = os.stat(member_path)
> +            self.assertEqual(
> +                stat.st_mode & 0o777, permissions,
> +                'Incorrect permissions: %s' % member_path
> +            )
> +            if not skip_ownership:
> +                self.assertEqual(
> +                    stat.st_uid, user_uid,
> +                    'Incorrect UID: %s' % member_path
> +                )
> +                self.assertEqual(
> +                    stat.st_gid, user_gid,
> +                    'Incorrect GID: %s' % member_path
> +                )
> +
> +            if member_data:
> +                with open(member_path, 'r') as content:
> +                    file_content = content.read()
> +                self.assertEqual(
> +                    member_data, file_content,
> +                    'Incorrect file content: %s\n'
> +                    'Found: %s\n'
> +                    'Expected: %s' % (member_path, file_content, member_data)
> +                )
> +
> +    def validate_shadow_file(self, shadow_path,
> +                             skip_hash=False, skip_ownership=False):
> +        """
> +        Ensure that extracted /etc/shadow file has correct ownership,
> +        permissions and hashed root password.
> +        """
> +        self.assertTrue(
> +            os.path.isfile(shadow_path),
> +            'Does not exist: %s' % shadow_path
> +        )
> +        stat = os.stat(shadow_path)
> +        self.assertEqual(
> +            stat.st_mode & 0o777,
> +            self.shadow_file_permissions,
> +            'Shadow file has incorrect permissions: %s' % shadow_path
> +        )
> +        if not skip_ownership:
> +            self.assertEqual(
> +                stat.st_uid,
> +                self.rootfs_tree['root']['uid'],
> +                'Shadow file has incorrect UID: %s' % shadow_path
> +            )
> +            self.assertEqual(
> +                stat.st_gid,
> +                self.rootfs_tree['root']['gid'],
> +                'Shadow file has incorrect GID: %s' % shadow_path
> +            )
> +
> +        if not skip_hash:
> +            # Note: For simplicity we assume that the first line of the file
> +            # contains the root entry.
> +            with open(shadow_path, 'r') as content:
> +                shadow_content = content.readlines()
> +
> +            if not shadow_content:
> +                raise Exception("File is empty: %s" % shadow_path)
> +
> +            self.assertTrue(
> +                passlib.hosts.linux_context.verify(
> +                    self.root_password,
> +                    shadow_content[0].split(':')[1]
> +                ),
> +                "Root password hash is invalid."
> +            )
> +
> +    def validate_shadow_file_in_image(self, g):
> +        """
> +        Validate permission, ownership and root password hash for /etc/shadow
> +        file stored in qcow2 image.
> +        """
> +        self.assertTrue(
> +            g.is_file('/etc/shadow'),
> +            "Shadow file does not exist"
> +        )
> +
> +        stat = g.stat('/etc/shadow')
> +        self.assertEqual(
> +            stat['mode'] & 0o777,
> +            self.shadow_file_permissions,
> +            'Shadow file has incorrect permissions'
> +        )
> +        self.assertEqual(
> +            stat['uid'],
> +            self.rootfs_tree['root']['uid'],
> +            'Shadow file has incorrect UID'
> +        )
> +        self.assertEqual(
> +            stat['gid'],
> +            self.rootfs_tree['root']['gid'],
> +            'Shadow file has incorrect GID'
> +        )
> +
> +        # Note: For simplicity we assume that the first line of the file
> +        # contains the root entry.
> +        shadow_content = g.cat('/etc/shadow').split('\n')

To factor out the common code, I think we could use class inheritance to handle
the qcow2 vs dir cases: for example, an ImageAccesser class implementing the
folder case and a Qcow2ImageAccesser subclass that specializes in getting
the files from the qcow2 image. The accessor object could then be a member
of self.

This would save you from duplicating checks like this one for both cases.

> +        self.assertTrue(
> +            passlib.hosts.linux_context.verify(
> +                self.root_password,
> +                shadow_content[0].split(':')[1]
> +            ),
> +            "Root password hash is invalid."
> +        )
> +
> +    def check_image_content(self, g, user, members, check_existance):

Same for this function.

> +        """
> +        Verify the existance, permissions, ownership of members in qcow2 image.
> +
> +        @param g: guestfs handle
> +        @param user: User defined in self.rootfs_tree
> +        @param members: The string 'dirs' or 'files'
> +        @param check_existance: Function used to confirm the existnace of
> +            member. (E.g. os.path.isdir or os.path.isfile)
> +        """
> +        permissions = self.default_permissions
> +        user_uid = self.rootfs_tree[user]['uid']
> +        user_gid = self.rootfs_tree[user]['gid']
> +
> +        for member_name in self.rootfs_tree[user][members]:
> +            # Get specified permissions of file.
> +            if isinstance(member_name, tuple):
> +                member_name, permissions = member_name[:2]
> +
> +            # Skip already checked files.
> +            if member_name in self.checked_members:
> +                continue
> +            else:
> +                self.checked_members.add(member_name)
> +
> +            # When using guestfs all names should start with '/'
> +            if not member_name.startswith('/'):
> +                member_name = '/' + member_name
> +
> +            self.assertTrue(
> +                check_existance(member_name),
> +                "Member was not found: %s" % member_name
> +            )
> +            stat = g.stat(member_name)
> +            self.assertEqual(
> +                stat['mode'] & 0o777, permissions,
> +                'Incorrect permissions: %s' % member_name
> +            )
> +            self.assertEqual(
> +                stat['uid'],
> +                user_uid,
> +                "Incorrect UID: %s\n"
> +                "Found: %s\n"
> +                "Expected: %s" % (member_name, stat['uid'], user_uid)
> +            )
> +            self.assertEqual(
> +                stat['gid'],
> +                user_gid,
> +                "Incorrect GID: %s\n"
> +                "Found: %s\n"
> +                "Expected: %s" % (member_name, stat['gid'], user_gid)
> +            )
> +
> +    def check_image(self, g):

Same for this one too.

> +        """
> +        Check the presence of files and folders in qcow2 image.
> +        """
> +        for user in self.rootfs_tree:
> +            # Check folders
> +            self.check_image_content(g, user, 'dirs', g.is_dir)
> +            # Check files
> +            self.check_image_content(g, user, 'files', g.is_file)
> diff --git a/tests/docker_source.py b/tests/docker_source.py
> new file mode 100644
> index 0000000..cf2b2d2
> --- /dev/null
> +++ b/tests/docker_source.py
> @@ -0,0 +1,535 @@
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +
> +
> +"""
> +Regression tests which aim to excercise creation of root file system

typo: 'exercise'

> +with DockerSource.
> +
> +To avoid fetching network resources we mock out the functions:
> +- utils.get_image_details(): Returns manifest content
> +- utils.get_image_dir(): Returns the directory which contains the tar files
> +

Then no need to remove it from patch #1 indeed ;)

> +Brief description of this tests:
> +1. Create 3 dummy tar files named <checksum>.tar used as image layers.
> +2. Generate manifest content.
> +3. Mock out get_image_details() and get_image_dir().
> +4. Call bootstrap().
> +5. Check the result.
> +"""
> +

Nice to have added the test overview here!

> +import copy
> +import os
> +import subprocess
> +import unittest
> +import guestfs
> +
> +from . import mock
> +from . import sources
> +from . import virt_bootstrap
> +from . import BuildTarFiles
> +
> +
> +# pylint: disable=invalid-name
> +class CreateLayers(BuildTarFiles):
> +    """
> +    Create tarfiles to used as image layers and generate manifest for them.

typo: 'to used' -> 'to be used'

> +    """
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        This method is used to check extracted root file system.
> +        """
> +        pass
> +
> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        This method is used to call vurtBootstrap.bootstrap()

typo: 'virtBootstrap'

> +        """
> +        pass
> +
> +    def generate_manifest(self, layers, layers_rootfs):
> +        """
> +        Generate Manifest content and then call self.call_bootstrap() and
> +        self.check_result()
> +        """
> +        manifest = {
> +            "schemaVersion": 2,
> +            "layers": [
> +                {
> +                    "digest":
> +                    "sha256:" + os.path.basename(layer).split('.')[0]
> +                }
> +                for layer in layers
> +            ]
> +        }
> +        self.call_bootstrap(manifest, self.dest_dir)
> +        self.check_result(layers_rootfs, self.dest_dir)
> +
> +    def main(self):
> +        """
> +        Create dummy tar files used to simulate layers of image
> +        then call generate_manifest().
> +
> +        The variable:

typo: 'variables'

> +        - "layers" store a lists of paths to created archives.
> +
> +        - "layers_rootfs" store the value of self.rootfs_tree used to generate
> +        tarball.
> +        """
> +        # Store base layer info
> +        layers = [self.tar_file]
> +        layers_rootfs = [copy.deepcopy(self.rootfs_tree)]
> +
> +        # Create Layer 1
> +        self.rootfs_tree['root']['files'] = [
> +            ('etc/foo/bar', 0o644, "This should be overwritten")
> +        ]

I guess this is the base layer... looks like there are only 2 layers, while
the comment mentions 3. Could you fix the comment, then?

> +        self.create_tar_file()
> +        # Store layer 1 info
> +        layers.append(self.tar_file)
> +        layers_rootfs.append(copy.deepcopy(self.rootfs_tree))
> +
> +        # Create Layer 2
> +        self.rootfs_tree['root']['files'] = [
> +            ('etc/foo/bar', 0o644, "Content of etc/foo/bar"),
> +            ('bin/foobar', 0o755, "My executable script")
> +        ]
> +        self.create_tar_file()
> +        # Store layer 2 info
> +        layers.append(self.tar_file)
> +        layers_rootfs.append(copy.deepcopy(self.rootfs_tree))
> +
> +        self.generate_manifest(layers, layers_rootfs)
> +
> +
> +class DirExtractRootFS(CreateLayers):
> +    """
> +    Ensures that all layers extracted correctly in destination folder.
> +    """
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        Iterates trough values of layers_rootfs in reverse order (from the last
> +        layer to first) and calls check_extracted_files().
> +        """
> +        for rootfs_tree in layers_rootfs[::-1]:
> +            self.rootfs_tree = rootfs_tree
> +            self.check_rootfs(skip_ownership=(os.geteuid != 0))
> +
> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        Mock get_image_details() and get_image_dir() then call the function
> +        virt-bootstra.bootstrap().

typo: 'virt_bootstrap.bootstrap()'

> +        """
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as mocked:
> +
> +            mocked['get_image_details'].return_value = manifest
> +            mocked['get_image_dir'].return_value = self.tar_dir
> +
> +            virt_bootstrap.bootstrap(
> +                uri='docker://foobar',
> +                dest=dest,
> +                fmt='dir',
> +                progress_cb=mock.Mock()

This function implementation could be moved to the parent class... just add
the format as a member of it and use fmt=self.fmt to avoid duplicating that code.

> +            )
> +
> +    def runTest(self):
> +        """
> +        Execute this test.
> +        """
> +        self.main()

Isn't there a way to avoid repeating that function in all subclasses?

> +
> +
> + at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
> +class DirOwnershipMapping(CreateLayers):
> +    """
> +    Ensures that the mapping of ownership works as expected.
> +    """
> +
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        Iterate trough all values of layers_rootfs in reverse order apply the
> +        mapping self.rootfs_tree used to create the test archive and verify
> +        extracted files.
> +        """
> +        for rootfs_tree in layers_rootfs[::-1]:
> +            self.rootfs_tree = rootfs_tree
> +            self.apply_mapping()
> +            self.check_rootfs()
> +
> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        Mock get_image_details() and get_image_dir() and call the function
> +        virt_bootstrap.bootstrap() with UID/GID mapping values.
> +        """
> +        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as mocked:
> +            mocked['get_image_details'].return_value = manifest
> +            mocked['get_image_dir'].return_value = self.tar_dir
> +            virt_bootstrap.bootstrap(
> +                progress_cb=mock.Mock(),
> +                uri='docker://foo',
> +                dest=dest,
> +                fmt='dir',
> +                uid_map=self.uid_map,
> +                gid_map=self.gid_map

Some more parameters to have in the parent class for a generic call_bootstrap.

> +            )
> +
> +    def runTest(self):
> +        """
> +        Execute this test.
> +        """
> +        self.main()
> +
> +
> + at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
> +class DirSettingRootPassword(CreateLayers):
> +    """
> +    Ensures that the root password is set correctly.
> +    """
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        Validate shadow file
> +        """
> +        self.validate_shadow_file(os.path.join(dest, 'etc/shadow'))
> +
> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        Mock get_image_details() and get_image_dir() and call the function
> +        virt_bootstrap.bootstrap() with root_password value.
> +        """
> +        self.root_password = "My secret root password"
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as mocked:
> +
> +            mocked['get_image_details'].return_value = manifest
> +            mocked['get_image_dir'].return_value = self.tar_dir
> +
> +            virt_bootstrap.bootstrap(
> +                progress_cb=mock.Mock(),
> +                uri='docker://foo',
> +                dest=dest,
> +                fmt='dir',
> +                root_password=self.root_password

Another one here! That way you would just have to set the root password in the __init__ of the class.

> +            )
> +
> +    def runTest(self):
> +        """
> +        Execute this test.
> +        """
> +        self.main()
> +
> +
> +class Qcow2BuildImage(CreateLayers):
> +    """
> +    Ensures that the convertion of tar files to qcow2 image with backing chains

typo: 'conversion'

> +    works as expected.
> +    """
> +    def get_image_info(self, image_path):
> +        """
> +        Returns information about the disk image using "qemu-img".
> +        """
> +        cmd = ['qemu-img', 'info', image_path]
> +        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
> +        output, _ignore = proc.communicate()
> +        return output.decode('utf-8').split('\n')
> +
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        Verify that layers were converted correctly to qcow2 images.
> +        """
> +        ###################
> +        # Check base layer
> +        ###################
> +        base_layer_path = os.path.join(dest, "layer-0.qcow2")
> +        base_info = self.get_image_info(base_layer_path)
> +        self.assertEqual(base_info[1], 'file format: qcow2')
> +        images = [base_layer_path]
> +        ###########################
> +        # Check backing chains
> +        ###########################
> +        for i in range(1, len(layers_rootfs)):
> +            img_path = os.path.join(dest, "layer-%d.qcow2" % i)
> +            img_info = self.get_image_info(img_path)
> +            self.assertEqual(
> +                img_info[1],
> +                'file format: qcow2',
> +                'Invalid qcow2 disk image: %s' % img_path
> +            )
> +            backing_file = os.path.join(dest, "layer-%d.qcow2" % (i - 1))
> +            self.assertEqual(
> +                img_info[5],
> +                'backing file: %s' % backing_file,
> +                "Incorrect backing file for: %s\n"
> +                "Expected: %s\n"
> +                "Found: %s" % (img_info, backing_file, img_info[5])
> +            )
> +            images.append(img_path)
> +        ###############################
> +        # Check extracted files/folders
> +        ###############################
> +        g = guestfs.GuestFS(python_return_dict=True)
> +        for path in images:
> +            g.add_drive_opts(path, readonly=True)
> +        g.launch()
> +        devices = g.list_filesystems()
> +        for dev, rootfs in zip(sorted(devices), layers_rootfs):
> +            self.rootfs_tree = rootfs
> +            g.mount(dev, '/')
> +            self.check_image(g)
> +            g.umount('/')
> +        g.shutdown()
> +

Adding some blank lines in that function would improve its readability.

> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        Mock get_image_details() and get_image_dir() and call the function
> +        virt_bootstrap.bootstrap() for qcow2 format.
> +        """
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as mocked:
> +
> +            mocked['get_image_details'].return_value = manifest
> +            mocked['get_image_dir'].return_value = self.tar_dir
> +
> +            virt_bootstrap.bootstrap(
> +                uri='docker://foobar',
> +                dest=dest,
> +                fmt='qcow2',
> +                progress_cb=mock.Mock()
> +            )

Again, could be factorized.
> +
> +    def runTest(self):
> +        """
> +        Execute this test.
> +        """
> +        self.main()
> +
> +
> + at unittest.skip("Need fix for this test to pass")
> +class Qcow2OwnershipMapping(Qcow2BuildImage):
> +    """
> +    Ensures that UID/GID mapping works correctly for qcow2 conversion.
> +    """
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        Iterate through values of layers_rootfs in reverse order and apply the
> +        mapping values to self.rootfs_tree. Then verify the ownership of
> +        files/folders created in the last backing chain of qcow2 image.
> +        """
> +        g = guestfs.GuestFS(python_return_dict=True)
> +        image_path = os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs))
> +        g.add_drive_opts(image_path, readonly=True)
> +
> +        g.launch()
> +        for rootfs in layers_rootfs[::-1]:
> +            self.rootfs_tree = rootfs
> +            self.apply_mapping()
> +            g.mount('/dev/sda', '/')
> +            self.check_image(g)
> +            g.umount('/')
> +        g.shutdown()
> +
> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        Mock the functions get_image_details() and get_image_dir() then call
> +        virtbootstrap.bootstrap() with UID/GID mapping values and fmt="qcow2".
> +        """
> +        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as mocked:
> +            mocked['get_image_details'].return_value = manifest
> +            mocked['get_image_dir'].return_value = self.tar_dir
> +            virt_bootstrap.bootstrap(
> +                uri='docker://foobar',
> +                progress_cb=mock.Mock(),
> +                fmt='qcow2',
> +                dest=dest,
> +                uid_map=self.uid_map,
> +                gid_map=self.gid_map
> +            )

This should be factored out too.
> +
> +
> + at unittest.skip("Need fix for this test to pass")
> +class Qcow2SettingRootPassword(Qcow2BuildImage):
> +    """
> +    Ensures that the root password is set correctly in the last backing chain
> +    of image qcow2 image.
> +    """
> +    def check_result(self, layers_rootfs, dest):
> +        """
> +        Load last backing chain and validate shadow file.
> +        """
> +        g = guestfs.GuestFS(python_return_dict=True)
> +        g.add_drive_opts(
> +            os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs)),
> +            readonly=True
> +        )
> +        g.launch()
> +        g.mount('/dev/sda', '/')
> +        self.validate_shadow_file_in_image(g)
> +        g.umount('/')
> +        g.shutdown()
> +
> +    def call_bootstrap(self, manifest, dest):
> +        """
> +        Mock the functions get_image_details() and get_image_dir() then
> +        call virt_bootstrap.bootstrap() with root password value and qcow2
> +        output format.
> +        """
> +        self.root_password = "My secret password"
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as mocked:
> +            mocked['get_image_details'].return_value = manifest
> +            mocked['get_image_dir'].return_value = self.tar_dir
> +            virt_bootstrap.bootstrap(
> +                uri='docker://foobar',
> +                dest=dest,
> +                fmt='qcow2',
> +                progress_cb=mock.Mock(),
> +                root_password=self.root_password
> +            )
> +

Again, this should be factored out.
> +
> +class TestDockerSource(unittest.TestCase):
> +    """
> +    Unit tests for DockerSource
> +    """
> +    ###################################
> +    # Tests for: retrieve_layers_info()
> +    ###################################
> +    def _mock_retrieve_layers_info(self, manifest, kwargs):
> +        """
> +        This method is gather common test pattern used in the following
> +        two test cases which aim to return an instance of the class
> +        DockerSource with some util functions being mocked.
> +        """
> +        with mock.patch.multiple('virtBootstrap.utils',
> +                                 get_image_details=mock.DEFAULT,
> +                                 get_image_dir=mock.DEFAULT) as m_utils:
> +
> +            m_utils['get_image_details'].return_value = manifest
> +            m_utils['get_image_dir'].return_value = '/images_path'
> +
> +            patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri'
> +            with mock.patch(patch_method) as m_uri:
> +                src_instance = sources.DockerSource(**kwargs)
> +        return (src_instance, m_uri, m_utils)
> +
> +    def test_retrieve_layers_info_pass_arguments_to_get_image_details(self):
> +        """
> +        Ensures that retrieve_layers_info() calls get_image_details()
> +        with all passed arguments.
> +        """
> +        src_kwargs = {
> +            'uri': '',
> +            'progress': mock.Mock()
> +        }
> +
> +        manifest = {'schemaVersion': 2, 'layers': []}
> +        (src_instance,
> +         m_uri, m_utils) = self._mock_retrieve_layers_info(manifest,
> +                                                           src_kwargs)
> +
> +        kwargs = {
> +            'insecure': src_instance.insecure,
> +            'username': src_instance.username,
> +            'password': src_instance.password,
> +            'raw': True
> +        }
> +        m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs)
> +
> +    def test_retrieve_layers_info_schema_version_1(self):
> +        """
> +        Ensures that retrieve_layers_info() extracts the layers' information
> +        from manifest with schema version 1 a list with format:
> +            ["digest", "sum_type", "file_path", "size"].
> +        """
> +        kwargs = {
> +            'uri': '',
> +            'progress': mock.Mock()
> +        }
> +
> +        manifest = {
> +            'schemaVersion': 1,
> +            'fsLayers': [
> +                {'blobSum': 'sha256:75c416ea'},
> +                {'blobSum': 'sha256:c6ff40b6'},
> +                {'blobSum': 'sha256:a7050fc1'}
> +            ]
> +        }
> +
> +        expected_result = [
> +            ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None],
> +            ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None],
> +            ['sha256', '75c416ea', '/images_path/75c416ea.tar', None]
> +        ]
> +
> +        src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
> +        self.assertEqual(src_instance.layers, expected_result)
> +
> +    def test_retrieve_layers_info_schema_version_2(self):
> +        """
> +        Ensures that retrieve_layers_info() extracts the layers' information
> +        from manifest with schema version 2 a list with format:
> +            ["digest", "sum_type", "file_path", "size"].
> +        """
> +        kwargs = {
> +            'uri': '',
> +            'progress': mock.Mock()
> +        }
> +
> +        manifest = {
> +            'schemaVersion': 2,
> +            "layers": [
> +                {"size": 47103294, "digest": "sha256:75c416ea"},
> +                {"size": 814, "digest": "sha256:c6ff40b6"},
> +                {"size": 513, "digest": "sha256:a7050fc1"}
> +            ]
> +        }
> +
> +        expected_result = [
> +            ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294],
> +            ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814],
> +            ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513]
> +        ]
> +
> +        src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
> +        self.assertEqual(src_instance.layers, expected_result)
> +
> +    def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self):
> +        """
> +        Ensures that retrieve_layers_info() calls get_image_details()
> +        with all passed arguments.
> +        """
> +        kwargs = {
> +            'uri': '',
> +            'progress': mock.Mock()
> +        }
> +
> +        manifest = {'schemaVersion': 3}
> +        with self.assertRaises(ValueError):
> +            self._mock_retrieve_layers_info(manifest, kwargs)
> diff --git a/tests/file_source.py b/tests/file_source.py
> new file mode 100644
> index 0000000..dd7fd00
> --- /dev/null
> +++ b/tests/file_source.py
> @@ -0,0 +1,184 @@
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +
> +
> +"""
> +Regression tests which aim to excercise the creation of root file system
> +with FileSource.
> +"""
> +
> +import os
> +import unittest
> +import guestfs
> +
> +from . import virt_bootstrap
> +from . import BuildTarFiles
> +from . import mock
> +
> +
> +# pylint: disable=invalid-name
> +class Qcow2BuildImage(BuildTarFiles):
> +    """
> +    Ensures that building qcow2 image from tarball with root file system
> +    works as expected.
> +    """
> +
> +    def check_qcow2_images(self, image_path):
> +        """
> +        Ensures that qcow2 images contain all files.
> +        """
> +        g = guestfs.GuestFS(python_return_dict=True)
> +        g.add_drive_opts(image_path, readonly=True)
> +        g.launch()
> +        g.mount('/dev/sda', '/')
> +        self.check_image(g)
> +        g.umount('/')
> +        g.shutdown()
> +
> +    def get_image_path(self):
> +        """
> +        Returns the path where the qcow2 image will be stored.
> +        """
> +        return os.path.join(
> +            self.dest_dir,
> +            "%s.qcow2" % os.path.basename(self.tar_file)
> +        )
> +
> +    def runTest(self):
> +        """
> +        Create qcow2 image from each dummy tarfile.

'each' is confusing here, since there will only be one.

> +        """
> +        virt_bootstrap.bootstrap(
> +            uri=self.tar_file,
> +            dest=self.dest_dir,
> +            fmt='qcow2',
> +            progress_cb=mock.Mock()
> +        )
> +        self.check_qcow2_images(self.get_image_path())
> +
> +
> + at unittest.skip("Not implemented")

Weird... either this is implemented and we don't skip it or we don't add the test in
this commit at all.

> +class Qcow2OwnershipMapping(Qcow2BuildImage):
> +    """
> +    Ensures that UID/GID mapping works correctly for qcow2 conversion.
> +    """
> +    def runTest(self):
> +        """
> +        Create qcow2 image from each dummy tarfile.
> +        """
> +        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        virt_bootstrap.bootstrap(
> +            progress_cb=mock.Mock(),
> +            uri=self.tar_file,
> +            dest=self.dest_dir,
> +            fmt='qcow2',
> +            uid_map=self.uid_map,
> +            gid_map=self.gid_map
> +        )
> +        self.apply_mapping()
> +        self.check_qcow2_images(self.get_image_path())
> +
> +
> + at unittest.skip("Not implemented")

Same here

> +class Qcow2SettingRootPassword(Qcow2BuildImage):
> +    """
> +    Ensures that the root password is set correctly in the backing file
> +    of image qcow2 image.
> +    """
> +    def runTest(self):
> +        """
> +        Create qcow2 image from each dummy tarfile.
> +        """
> +        self.root_password = "My secret password"
> +        virt_bootstrap.bootstrap(
> +            progress_cb=mock.Mock(),
> +            uri=self.tar_file,
> +            dest=self.dest_dir,
> +            fmt='qcow2',
> +            root_password=self.root_password
> +        )
> +        self.check_image = self.validate_shadow_file_in_image
> +        self.check_qcow2_images(self.get_image_path())
> +
> +
> + at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
> +class DirExtractRootFS(BuildTarFiles):
> +    """
> +    Ensures that files from rootfs tarball are extracted correctly.
> +    """
> +    def runTest(self):
> +        """
> +        Extract rootfs from each dummy tarfile.

That comment smells of copy/paste... 'each' is confusing here too.

> +        """
> +        dest = self.dest_dir
> +        virt_bootstrap.bootstrap(
> +            uri=self.tar_file,
> +            dest=dest,
> +            fmt='dir',
> +            progress_cb=mock.Mock()
> +        )
> +        self.check_rootfs()
> +
> +
> + at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
> +class DirOwnershipMapping(DirExtractRootFS):
> +    """
> +    Ensures that UID/GID mapping for extracted root file system are applied
> +    correctly.
> +    """
> +    def runTest(self):
> +        """
> +        Extract the dummy tarfiles using FileSource and apply UID/GID mappings.
> +        """
> +        self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> +        # Create qcow2 images
> +        dest = self.dest_dir
> +        virt_bootstrap.bootstrap(
> +            uri=self.tar_file,
> +            dest=dest,
> +            fmt='dir',
> +            progress_cb=mock.Mock(),
> +            uid_map=self.uid_map,
> +            gid_map=self.gid_map
> +        )
> +        self.apply_mapping()
> +        self.check_rootfs()
> +
> +
> + at unittest.skipIf(os.geteuid() != 0, "Root privileges required")
> +class DirSettingRootPassword(DirExtractRootFS):
> +    """
> +    Ensures that the root password is set correctly when FileSource is used
> +    with fmt='dir'.
> +    """
> +
> +    def runTest(self):
> +        """
> +        Extract rootfs from each dummy tarfile and set root password.
> +        """
> +        self.root_password = 'my secret root password'
> +        virt_bootstrap.bootstrap(
> +            uri=self.tar_file,
> +            dest=self.dest_dir,
> +            fmt='dir',
> +            progress_cb=mock.Mock(),
> +            root_password=self.root_password
> +        )
> +        self.validate_shadow_file(os.path.join(self.dest_dir, 'etc/shadow'))
> diff --git a/tests/test_utils.py b/tests/test_utils.py
> new file mode 100644
> index 0000000..c2f55b5
> --- /dev/null
> +++ b/tests/test_utils.py
> @@ -0,0 +1,143 @@
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +
> +
> +"""
> +Unit tests for functions defined in virtBootstrap.utils
> +"""
> +import unittest
> +from . import utils
> +
> +
> +# pylint: disable=invalid-name
> +class TestUtils(unittest.TestCase):
> +    """
> +    Ensures that functions defined in the utils module of virtBootstrap
> +    work as expected.
> +    """
> +    ###################################
> +    # Tests for: bytes_to_size()
> +    ###################################
> +    def test_utils_bytes_to_size(self):
> +        """
> +        Validates the output of bytes_to_size() for some test cases.
> +        """
> +        test_values = {
> +            0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB',
> +            4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB'
> +        }
> +        for value in test_values:
> +            self.assertEqual(utils.bytes_to_size(value), test_values[value])
> +
> +    ###################################
> +    # Tests for: size_to_bytes()
> +    ###################################
> +    def test_utils_size_to_bytes(self):
> +        """
> +        Validates the output of size_to_bytes() for some test cases.
> +        """
> +        test_values = [1, '0']
> +        test_formats = ['TB', 'GB', 'MB', 'KB', 'B']
> +        expected_output = [1099511627776, 1073741824, 1048576, 1024, 1,
> +                           0, 0, 0, 0, 0]
> +        i = 0
> +        for value in test_values:
> +            for fmt in test_formats:
> +                self.assertEqual(utils.size_to_bytes(value, fmt),
> +                                 expected_output[i])
> +                i += 1
> +
> +    ###################################
> +    # Tests for: is_new_layer_message()
> +    ###################################
> +    def test_utils_is_new_layer_message(self):
> +        """
> +        Ensures that is_new_layer_message() returns True when message
> +        from the skopeo's stdout indicates processing of new layer
> +        and False otherwise.
> +        """
> +
> +        valid_msgs = [
> +            "Copying blob sha256:be232718519c940b04bc57",
> +            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2"
> +        ]
> +
> +        invalid_msgs = [
> +            'Copying config sha256', 'test', ''
> +        ]
> +
> +        for msg in valid_msgs:
> +            self.assertTrue(utils.is_new_layer_message(msg))
> +        for msg in invalid_msgs:
> +            self.assertFalse(utils.is_new_layer_message(msg))
> +
> +    ###################################
> +    # Tests for: is_layer_config_message()
> +    ###################################
> +    def test_utils_is_layer_config_message(self):
> +        """
> +        Ensures that is_layer_config_message() returns True when message
> +        from the skopeo's stdout indicates processing of manifest file
> +        of container image and False otherwise.
> +        """
> +        invalid_msgs = [
> +            "Copying blob sha256:be232718519c940b04bc57",
> +            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2",
> +            ''
> +        ]
> +
> +        valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724'
> +
> +        self.assertTrue(utils.is_layer_config_message(valid_msg))
> +        for msg in invalid_msgs:
> +            self.assertFalse(utils.is_layer_config_message(msg))
> +
> +    ###################################
> +    # Tests for: make_async()
> +    ###################################
> +    def test_utils_make_async(self):
> +        """
> +        Ensures that make_async() sets O_NONBLOCK flag on PIPE.
> +        """
> +
> +        proc = utils.subprocess.Popen(
> +            ["echo"],
> +            stdout=utils.subprocess.PIPE
> +        )
> +        pipe = proc.stdout
> +
> +        fd = pipe.fileno()
> +        F_GETFL = utils.fcntl.F_GETFL
> +        O_NONBLOCK = utils.os.O_NONBLOCK
> +
> +        self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
> +        utils.make_async(fd)
> +        self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
> +        proc.wait()
> +        pipe.close()
> +
> +    ###################################
> +    # Tests for: str2float()
> +    ###################################
> +    def test_utils_str2float(self):
> +        """
> +        Validates the output of str2float() for some test cases.
> +        """
> +        test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
> +        for test in test_values:
> +            self.assertEqual(utils.str2float(test), test_values[test])

This new way of testing looks way better to me than the old one. Just wondering: aren't the tests
for functions in utils part of the ones you dropped in commit #1? If so, it would be better to
change commit #1 to keep those that are in this commit and strip them from this rather lengthy one.

--
Cedric




More information about the virt-tools-list mailing list