[virt-tools-list] [virt-bootstrap] [PATCH v4 12/13] tests: Add unit tests for the "utils" module

Radostin Stoyanov rstoyanov1 at gmail.com
Fri Jul 21 12:13:28 UTC 2017


---
 tests/test_utils.py | 640 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 640 insertions(+)
 create mode 100644 tests/test_utils.py

diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..19f9f70
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,640 @@
+# Authors:
+#   Cedric Bosdonnat <cbosdonnat at suse.com>
+#   Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 SUSE, Inc.
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Unit tests for functions defined in virtBootstrap.utils
+"""
+
+from tests import unittest
+from tests import mock
+from tests import utils
+try:
+    # pylint: disable=redefined-builtin
+    from importlib import reload
+except ImportError:
+    pass
+
+
+# pylint: disable=invalid-name
+# pylint: disable=too-many-public-methods
+class TestUtils(unittest.TestCase):
+    """
+    Ensures that functions defined in the utils module of virtBootstrap
+    work as expected.
+    """
+
+    ###################################
+    # Tests for: checksum()
+    ###################################
+    def test_utils_checksum_return_false_on_invalid_hash(self):
+        """
+        Ensures that checksum() returns False if the actual and expected
+        hash sum of file are not equal.
+        """
+        with mock.patch.multiple(utils,
+                                 open=mock.DEFAULT,
+                                 logger=mock.DEFAULT,
+                                 hashlib=mock.DEFAULT) as mocked:
+            path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
+            mocked['hashlib'].sha256.hexdigest.return_value = False
+            self.assertFalse(utils.checksum(path, sum_type, sum_expected))
+
+    def test_utils_checksum_return_false_if_file_could_not_be_opened(self):
+        """
+        Ensures that checksum() returns False if the file to be checked
+        cannot be open for read.
+        """
+        with mock.patch.multiple(utils,
+                                 open=mock.DEFAULT,
+                                 logger=mock.DEFAULT,
+                                 hashlib=mock.DEFAULT) as mocked:
+            mocked['open'].side_effect = IOError()
+            self.assertFalse(utils.checksum('foo', 'sha256', 'bar'))
+
+    def test_utils_checksum_return_true_on_valid_hash(self):
+        """
+        Ensures that checksum() returns True when the actual and expected
+        hash sum of file are equal.
+        """
+        with mock.patch.multiple(utils,
+                                 open=mock.DEFAULT,
+                                 logger=mock.DEFAULT,
+                                 hashlib=mock.DEFAULT) as mocked:
+            path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
+            mocked['hashlib'].sha256.return_value.hexdigest.return_value \
+                = sum_expected
+            self.assertTrue(utils.checksum(path, sum_type, sum_expected))
+
+    ###################################
+    # Tests for: execute()
+    ###################################
+    def test_utils_execute_logging_on_successful_proc_call(self):
+        """
+        Ensures that execute() creates log record of cmd, stdout and stderr
+        when the exit code of process is 0.
+        """
+        with mock.patch.multiple(utils,
+                                 logger=mock.DEFAULT,
+                                 Popen=mock.DEFAULT) as mocked:
+            cmd = ['foo']
+            output, err = 'test_out', 'test_err'
+
+            mocked['Popen'].return_value.returncode = 0
+            (mocked['Popen'].return_value
+             .communicate.return_value) = (output.encode(), err.encode())
+
+            utils.execute(cmd)
+            mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0])
+            mocked['logger'].debug.assert_any_call("Stdout:\n%s", output)
+            mocked['logger'].debug.assert_any_call("Stderr:\n%s", err)
+
+    def test_utils_execute_raise_error_on_unsuccessful_proc_call(self):
+        """
+        Ensures that execute() raise CalledProcessError exception when the
+        exit code of process is not 0.
+        """
+        with mock.patch('virtBootstrap.utils.Popen') as m_popen:
+            m_popen.return_value.returncode = 1
+            m_popen.return_value.communicate.return_value = (b'output', b'err')
+            with self.assertRaises(utils.CalledProcessError):
+                utils.execute(['foo'])
+
+    ###################################
+    # Tests for: safe_untar()
+    ###################################
+    def test_utils_safe_untar_calls_execute(self):
+        """
+        Ensures that safe_untar() calls execute with virt-sandbox
+        command to extract source files to destination folder.
+        Test for users with EUID 0 and 1000.
+        """
+        with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid:
+            for uid in [0, 1000]:
+                m_geteuid.return_value = uid
+                reload(utils)
+                with mock.patch('virtBootstrap.utils.execute') as m_execute:
+                    src, dest = 'foo', 'bar'
+                    utils.safe_untar('foo', 'bar')
+                    cmd = ['virt-sandbox',
+                           '-c', utils.LIBVIRT_CONN,
+                           '-m', 'host-bind:/mnt=' + dest,
+                           '--',
+                           '/bin/tar', 'xf', src,
+                           '-C', '/mnt',
+                           '--exclude', 'dev/*']
+                    m_execute.assert_called_once_with(cmd)
+
+    ###################################
+    # Tests for: bytes_to_size()
+    ###################################
+    def test_utils_bytes_to_size(self):
+        """
+        Validates the output of bytes_to_size() for some test cases.
+        """
+        test_values = {
+            0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB',
+            4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB'
+        }
+        for value in test_values:
+            self.assertEqual(utils.bytes_to_size(value), test_values[value])
+
+    ###################################
+    # Tests for: size_to_bytes()
+    ###################################
+    def test_utils_size_to_bytes(self):
+        """
+        Validates the output of size_to_bytes() for some test cases.
+        """
+        test_values = [1, '0']
+        test_formats = ['TB', 'GB', 'MB', 'KB', 'B']
+        expected_output = [1099511627776, 1073741824, 1048576, 1024, 1,
+                           0, 0, 0, 0, 0]
+        i = 0
+        for value in test_values:
+            for fmt in test_formats:
+                self.assertEqual(utils.size_to_bytes(value, fmt),
+                                 expected_output[i])
+                i += 1
+
+    ###################################
+    # Tests for: log_layer_extract()
+    ###################################
+    def test_utils_log_layer_extract(self):
+        """
+        Ensures that log_layer_extract() updates the progress and creates
+        log record with debug level.
+        """
+        m_progress = mock.Mock()
+        layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size']
+        with mock.patch.multiple(utils, logger=mock.DEFAULT,
+                                 bytes_to_size=mock.DEFAULT) as mocked:
+            utils.log_layer_extract(layer, 'foo', 'bar', m_progress)
+        mocked['bytes_to_size'].assert_called_once_with('layer_size')
+        mocked['logger'].debug.assert_called_once()
+        m_progress.assert_called_once()
+
+    ###################################
+    # Tests for: get_mime_type()
+    ###################################
+    @mock.patch('virtBootstrap.utils.Popen')
+    def test_utils_get_mime_type(self, m_popen):
+        """
+        Ensures that get_mime_type() returns the detected MIME type
+        of /usr/bin/file.
+        """
+        path = "foo"
+        mime = "application/x-gzip"
+        stdout = ('%s: %s' % (path, mime)).encode()
+        m_popen.return_value.stdout.read.return_value = stdout
+        self.assertEqual(utils.get_mime_type(path), mime)
+        m_popen.assert_called_once_with(["/usr/bin/file", "--mime-type", path],
+                                        stdout=utils.PIPE)
+
+    ###################################
+    # Tests for: untar_layers()
+    ###################################
+    def test_utils_untar_all_layers_in_order(self):
+        """
+        Ensures that untar_layers() iterates through all passed layers
+        in order.
+        """
+        layers = ['l1', 'l2', 'l3']
+        layers_list = [['', '', layer] for layer in layers]
+        dest_dir = '/foo'
+        expected_calls = [mock.call(layer, dest_dir) for layer in layers]
+        with mock.patch.multiple(utils,
+                                 safe_untar=mock.DEFAULT,
+                                 log_layer_extract=mock.DEFAULT) as mocked:
+            utils.untar_layers(layers_list, dest_dir, mock.Mock())
+        mocked['safe_untar'].assert_has_calls(expected_calls)
+
+    ###################################
+    # Tests for: create_qcow2()
+    ###################################
+    def _apply_test_to_create_qcow2(self, expected_calls, *args):
+        """
+        This method contains common test pattern used in the next two
+        test cases.
+        """
+        with mock.patch.multiple(utils,
+                                 execute=mock.DEFAULT,
+                                 logger=mock.DEFAULT,
+                                 get_mime_type=mock.DEFAULT) as mocked:
+            mocked['get_mime_type'].return_value = 'application/x-gzip'
+            utils.create_qcow2(*args)
+        mocked['execute'].assert_has_calls(expected_calls)
+
+    def test_utils_create_qcow2_base_layer(self):
+        """
+        Ensures that create_qcow2() creates base layer when
+        backing_file = None.
+        """
+        tar_file = 'foo'
+        layer_file = 'bar'
+        size = '5G'
+        backing_file = None
+
+        expected_calls = [
+            mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]),
+
+            mock.call(['virt-format',
+                       '--format=qcow2',
+                       '--partition=none',
+                       '--filesystem=ext3',
+                       '-a', layer_file]),
+
+            mock.call(['guestfish',
+                       '-a', layer_file,
+                       '-m', '/dev/sda',
+                       'tar-in', tar_file, '/', 'compress:gzip'])
+        ]
+
+        self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
+                                         backing_file, size)
+
+    def test_utils_create_qcow2_layer_with_backing_chain(self):
+        """
+        Ensures that create_qcow2() creates new layer with backing chains
+        when backing_file is specified.
+        """
+        tar_file = 'foo'
+        layer_file = 'bar'
+        backing_file = 'base'
+        size = '5G'
+
+        expected_calls = [
+            mock.call(['qemu-img', 'create',
+                       '-b', backing_file,
+                       '-f', 'qcow2',
+                       layer_file, size]),
+
+            mock.call(['guestfish',
+                       '-a', layer_file,
+                       '-m', '/dev/sda',
+                       'tar-in', tar_file, '/', 'compress:gzip'])
+        ]
+
+        self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
+                                         backing_file, size)
+
+    ###################################
+    # Tests for: extract_layers_in_qcow2()
+    ###################################
+    def test_utils_if_all_layers_extracted_in_order_in_qcow2(self):
+        """
+        Ensures that extract_layers_in_qcow2() iterates through all
+        layers in order.
+        """
+        layers = ['l1', 'l2', 'l3']
+        layers_list = [['', '', layer] for layer in layers]
+        dest_dir = '/foo'
+
+        # Generate expected calls
+        expected_calls = []
+        qcow2_backing_file = None
+        for index, layer in enumerate(layers):
+            qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index
+            expected_calls.append(
+                mock.call(layer, qcow2_layer_file, qcow2_backing_file))
+            qcow2_backing_file = qcow2_layer_file
+
+        # Mocking out and execute
+        with mock.patch.multiple(utils,
+                                 create_qcow2=mock.DEFAULT,
+                                 log_layer_extract=mock.DEFAULT) as mocked:
+            utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock())
+
+        # Check actual calls
+        mocked['create_qcow2'].assert_has_calls(expected_calls)
+
+    ###################################
+    # Tests for: get_image_dir()
+    ###################################
+    def test_utils_getimage_dir(self):
+        """
+        Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR
+        if the no_cache argument is set to False and create it if
+        does not exist.
+        """
+        # Perform this test for UID 0 and 1000
+        for uid in [0, 1000]:
+            with mock.patch('os.geteuid') as m_geteuid:
+                m_geteuid.return_value = uid
+                reload(utils)
+                with mock.patch('os.makedirs') as m_makedirs:
+                    with mock.patch('os.path.exists') as m_path_exists:
+                        m_path_exists.return_value = False
+                        self.assertEqual(utils.get_image_dir(False),
+                                         utils.DEFAULT_IMG_DIR)
+            m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR)
+
+    @mock.patch('tempfile.mkdtemp')
+    def test_utils_getimage_dir_no_cache(self, m_mkdtemp):
+        """
+        Ensures that get_image_dir() returns temporary file path created
+        by tempfile.mkdtemp.
+        """
+        m_mkdtemp.return_value = 'foo'
+        self.assertEqual(utils.get_image_dir(True), 'foo')
+        m_mkdtemp.assert_called_once()
+
+    ###################################
+    # Tests for: get_image_details()
+    ###################################
+    @mock.patch('virtBootstrap.utils.Popen')
+    def test_utils_get_image_details_raise_error_on_fail(self, m_popen):
+        """
+        Ensures that get_image_details() throws ValueError exception
+        when stderr from skopeo is provided.
+        """
+        src = 'docker://foo'
+        m_popen.return_value.communicate.return_value = [b'', b'Error']
+        with self.assertRaises(ValueError):
+            utils.get_image_details(src)
+
+    @mock.patch('virtBootstrap.utils.Popen')
+    def test_utils_get_image_details_return_json_obj_on_success(self, m_popen):
+        """
+        Ensures that get_image_details() returns python dictionary which
+        represents the data provided from stdout of skopeo when stderr
+        is not present.
+        """
+        src = 'docker://foo'
+        json_dict = {'foo': 'bar'}
+        stdout = utils.json.dumps(json_dict).encode()
+        m_popen.return_value.communicate.return_value = [stdout, '']
+        self.assertDictEqual(utils.get_image_details(src), json_dict)
+
+    def test_utils_get_image_details_all_argument_passed(self):
+        """
+        Ensures that get_image_details() pass all argument values to
+        skopeo inspect.
+        """
+        src = 'docker://foo'
+        raw, insecure = True, True
+        username, password = 'user', 'password'
+        cmd = ['skopeo', 'inspect', src,
+               '--raw',
+               '--tls-verify=false',
+               "--creds=%s:%s" % (username, password)]
+
+        with mock.patch.multiple(utils,
+                                 Popen=mock.DEFAULT,
+                                 PIPE=mock.DEFAULT) as mocked:
+            mocked['Popen'].return_value.communicate.return_value = [b'{}',
+                                                                     b'']
+            utils.get_image_details(src, raw, insecure, username, password)
+
+        mocked['Popen'].assert_called_once_with(cmd,
+                                                stdout=mocked['PIPE'],
+                                                stderr=mocked['PIPE'])
+
+    ###################################
+    # Tests for: is_new_layer_message()
+    ###################################
+    def test_utils_is_new_layer_message(self):
+        """
+        Ensures that is_new_layer_message() returns True when message
+        from the skopeo's stdout indicates processing of new layer
+        and False otherwise.
+        """
+
+        valid_msgs = [
+            "Copying blob sha256:be232718519c940b04bc57",
+            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2"
+        ]
+
+        invalid_msgs = [
+            'Copying config sha256', 'test', ''
+        ]
+
+        for msg in valid_msgs:
+            self.assertTrue(utils.is_new_layer_message(msg))
+        for msg in invalid_msgs:
+            self.assertFalse(utils.is_new_layer_message(msg))
+
+    ###################################
+    # Tests for: is_layer_config_message()
+    ###################################
+    def test_utils_is_layer_config_message(self):
+        """
+        Ensures that is_layer_config_message() returns True when message
+        from the skopeo's stdout indicates processing of manifest file
+        of container image and False otherwise.
+        """
+        invalid_msgs = [
+            "Copying blob sha256:be232718519c940b04bc57",
+            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2",
+            ''
+        ]
+
+        valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724'
+
+        self.assertTrue(utils.is_layer_config_message(valid_msg))
+        for msg in invalid_msgs:
+            self.assertFalse(utils.is_layer_config_message(msg))
+
+    ###################################
+    # Tests for: make_async()
+    ###################################
+    def test_utils_make_async(self):
+        """
+        Ensures that make_async() sets O_NONBLOCK flag on PIPE.
+        """
+
+        pipe = utils.Popen(["echo"], stdout=utils.PIPE).stdout
+        fd = pipe.fileno()
+        F_GETFL = utils.fcntl.F_GETFL
+        O_NONBLOCK = utils.os.O_NONBLOCK
+
+        self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
+        utils.make_async(fd)
+        self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
+
+    ###################################
+    # Tests for: read_async()
+    ###################################
+    def test_utils_read_async_successful_read(self):
+        """
+        Ensures that read_async() calls read() of passed file descriptor.
+        """
+        m_fd = mock.MagicMock()
+        utils.read_async(m_fd)
+        m_fd.read.assert_called_once()
+
+    def test_utils_read_async_return_empty_str_on_EAGAIN_error(self):
+        """
+        Ensures that read_async() ignores EAGAIN errors and returns
+        empty string.
+        """
+        m_fd = mock.MagicMock()
+        m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '')
+        self.assertEqual(utils.read_async(m_fd), '')
+
+    def test_utils_read_async_raise_errors(self):
+        """
+        Ensures that read_async() does not ignore IOError which is different
+        than EAGAIN and throws an exception.
+        """
+        m_fd = mock.MagicMock()
+        m_fd.read.side_effect = IOError()
+        with self.assertRaises(IOError):
+            utils.read_async(m_fd)
+
+    ###################################
+    # Tests for: str2float()
+    ###################################
+    def test_utils_str2float(self):
+        """
+        Validates the output of str2float() for some test cases.
+        """
+        test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
+        for test in test_values:
+            self.assertEqual(utils.str2float(test), test_values[test])
+
+    ###################################
+    # Tests for: set_root_password()
+    ###################################
+    def test_utils_set_root_password_restore_shadow_permissions(self):
+        """
+        Ensures that set_root_password() restore shadow file permissions
+        after edit.
+        """
+        permissions = 700
+        rootfs_path = '/foo'
+        shadow_file = '%s/etc/shadow' % rootfs_path
+
+        m_open = mock.mock_open(read_data='')
+        with mock.patch('virtBootstrap.utils.open', m_open, create=True):
+            with mock.patch('virtBootstrap.utils.os') as m_os:
+                m_os.stat.return_value = [permissions]
+                m_os.path.join.return_value = shadow_file
+                utils.set_root_password(rootfs_path, 'password')
+
+        expected_calls = [
+            mock.call.path.join(rootfs_path, 'etc/shadow'),
+            mock.call.stat(shadow_file),
+            mock.call.chmod(shadow_file, 438),
+            mock.call.chmod(shadow_file, permissions)
+        ]
+        m_os.assert_has_calls(expected_calls)
+
+    def test_utils_set_root_password_restore_permissions_on_fail(self):
+        """
+        Ensures that set_root_password() restore shadow file permissions
+        in case of failure.
+        """
+        permissions = 700
+        rootfs_path = '/foo'
+        shadow_file = '%s/etc/shadow' % rootfs_path
+
+        m_open = mock.mock_open(read_data='')
+        with mock.patch('virtBootstrap.utils.open', m_open, create=True):
+            with mock.patch('virtBootstrap.utils.os') as m_os:
+                m_os.stat.return_value = [permissions]
+                m_os.path.join.return_value = shadow_file
+
+                with self.assertRaises(Exception):
+                    m_open.side_effect = Exception
+                    utils.set_root_password(rootfs_path, 'password')
+
+        expected_calls = [
+            mock.call.path.join(rootfs_path, 'etc/shadow'),
+            mock.call.stat(shadow_file),
+            mock.call.chmod(shadow_file, 438),
+            mock.call.chmod(shadow_file, permissions)
+        ]
+        m_os.assert_has_calls(expected_calls)
+
+    def test_utils_set_root_password_store_hash(self):
+        """
+        Ensures that set_root_password() stores the hashed root password for
+        in shadow file.
+        """
+        rootfs_path = '/foo'
+        password = 'secret'
+        initial_value = '!locked'
+        hashed_password = 'hashed_password'
+        shadow_content = '\n'.join([
+            "root:%s::0:99999:7:::",
+            "bin:*:17004:0:99999:7:::"
+            "daemon:*:17004:0:99999:7:::",
+            "adm:*:17004:0:99999:7:::"
+        ])
+
+        m_open = mock.mock_open(read_data=shadow_content % initial_value)
+        with mock.patch('virtBootstrap.utils.open', m_open, create=True):
+            with mock.patch('virtBootstrap.utils.os'):
+                with mock.patch('passlib.hosts.linux_context.hash') as m_hash:
+                    m_hash.return_value = hashed_password
+                    utils.set_root_password(rootfs_path, password)
+
+        m_hash.assert_called_once_with(password)
+        m_open().write.assert_called_once_with(shadow_content
+                                               % hashed_password)
+
+    ###################################
+    # Tests for: write_progress()
+    ###################################
+    def test_utils_write_progress_fill_terminal_width(self):
+        """
+        Ensures that write_progress() outputs a message with length
+        equal to terminal width and last symbol '\r'.
+        """
+        terminal_width = 120
+        prog = {'status': 'status', 'value': 0}
+        with mock.patch.multiple(utils,
+                                 Popen=mock.DEFAULT,
+                                 PIPE=mock.DEFAULT,
+                                 sys=mock.DEFAULT) as mocked:
+
+            (mocked['Popen'].return_value.stdout
+             .read.return_value) = ("20 %s" % terminal_width).encode()
+
+            utils.write_progress(prog)
+
+        mocked['Popen'].assert_called_once_with(["stty", "size"],
+                                                stdout=mocked['PIPE'])
+        output_message = mocked['sys'].stdout.write.call_args[0][0]
+        mocked['sys'].stdout.write.assert_called_once()
+        self.assertEqual(len(output_message), terminal_width + 1)
+        self.assertEqual(output_message[-1], '\r')
+
+    def test_utils_write_progress_use_default_term_width_on_failure(self):
+        """
+        Ensures that write_progress() outputs a message with length equal
+        to default terminal width (80) when the detecting terminal width
+        has failed.
+        """
+        default_terminal_width = 80
+        prog = {'status': 'status', 'value': 0}
+        with mock.patch.multiple(utils, Popen=mock.DEFAULT,
+                                 sys=mock.DEFAULT) as mocked:
+            mocked['Popen'].side_effect = Exception()
+            utils.write_progress(prog)
+
+        self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]),
+                         default_terminal_width + 1)
+        mocked['sys'].stdout.write.assert_called_once()
+
+
+if __name__ == '__main__':
+    unittest.main(exit=False)
-- 
2.9.4




More information about the virt-tools-list mailing list