[virt-tools-list] [virt-bootstrap] [PATCH v5 11/11] tests: Add unit tests for the "utils" module

Cedric Bosdonnat cbosdonnat at suse.com
Mon Jul 24 13:41:56 UTC 2017


On Mon, 2017-07-24 at 09:14 +0100, Radostin Stoyanov wrote:
> ---
>  tests/test_utils.py | 702 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 702 insertions(+)
>  create mode 100644 tests/test_utils.py
> 
> diff --git a/tests/test_utils.py b/tests/test_utils.py
> new file mode 100644
> index 0000000..f1b31bd
> --- /dev/null
> +++ b/tests/test_utils.py
> @@ -0,0 +1,702 @@
> +# Authors:
> +#   Cedric Bosdonnat <cbosdonnat at suse.com>
> +#   Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 SUSE, Inc.
> +# Copyright (C) 2017 Radostin Stoyanov
> +#

Again

> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +
> +
> +"""
> +Unit tests for functions defined in virtBootstrap.utils
> +"""
> +
> +from tests import unittest
> +from tests import mock
> +from tests import utils
> +try:
> +    # pylint: disable=redefined-builtin
> +    from importlib import reload
> +except ImportError:
> +    pass
> +
> +
> +# pylint: disable=invalid-name
> +# pylint: disable=too-many-public-methods
> +class TestUtils(unittest.TestCase):
> +    """
> +    Ensures that functions defined in the utils module of virtBootstrap
> +    work as expected.
> +    """
> +
> +    ###################################
> +    # Tests for: checksum()
> +    ###################################
> +    def test_utils_checksum_return_false_on_invalid_hash(self):
> +        """
> +        Ensures that checksum() returns False if the actual and expected
> +        hash sum of file are not equal.
> +        """
> +        with mock.patch.multiple(utils,
> +                                 open=mock.DEFAULT,
> +                                 logger=mock.DEFAULT,
> +                                 hashlib=mock.DEFAULT) as mocked:
> +            path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
> +            mocked['hashlib'].sha256.hexdigest.return_value = False
> +            self.assertFalse(utils.checksum(path, sum_type, sum_expected))
> +
> +    def test_utils_checksum_return_false_if_file_could_not_be_opened(self):
> +        """
> +        Ensures that checksum() returns False if the file to be checked
> +        cannot be open for read.
> +        """
> +        with mock.patch.multiple(utils,
> +                                 open=mock.DEFAULT,
> +                                 logger=mock.DEFAULT,
> +                                 hashlib=mock.DEFAULT) as mocked:
> +            mocked['open'].side_effect = IOError()
> +            self.assertFalse(utils.checksum('foo', 'sha256', 'bar'))
> +
> +    def test_utils_checksum_return_true_on_valid_hash(self):
> +        """
> +        Ensures that checksum() returns True when the actual and expected
> +        hash sum of file are equal.
> +        """
> +        with mock.patch.multiple(utils,
> +                                 open=mock.DEFAULT,
> +                                 logger=mock.DEFAULT,
> +                                 hashlib=mock.DEFAULT) as mocked:
> +            path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
> +            mocked['hashlib'].sha256.return_value.hexdigest.return_value \
> +                = sum_expected
> +            self.assertTrue(utils.checksum(path, sum_type, sum_expected))
> +
> +    ###################################
> +    # Tests for: execute()
> +    ###################################
> +    def test_utils_execute_logging_on_successful_proc_call(self):
> +        """
> +        Ensures that execute() creates log record of cmd, stdout and stderr
> +        when the exit code of process is 0.
> +        """
> +        with mock.patch.multiple(utils,
> +                                 logger=mock.DEFAULT,
> +                                 Popen=mock.DEFAULT) as mocked:
> +            cmd = ['foo']
> +            output, err = 'test_out', 'test_err'
> +
> +            mocked['Popen'].return_value.returncode = 0
> +            (mocked['Popen'].return_value
> +             .communicate.return_value) = (output.encode(), err.encode())
> +
> +            utils.execute(cmd)
> +            mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0])
> +            mocked['logger'].debug.assert_any_call("Stdout:\n%s", output)
> +            mocked['logger'].debug.assert_any_call("Stderr:\n%s", err)
> +
> +    def test_utils_execute_raise_error_on_unsuccessful_proc_call(self):
> +        """
> +        Ensures that execute() raise CalledProcessError exception when the
> +        exit code of process is not 0.
> +        """
> +        with mock.patch('virtBootstrap.utils.Popen') as m_popen:
> +            m_popen.return_value.returncode = 1
> +            m_popen.return_value.communicate.return_value = (b'output', b'err')
> +            with self.assertRaises(utils.CalledProcessError):
> +                utils.execute(['foo'])
> +
> +    ###################################
> +    # Tests for: safe_untar()
> +    ###################################
> +    def test_utils_safe_untar_calls_execute(self):
> +        """
> +        Ensures that safe_untar() calls execute with virt-sandbox
> +        command to extract source files to destination folder.
> +        Test for users with EUID 0 and 1000.
> +        """
> +        with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid:
> +            for uid in [0, 1000]:
> +                m_geteuid.return_value = uid
> +                reload(utils)
> +                with mock.patch('virtBootstrap.utils.execute') as m_execute:
> +                    src, dest = 'foo', 'bar'
> +                    utils.safe_untar('foo', 'bar')
> +                    cmd = ['virt-sandbox',
> +                           '-c', utils.LIBVIRT_CONN,
> +                           '-m', 'host-bind:/mnt=' + dest,
> +                           '--',
> +                           '/bin/tar', 'xf', src,
> +                           '-C', '/mnt',
> +                           '--exclude', 'dev/*']
> +                    m_execute.assert_called_once_with(cmd)
> +
> +    ###################################
> +    # Tests for: bytes_to_size()
> +    ###################################
> +    def test_utils_bytes_to_size(self):
> +        """
> +        Validates the output of bytes_to_size() for some test cases.
> +        """
> +        test_values = {
> +            0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB',
> +            4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB'
> +        }
> +        for value in test_values:
> +            self.assertEqual(utils.bytes_to_size(value), test_values[value])
> +
> +    ###################################
> +    # Tests for: size_to_bytes()
> +    ###################################
> +    def test_utils_size_to_bytes(self):
> +        """
> +        Validates the output of size_to_bytes() for some test cases.
> +        """
> +        test_values = [1, '0']
> +        test_formats = ['TB', 'GB', 'MB', 'KB', 'B']
> +        expected_output = [1099511627776, 1073741824, 1048576, 1024, 1,
> +                           0, 0, 0, 0, 0]
> +        i = 0
> +        for value in test_values:
> +            for fmt in test_formats:
> +                self.assertEqual(utils.size_to_bytes(value, fmt),
> +                                 expected_output[i])
> +                i += 1
> +
> +    ###################################
> +    # Tests for: log_layer_extract()
> +    ###################################
> +    def test_utils_log_layer_extract(self):
> +        """
> +        Ensures that log_layer_extract() updates the progress and creates
> +        log record with debug level.
> +        """
> +        m_progress = mock.Mock()
> +        layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size']
> +        with mock.patch.multiple(utils, logger=mock.DEFAULT,
> +                                 bytes_to_size=mock.DEFAULT) as mocked:
> +            utils.log_layer_extract(layer, 'foo', 'bar', m_progress)
> +        mocked['bytes_to_size'].assert_called_once_with('layer_size')
> +        mocked['logger'].debug.assert_called_once()
> +        m_progress.assert_called_once()
> +
> +    ###################################
> +    # Tests for: get_mime_type()
> +    ###################################
> +    @mock.patch('virtBootstrap.utils.Popen')
> +    def test_utils_get_mime_type(self, m_popen):
> +        """
> +        Ensures that get_mime_type() returns the detected MIME type
> +        of /usr/bin/file.
> +        """
> +        path = "foo"
> +        mime = "application/x-gzip"
> +        stdout = ('%s: %s' % (path, mime)).encode()
> +        m_popen.return_value.stdout.read.return_value = stdout
> +        self.assertEqual(utils.get_mime_type(path), mime)
> +        m_popen.assert_called_once_with(["/usr/bin/file", "--mime-type", path],
> +                                        stdout=utils.PIPE)
> +
> +    ###################################
> +    # Tests for: untar_layers()
> +    ###################################
> +    def test_utils_untar_all_layers_in_order(self):
> +        """
> +        Ensures that untar_layers() iterates through all passed layers
> +        in order.
> +        """
> +        layers = ['l1', 'l2', 'l3']
> +        layers_list = [['', '', layer] for layer in layers]
> +        dest_dir = '/foo'
> +        expected_calls = [mock.call(layer, dest_dir) for layer in layers]
> +        with mock.patch.multiple(utils,
> +                                 safe_untar=mock.DEFAULT,
> +                                 log_layer_extract=mock.DEFAULT) as mocked:
> +            utils.untar_layers(layers_list, dest_dir, mock.Mock())
> +        mocked['safe_untar'].assert_has_calls(expected_calls)
> +
> +    ###################################
> +    # Tests for: create_qcow2()
> +    ###################################
> +    def _apply_test_to_create_qcow2(self, expected_calls, *args):
> +        """
> +        This method contains common test pattern used in the next two
> +        test cases.
> +        """
> +        with mock.patch.multiple(utils,
> +                                 execute=mock.DEFAULT,
> +                                 logger=mock.DEFAULT,
> +                                 get_mime_type=mock.DEFAULT) as mocked:
> +            mocked['get_mime_type'].return_value = 'application/x-gzip'
> +            utils.create_qcow2(*args)
> +        mocked['execute'].assert_has_calls(expected_calls)
> +
> +    def test_utils_create_qcow2_base_layer(self):
> +        """
> +        Ensures that create_qcow2() creates base layer when
> +        backing_file = None.
> +        """
> +        tar_file = 'foo'
> +        layer_file = 'bar'
> +        size = '5G'
> +        backing_file = None
> +
> +        expected_calls = [
> +            mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]),
> +
> +            mock.call(['virt-format',
> +                       '--format=qcow2',
> +                       '--partition=none',
> +                       '--filesystem=ext3',
> +                       '-a', layer_file]),
> +
> +            mock.call(['guestfish',
> +                       '-a', layer_file,
> +                       '-m', '/dev/sda',
> +                       'tar-in', tar_file, '/', 'compress:gzip'])
> +        ]
> +
> +        self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
> +                                         backing_file, size)
> +
> +    def test_utils_create_qcow2_layer_with_backing_chain(self):
> +        """
> +        Ensures that create_qcow2() creates new layer with backing chains
> +        when backing_file is specified.
> +        """
> +        tar_file = 'foo'
> +        layer_file = 'bar'
> +        backing_file = 'base'
> +        size = '5G'
> +
> +        expected_calls = [
> +            mock.call(['qemu-img', 'create',
> +                       '-b', backing_file,
> +                       '-f', 'qcow2',
> +                       layer_file, size]),
> +
> +            mock.call(['guestfish',
> +                       '-a', layer_file,
> +                       '-m', '/dev/sda',
> +                       'tar-in', tar_file, '/', 'compress:gzip'])
> +        ]
> +
> +        self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
> +                                         backing_file, size)
> +
> +    ###################################
> +    # Tests for: extract_layers_in_qcow2()
> +    ###################################
> +    def test_utils_if_all_layers_extracted_in_order_in_qcow2(self):
> +        """
> +        Ensures that extract_layers_in_qcow2() iterates through all
> +        layers in order.
> +        """
> +        layers = ['l1', 'l2', 'l3']
> +        layers_list = [['', '', layer] for layer in layers]
> +        dest_dir = '/foo'
> +
> +        # Generate expected calls
> +        expected_calls = []
> +        qcow2_backing_file = None
> +        for index, layer in enumerate(layers):
> +            qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index
> +            expected_calls.append(
> +                mock.call(layer, qcow2_layer_file, qcow2_backing_file))
> +            qcow2_backing_file = qcow2_layer_file
> +
> +        # Mocking out and execute
> +        with mock.patch.multiple(utils,
> +                                 create_qcow2=mock.DEFAULT,
> +                                 log_layer_extract=mock.DEFAULT) as mocked:
> +            utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock())
> +
> +        # Check actual calls
> +        mocked['create_qcow2'].assert_has_calls(expected_calls)
> +
> +    ###################################
> +    # Tests for: get_image_dir()
> +    ###################################
> +    def test_utils_getimage_dir(self):
> +        """
> +        Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR
> +        if the no_cache argument is set to False and create it if
> +        does not exist.
> +        """
> +        # Perform this test for UID 0 and 1000
> +        for uid in [0, 1000]:
> +            with mock.patch('os.geteuid') as m_geteuid:
> +                m_geteuid.return_value = uid
> +                reload(utils)
> +                with mock.patch('os.makedirs') as m_makedirs:
> +                    with mock.patch('os.path.exists') as m_path_exists:
> +                        m_path_exists.return_value = False
> +                        self.assertEqual(utils.get_image_dir(False),
> +                                         utils.DEFAULT_IMG_DIR)
> +            m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR)
> +
> +    @mock.patch('tempfile.mkdtemp')
> +    def test_utils_getimage_dir_no_cache(self, m_mkdtemp):
> +        """
> +        Ensures that get_image_dir() returns temporary file path created
> +        by tempfile.mkdtemp.
> +        """
> +        m_mkdtemp.return_value = 'foo'
> +        self.assertEqual(utils.get_image_dir(True), 'foo')
> +        m_mkdtemp.assert_called_once()
> +
> +    ###################################
> +    # Tests for: get_image_details()
> +    ###################################
> +    @mock.patch('virtBootstrap.utils.Popen')
> +    def test_utils_get_image_details_raise_error_on_fail(self, m_popen):
> +        """
> +        Ensures that get_image_details() throws ValueError exception
> +        when stderr from skopeo is provided.
> +        """
> +        src = 'docker://foo'
> +        m_popen.return_value.communicate.return_value = [b'', b'Error']
> +        with self.assertRaises(ValueError):
> +            utils.get_image_details(src)
> +
> +    @mock.patch('virtBootstrap.utils.Popen')
> +    def test_utils_get_image_details_return_json_obj_on_success(self, m_popen):
> +        """
> +        Ensures that get_image_details() returns python dictionary which
> +        represents the data provided from stdout of skopeo when stderr
> +        is not present.
> +        """
> +        src = 'docker://foo'
> +        json_dict = {'foo': 'bar'}
> +        stdout = utils.json.dumps(json_dict).encode()
> +        m_popen.return_value.communicate.return_value = [stdout, '']
> +        self.assertDictEqual(utils.get_image_details(src), json_dict)
> +
> +    def test_utils_get_image_details_all_argument_passed(self):
> +        """
> +        Ensures that get_image_details() pass all argument values to
> +        skopeo inspect.
> +        """
> +        src = 'docker://foo'
> +        raw, insecure = True, True
> +        username, password = 'user', 'password'
> +        cmd = ['skopeo', 'inspect', src,
> +               '--raw',
> +               '--tls-verify=false',
> +               "--creds=%s:%s" % (username, password)]
> +
> +        with mock.patch.multiple(utils,
> +                                 Popen=mock.DEFAULT,
> +                                 PIPE=mock.DEFAULT) as mocked:
> +            mocked['Popen'].return_value.communicate.return_value = [b'{}',
> +                                                                     b'']
> +            utils.get_image_details(src, raw, insecure, username, password)
> +
> +        mocked['Popen'].assert_called_once_with(cmd,
> +                                                stdout=mocked['PIPE'],
> +                                                stderr=mocked['PIPE'])
> +
> +    ###################################
> +    # Tests for: is_new_layer_message()
> +    ###################################
> +    def test_utils_is_new_layer_message(self):
> +        """
> +        Ensures that is_new_layer_message() returns True when message
> +        from the skopeo's stdout indicates processing of new layer
> +        and False otherwise.
> +        """
> +
> +        valid_msgs = [
> +            "Copying blob sha256:be232718519c940b04bc57",
> +            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2"
> +        ]
> +
> +        invalid_msgs = [
> +            'Copying config sha256', 'test', ''
> +        ]
> +
> +        for msg in valid_msgs:
> +            self.assertTrue(utils.is_new_layer_message(msg))
> +        for msg in invalid_msgs:
> +            self.assertFalse(utils.is_new_layer_message(msg))
> +
> +    ###################################
> +    # Tests for: is_layer_config_message()
> +    ###################################
> +    def test_utils_is_layer_config_message(self):
> +        """
> +        Ensures that is_layer_config_message() returns True when message
> +        from the skopeo's stdout indicates processing of manifest file
> +        of container image and False otherwise.
> +        """
> +        invalid_msgs = [
> +            "Copying blob sha256:be232718519c940b04bc57",
> +            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2",
> +            ''
> +        ]
> +
> +        valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724'
> +
> +        self.assertTrue(utils.is_layer_config_message(valid_msg))
> +        for msg in invalid_msgs:
> +            self.assertFalse(utils.is_layer_config_message(msg))
> +
> +    ###################################
> +    # Tests for: make_async()
> +    ###################################
> +    def test_utils_make_async(self):
> +        """
> +        Ensures that make_async() sets O_NONBLOCK flag on PIPE.
> +        """
> +
> +        pipe = utils.Popen(["echo"], stdout=utils.PIPE).stdout
> +        fd = pipe.fileno()
> +        F_GETFL = utils.fcntl.F_GETFL
> +        O_NONBLOCK = utils.os.O_NONBLOCK
> +
> +        self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
> +        utils.make_async(fd)
> +        self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
> +
> +    ###################################
> +    # Tests for: read_async()
> +    ###################################
> +    def test_utils_read_async_successful_read(self):
> +        """
> +        Ensures that read_async() calls read() of passed file descriptor.
> +        """
> +        m_fd = mock.MagicMock()
> +        utils.read_async(m_fd)
> +        m_fd.read.assert_called_once()
> +
> +    def test_utils_read_async_return_empty_str_on_EAGAIN_error(self):
> +        """
> +        Ensures that read_async() ignores EAGAIN errors and returns
> +        empty string.
> +        """
> +        m_fd = mock.MagicMock()
> +        m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '')
> +        self.assertEqual(utils.read_async(m_fd), '')
> +
> +    def test_utils_read_async_raise_errors(self):
> +        """
> +        Ensures that read_async() does not ignore IOError which is different
> +        than EAGAIN and throws an exception.
> +        """
> +        m_fd = mock.MagicMock()
> +        m_fd.read.side_effect = IOError()
> +        with self.assertRaises(IOError):
> +            utils.read_async(m_fd)
> +
> +    ###################################
> +    # Tests for: str2float()
> +    ###################################
> +    def test_utils_str2float(self):
> +        """
> +        Validates the output of str2float() for some test cases.
> +        """
> +        test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
> +        for test in test_values:
> +            self.assertEqual(utils.str2float(test), test_values[test])
> +
> +    ###################################
> +    # Tests for: set_root_password_in_rootfs()
> +    ###################################
> +    def test_utils_set_root_password_in_rootfs_restore_permissions(self):
> +        """
> +        Ensures that set_root_password_in_rootfs() restore shadow
> +        file permissions after edit.
> +        """
> +        permissions = 700
> +        rootfs_path = '/foo'
> +        shadow_file = '%s/etc/shadow' % rootfs_path
> +
> +        m_open = mock.mock_open(read_data='')
> +        with mock.patch('virtBootstrap.utils.open', m_open, create=True):
> +            with mock.patch('virtBootstrap.utils.os') as m_os:
> +                m_os.stat.return_value = [permissions]
> +                m_os.path.join.return_value = shadow_file
> +                utils.set_root_password_in_rootfs(rootfs_path, 'password')
> +
> +        expected_calls = [
> +            mock.call.path.join(rootfs_path, 'etc/shadow'),
> +            mock.call.stat(shadow_file),
> +            mock.call.chmod(shadow_file, 438),
> +            mock.call.chmod(shadow_file, permissions)
> +        ]
> +        m_os.assert_has_calls(expected_calls)
> +
> +    def test_utils_set_root_password_in_rootfs_restore_permissions_fail(self):
> +        """
> +        Ensures that set_root_password_in_rootfs() restore shadow file
> +        permissions in case of failure.
> +        """
> +        permissions = 700
> +        rootfs_path = '/foo'
> +        shadow_file = '%s/etc/shadow' % rootfs_path
> +
> +        m_open = mock.mock_open(read_data='')
> +        with mock.patch('virtBootstrap.utils.open', m_open, create=True):
> +            with mock.patch('virtBootstrap.utils.os') as m_os:
> +                m_os.stat.return_value = [permissions]
> +                m_os.path.join.return_value = shadow_file
> +
> +                with self.assertRaises(Exception):
> +                    m_open.side_effect = Exception
> +                    utils.set_root_password_in_rootfs(rootfs_path, 'password')
> +
> +        expected_calls = [
> +            mock.call.path.join(rootfs_path, 'etc/shadow'),
> +            mock.call.stat(shadow_file),
> +            mock.call.chmod(shadow_file, 438),
> +            mock.call.chmod(shadow_file, permissions)
> +        ]
> +        m_os.assert_has_calls(expected_calls)
> +
> +    def test_utils_set_root_password_in_rootfs_store_hash(self):
> +        """
> +        Ensures that set_root_password_in_rootfs() stores the hashed
> +        root password in shadow file.
> +        """
> +        rootfs_path = '/foo'
> +        password = 'secret'
> +        initial_value = '!locked'
> +        hashed_password = 'hashed_password'
> +        shadow_content = '\n'.join([
> +            "root:%s::0:99999:7:::",
> +            "bin:*:17004:0:99999:7:::"
> +            "daemon:*:17004:0:99999:7:::",
> +            "adm:*:17004:0:99999:7:::"
> +        ])
> +
> +        m_open = mock.mock_open(read_data=shadow_content % initial_value)
> +        with mock.patch('virtBootstrap.utils.open', m_open, create=True):
> +            with mock.patch('virtBootstrap.utils.os'):
> +                with mock.patch('passlib.hosts.linux_context.hash') as m_hash:
> +                    m_hash.return_value = hashed_password
> +                    utils.set_root_password_in_rootfs(rootfs_path, password)
> +
> +        m_hash.assert_called_once_with(password)
> +        m_open().write.assert_called_once_with(shadow_content
> +                                               % hashed_password)
> +
> +    ###################################
> +    # Tests for: set_root_password_in_image()
> +    ###################################
> +    @mock.patch('virtBootstrap.utils.execute')
> +    def test_utils_set_root_password_in_image(self, m_execute):
> +        """
> +        Ensures that set_root_password_in_image() calls virt-edit
> +        with correct arguments.
> +        """
> +        image, password = 'foo', 'password'
> +        password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$'
> +                         'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv'
> +                         'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh'
> +                         'CGFqnVv7S6wd.Ah/')
> +
> +        expected_call = [
> +            'virt-edit',
> +            '-a', image, '/etc/shadow',
> +            '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)]
> +
> +        hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash'
> +        with mock.patch(hash_function) as m_hash:
> +            m_hash.return_value = password_hash
> +            utils.set_root_password_in_image(image, password)
> +
> +        m_execute.assert_called_once_with(expected_call)
> +
> +    ###################################
> +    # Tests for: set_root_password()
> +    ###################################
> +    @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs')
> +    def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs):
> +        """
> +        Ensures that set_root_password() calls set_root_password_in_rootfs()
> +        when the format is set to "dir".
> +        """
> +        fmt, dest, root_password = 'dir', 'dest', 'root_password'
> +        utils.set_root_password(fmt, dest, root_password)
> +
> +        m_set_root_password_in_rootfs.assert_called_once_with(
> +            dest, root_password
> +        )
> +
> +    @mock.patch('virtBootstrap.utils.set_root_password_in_image')
> +    def test_utils_set_root_password_dir(self, m_set_root_password_in_image):
> +        """
> +        Ensures that set_root_password() calls set_root_password_in_image()
> +        when the format is set to "qcow2" with the path to the last
> +        extracted layer.
> +        """
> +        fmt, dest, root_password = 'qcow2', 'dest', 'root_password'
> +        layers = ['layer-0.qcow2', 'layer-1.qcow2']
> +
> +        with mock.patch('os.listdir') as m_listdir:
> +            m_listdir.return_value = layers
> +            utils.set_root_password(fmt, dest, root_password)
> +
> +        m_set_root_password_in_image.assert_called_once_with(
> +            utils.os.path.join(dest, max(layers)),
> +            root_password
> +        )
> +
> +    ###################################
> +    # Tests for: write_progress()
> +    ###################################
> +    def test_utils_write_progress_fill_terminal_width(self):
> +        """
> +        Ensures that write_progress() outputs a message with length
> +        equal to terminal width and last symbol '\r'.
> +        """
> +        terminal_width = 120
> +        prog = {'status': 'status', 'value': 0}
> +        with mock.patch.multiple(utils,
> +                                 Popen=mock.DEFAULT,
> +                                 PIPE=mock.DEFAULT,
> +                                 sys=mock.DEFAULT) as mocked:
> +
> +            (mocked['Popen'].return_value.stdout
> +             .read.return_value) = ("20 %s" % terminal_width).encode()
> +
> +            utils.write_progress(prog)
> +
> +        mocked['Popen'].assert_called_once_with(["stty", "size"],
> +                                                stdout=mocked['PIPE'])
> +        output_message = mocked['sys'].stdout.write.call_args[0][0]
> +        mocked['sys'].stdout.write.assert_called_once()
> +        self.assertEqual(len(output_message), terminal_width + 1)
> +        self.assertEqual(output_message[-1], '\r')
> +
> +    def test_utils_write_progress_use_default_term_width_on_failure(self):
> +        """
> +        Ensures that write_progress() outputs a message with length equal
> +        to default terminal width (80) when the detecting terminal width
> +        has failed.
> +        """
> +        default_terminal_width = 80
> +        prog = {'status': 'status', 'value': 0}
> +        with mock.patch.multiple(utils, Popen=mock.DEFAULT,
> +                                 sys=mock.DEFAULT) as mocked:
> +            mocked['Popen'].side_effect = Exception()
> +            utils.write_progress(prog)
> +
> +        self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]),
> +                         default_terminal_width + 1)
> +        mocked['sys'].stdout.write.assert_called_once()
> +
> +
> +if __name__ == '__main__':
> +    unittest.main(exit=False)

ACK

--
Cedric




More information about the virt-tools-list mailing list