[virt-tools-list] [virt-bootstrap] [PATCH v4 15/26] Make UID/GID mapping reusable

Cedric Bosdonnat cbosdonnat at suse.com
Thu Aug 3 16:06:27 UTC 2017


On Thu, 2017-08-03 at 14:13 +0100, Radostin Stoyanov wrote:
> Move the implementation of UID/GID file ownership mapping for the root
> file system into the utils module.
> 
> This allows it to be reused for the ownership mapping of files in qcow2 images.
> 
> Update the unit tests as well.
> ---
>  src/virtBootstrap/utils.py          |  72 ++++++++++++++++
>  src/virtBootstrap/virt_bootstrap.py |  74 +---------------
>  tests/test_utils.py                 | 149 ++++++++++++++++++++++++++++++++
>  tests/test_virt_bootstrap.py        | 168 ++----------------------------------
>  4 files changed, 230 insertions(+), 233 deletions(-)
> 
> diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py
> index 965dcad..071bb3e 100644
> --- a/src/virtBootstrap/utils.py
> +++ b/src/virtBootstrap/utils.py
> @@ -471,3 +471,75 @@ def write_progress(prog):
>      # Write message to console
>      sys.stdout.write(msg)
>      sys.stdout.flush()
> +
> +
> +# The implementation for remapping ownership of all files inside a
> +# container's rootfs is inspired by the tool uidmapshift:
> +#
> +# Original author: Serge Hallyn <serge.hallyn at ubuntu.com>
> +# Original license: GPLv2
> +# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c
> +
> +def get_map_id(old_id, opts):
> +    """
> +    Calculate new map_id.
> +    """
> +    if old_id >= opts['first'] and old_id < opts['last']:
> +        return old_id + opts['offset']
> +    return -1
> +
> +
> +def get_mapping_opts(mapping):
> +    """
> +    Get range options from UID/GID mapping
> +    """
> +    start = mapping[0] if mapping[0] > -1 else 0
> +    target = mapping[1] if mapping[1] > -1 else 0
> +    count = mapping[2] if mapping[2] > -1 else 1
> +
> +    opts = {
> +        'first': start,
> +        'last': start + count,
> +        'offset': target - start
> +    }
> +    return opts
> +
> +
> +def map_id(path, map_uid, map_gid):
> +    """
> +    Remapping ownership of all files inside a container's rootfs.
> +
> +    map_gid and map_uid: Contain integers in a list with format:
> +        [<start>, <target>, <count>]
> +    """
> +    if map_uid:
> +        uid_opts = get_mapping_opts(map_uid)
> +    if map_gid:
> +        gid_opts = get_mapping_opts(map_gid)
> +
> +    for root, _ignore, files in os.walk(os.path.realpath(path)):
> +        for name in [root] + files:
> +            file_path = os.path.join(root, name)
> +
> +            stat_info = os.lstat(file_path)
> +            old_uid = stat_info.st_uid
> +            old_gid = stat_info.st_gid
> +
> +            new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1
> +            new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1
> +            os.lchown(file_path, new_uid, new_gid)
> +
> +
> +def mapping_uid_gid(dest, uid_map, gid_map):
> +    """
> +    Mapping ownership for each uid_map and gid_map.
> +    """
> +    len_diff = len(uid_map) - len(gid_map)
> +
> +    if len_diff < 0:
> +        uid_map += [None] * abs(len_diff)
> +    elif len_diff > 0:
> +        gid_map += [None] * len_diff
> +
> +    for uid, gid in zip(uid_map, gid_map):
> +        map_id(dest, uid, gid)
> diff --git a/src/virtBootstrap/virt_bootstrap.py b/src/virtBootstrap/virt_bootstrap.py
> index ddc5456..0bc2e2b 100755
> --- a/src/virtBootstrap/virt_bootstrap.py
> +++ b/src/virtBootstrap/virt_bootstrap.py
> @@ -69,22 +69,6 @@ def get_source(source_type):
>          raise Exception("Invalid image URL scheme: '%s'" % source_type)
>  
>  
> -# The implementation for remapping ownership of all files inside a
> -# container's rootfs is inspired by the tool uidmapshift:
> -#
> -# Original author: Serge Hallyn <serge.hallyn at ubuntu.com>
> -# Original license: GPLv2
> -# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c
> -
> -def get_map_id(old_id, opts):
> -    """
> -    Calculate new map_id.
> -    """
> -    if old_id >= opts['first'] and old_id < opts['last']:
> -        return old_id + opts['offset']
> -    return -1
> -
> -
>  def parse_idmap(idmap):
>      """
>      Parse user input to 'start', 'target' and 'count' values.
> @@ -107,62 +91,6 @@ def parse_idmap(idmap):
>          raise ValueError("Invalid UID/GID mapping value: %s" % idmap)
>  
>  
> -def get_mapping_opts(mapping):
> -    """
> -    Get range options from UID/GID mapping
> -    """
> -    start = mapping[0] if mapping[0] > -1 else 0
> -    target = mapping[1] if mapping[1] > -1 else 0
> -    count = mapping[2] if mapping[2] > -1 else 1
> -
> -    opts = {
> -        'first': start,
> -        'last': start + count,
> -        'offset': target - start
> -    }
> -    return opts
> -
> -
> -def map_id(path, map_uid, map_gid):
> -    """
> -    Remapping ownership of all files inside a container's rootfs.
> -
> -    map_gid and map_uid: Contain integers in a list with format:
> -        [<start>, <target>, <count>]
> -    """
> -    if map_uid:
> -        uid_opts = get_mapping_opts(map_uid)
> -    if map_gid:
> -        gid_opts = get_mapping_opts(map_gid)
> -
> -    for root, _ignore, files in os.walk(os.path.realpath(path)):
> -        for name in [root] + files:
> -            file_path = os.path.join(root, name)
> -
> -            stat_info = os.lstat(file_path)
> -            old_uid = stat_info.st_uid
> -            old_gid = stat_info.st_gid
> -
> -            new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1
> -            new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1
> -            os.lchown(file_path, new_uid, new_gid)
> -
> -
> -def mapping_uid_gid(dest, uid_map, gid_map):
> -    """
> -    Mapping ownership for each uid_map and gid_map.
> -    """
> -    len_diff = len(uid_map) - len(gid_map)
> -
> -    if len_diff < 0:
> -        uid_map += [None] * abs(len_diff)
> -    elif len_diff > 0:
> -        gid_map += [None] * len_diff
> -
> -    for uid, gid in zip(uid_map, gid_map):
> -        map_id(dest, uid, gid)
> -
> -
>  # pylint: disable=too-many-arguments
>  def bootstrap(uri, dest,
>                fmt=utils.DEFAULT_OUTPUT_FORMAT,
> @@ -206,7 +134,7 @@ def bootstrap(uri, dest,
>  
>      if fmt == "dir" and uid_map or gid_map:
>          logger.info("Mapping UID/GID")
> -        mapping_uid_gid(dest, uid_map, gid_map)
> +        utils.mapping_uid_gid(dest, uid_map, gid_map)
>  
>  
>  def set_logging_conf(loglevel=None):
> diff --git a/tests/test_utils.py b/tests/test_utils.py
> index e45a2c9..7ce2ba4 100644
> --- a/tests/test_utils.py
> +++ b/tests/test_utils.py
> @@ -603,6 +603,155 @@ class TestUtils(unittest.TestCase):
>                           default_terminal_width + 1)
>          mocked['sys'].stdout.write.assert_called_once()
>  
> +    ###################################
> +    # Tests for: mapping_uid_gid()
> +    ###################################
> +    def test_mapping_uid_gid(self):
> +        """
> +        Ensures that mapping_uid_gid() calls map_id() with
> +        correct parameters.
> +        """
> +        dest = '/path'
> +        calls = [
> +            {  # Call 1
> +                'dest': dest,
> +                'uid': [[0, 1000, 10]],
> +                'gid': [[0, 1000, 10]]
> +            },
> +            {  # Call 2
> +                'dest': dest,
> +                'uid': [],
> +                'gid': [[0, 1000, 10]]
> +            },
> +            {  # Call 3
> +                'dest': dest,
> +                'uid': [[0, 1000, 10]],
> +                'gid': []
> +            },
> +            {  # Call 4
> +                'dest': dest,
> +                'uid': [[0, 1000, 10], [500, 500, 10]],
> +                'gid': [[0, 1000, 10]]
> +            }
> +        ]
> +
> +        expected_calls = [
> +            # Expected from call 1
> +            mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> +            # Expected from call 2
> +            mock.call(dest, None, [0, 1000, 10]),
> +            # Expected from call 3
> +            mock.call(dest, [0, 1000, 10], None),
> +            # Expected from call 4
> +            mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> +            mock.call(dest, [500, 500, 10], None)
> +        ]
> +
> +        with mock.patch('virtBootstrap.utils.map_id') as m_map_id:
> +            for args in calls:
> +                utils.mapping_uid_gid(args['dest'], args['uid'], args['gid'])
> +
> +        m_map_id.assert_has_calls(expected_calls)
> +
> +    ###################################
> +    # Tests for: map_id()
> +    ###################################
> +    @mock.patch('os.path.realpath')
> +    def test_map_id(self, m_realpath):
> +        """
> +        Ensures that the UID/GID mapping applies to all files
> +        and directories in root file system.
> +        """
> +        root_path = '/root'
> +        files = ['foo1', 'foo2']
> +        m_realpath.return_value = root_path
> +
> +        map_uid = [0, 1000, 10]
> +        map_gid = [0, 1000, 10]
> +        new_id = 'new_id'
> +
> +        expected_calls = [
> +            mock.call('/root', new_id, new_id),
> +            mock.call('/root/foo1', new_id, new_id),
> +            mock.call('/root/foo2', new_id, new_id)
> +        ]
> +
> +        with mock.patch.multiple('os',
> +                                 lchown=mock.DEFAULT,
> +                                 lstat=mock.DEFAULT,
> +                                 walk=mock.DEFAULT) as mocked:
> +
> +            mocked['walk'].return_value = [(root_path, [], files)]
> +            mocked['lstat']().st_uid = map_uid[0]
> +            mocked['lstat']().st_gid = map_gid[0]
> +
> +            get_map_id = 'virtBootstrap.utils.get_map_id'
> +            with mock.patch(get_map_id) as m_get_map_id:
> +                m_get_map_id.return_value = new_id
> +                utils.map_id(root_path, map_uid, map_gid)
> +
> +        mocked['lchown'].assert_has_calls(expected_calls)
> +
> +    ###################################
> +    # Tests for: get_mapping_opts()
> +    ###################################
> +    def test_get_mapping_opts(self):
> +        """
> +        Ensures that get_mapping_opts() returns correct options for
> +        mapping value.
> +        """
> +        test_values = [
> +            {
> +                'mapping': [0, 1000, 10],
> +                'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> +            },
> +            {
> +                'mapping': [0, 1000, 10],
> +                'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> +            },
> +            {
> +                'mapping': [500, 1500, 1],
> +                'expected_result': {'first': 500, 'last': 501, 'offset': 1000},
> +            },
> +            {
> +                'mapping': [-1, -1, -1],
> +                'expected_result': {'first': 0, 'last': 1, 'offset': 0},
> +            }
> +        ]
> +
> +        for test in test_values:
> +            res = utils.get_mapping_opts(test['mapping'])
> +            self.assertEqual(test['expected_result'], res)
> +
> +    ###################################
> +    # Tests for: get_map_id()
> +    ###################################
> +    def test_get_map_id(self):
> +        """
> +        Ensures that get_map_id() returns correct UID/GID mapping value.
> +        """
> +        test_values = [
> +            {
> +                'old_id': 0,
> +                'mapping': [0, 1000, 10],
> +                'expected_result': 1000
> +            },
> +            {
> +                'old_id': 5,
> +                'mapping': [0, 500, 10],
> +                'expected_result': 505
> +            },
> +            {
> +                'old_id': 10,
> +                'mapping': [0, 100, 10],
> +                'expected_result': -1
> +            },
> +        ]
> +        for test in test_values:
> +            opts = utils.get_mapping_opts(test['mapping'])
> +            res = utils.get_map_id(test['old_id'], opts)
> +            self.assertEqual(test['expected_result'], res)
> +
>  
>  if __name__ == '__main__':
>      unittest.main(exit=False)
> diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py
> index ff744f7..c0def7e 100644
> --- a/tests/test_virt_bootstrap.py
> +++ b/tests/test_virt_bootstrap.py
> @@ -61,157 +61,6 @@ class TestVirtBootstrap(unittest.TestCase):
>                        sources.FileSource)
>  
>      ###################################
> -    # Tests for: mapping_uid_gid()
> -    ###################################
> -    def test_mapping_uid_gid(self):
> -        """
> -        Ensures that mapping_uid_gid() calls map_id() with
> -        correct parameters.
> -        """
> -        dest = '/path'
> -        calls = [
> -            {  # Call 1
> -                'dest': dest,
> -                'uid': [[0, 1000, 10]],
> -                'gid': [[0, 1000, 10]]
> -            },
> -            {  # Call 2
> -                'dest': dest,
> -                'uid': [],
> -                'gid': [[0, 1000, 10]]
> -            },
> -            {  # Call 3
> -                'dest': dest,
> -                'uid': [[0, 1000, 10]],
> -                'gid': []
> -            },
> -            {  # Call 4
> -                'dest': dest,
> -                'uid': [[0, 1000, 10], [500, 500, 10]],
> -                'gid': [[0, 1000, 10]]
> -            }
> -        ]
> -
> -        expected_calls = [
> -            # Expected from call 1
> -            mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> -            # Expected from call 2
> -            mock.call(dest, None, [0, 1000, 10]),
> -            # Expected from call 3
> -            mock.call(dest, [0, 1000, 10], None),
> -            # Expected from call 4
> -            mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> -            mock.call(dest, [500, 500, 10], None)
> -
> -        ]
> -        with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id:
> -            for args in calls:
> -                virt_bootstrap.mapping_uid_gid(args['dest'],
> -                                               args['uid'],
> -                                               args['gid'])
> -
> -        m_map_id.assert_has_calls(expected_calls)
> -
> -    ###################################
> -    # Tests for: map_id()
> -    ###################################
> -    @mock.patch('os.path.realpath')
> -    def test_map_id(self, m_realpath):
> -        """
> -        Ensures that the UID/GID mapping applies to all files
> -        and directories in root file system.
> -        """
> -        root_path = '/root'
> -        files = ['foo1', 'foo2']
> -        m_realpath.return_value = root_path
> -
> -        map_uid = [0, 1000, 10]
> -        map_gid = [0, 1000, 10]
> -        new_id = 'new_id'
> -
> -        expected_calls = [
> -            mock.call('/root', new_id, new_id),
> -            mock.call('/root/foo1', new_id, new_id),
> -            mock.call('/root/foo2', new_id, new_id)
> -        ]
> -
> -        with mock.patch.multiple('os',
> -                                 lchown=mock.DEFAULT,
> -                                 lstat=mock.DEFAULT,
> -                                 walk=mock.DEFAULT) as mocked:
> -
> -            mocked['walk'].return_value = [(root_path, [], files)]
> -            mocked['lstat']().st_uid = map_uid[0]
> -            mocked['lstat']().st_gid = map_gid[0]
> -
> -            get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id'
> -            with mock.patch(get_map_id) as m_get_map_id:
> -                m_get_map_id.return_value = new_id
> -                virt_bootstrap.map_id(root_path, map_uid, map_gid)
> -
> -        mocked['lchown'].assert_has_calls(expected_calls)
> -
> -    ###################################
> -    # Tests for: get_mapping_opts()
> -    ###################################
> -    def test_get_mapping_opts(self):
> -        """
> -        Ensures that get_mapping_opts() returns correct options for
> -        mapping value.
> -        """
> -        test_values = [
> -            {
> -                'mapping': [0, 1000, 10],
> -                'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> -            },
> -            {
> -                'mapping': [0, 1000, 10],
> -                'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> -            },
> -            {
> -                'mapping': [500, 1500, 1],
> -                'expected_result': {'first': 500, 'last': 501, 'offset': 1000},
> -            },
> -            {
> -                'mapping': [-1, -1, -1],
> -                'expected_result': {'first': 0, 'last': 1, 'offset': 0},
> -            }
> -        ]
> -
> -        for test in test_values:
> -            res = virt_bootstrap.get_mapping_opts(test['mapping'])
> -            self.assertEqual(test['expected_result'], res)
> -
> -    ###################################
> -    # Tests for: get_map_id()
> -    ###################################
> -    def test_get_map_id(self):
> -        """
> -        Ensures that get_map_id() returns correct UID/GID mapping value.
> -        """
> -        test_values = [
> -            {
> -                'old_id': 0,
> -                'mapping': [0, 1000, 10],
> -                'expected_result': 1000
> -            },
> -            {
> -                'old_id': 5,
> -                'mapping': [0, 500, 10],
> -                'expected_result': 505
> -            },
> -            {
> -                'old_id': 10,
> -                'mapping': [0, 100, 10],
> -                'expected_result': -1
> -            },
> -        ]
> -        for test in test_values:
> -            opts = virt_bootstrap.get_mapping_opts(test['mapping'])
> -            res = virt_bootstrap.get_map_id(test['old_id'], opts)
> -            self.assertEqual(test['expected_result'], res)
> -
> -    ###################################
>      # Tests for: parse_idmap()
>      ###################################
>      def test_parse_idmap(self):
> @@ -415,9 +264,9 @@ class TestVirtBootstrap(unittest.TestCase):
>      def test_if_bootstrap_calls_set_mapping_uid_gid(self):
>          """
>          Ensures that bootstrap() calls mapping_uid_gid() when the argument
> -        uid_map or gid_map is specified.
> +        uid_map or gid_map is specified and format='dir'.
>          """
> -        src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id'
> +        src, dest, fmt, uid_map, gid_map = 'foo', 'bar', 'dir', 'id', 'id'
>          expected_calls = [
>              mock.call('bar', None, 'id'),
>              mock.call('bar', 'id', None),
> @@ -427,18 +276,17 @@ class TestVirtBootstrap(unittest.TestCase):
>          with mock.patch.multiple(virt_bootstrap,
>                                   get_source=mock.DEFAULT,
>                                   os=mock.DEFAULT,
> -                                 mapping_uid_gid=mock.DEFAULT,
> -                                 utils=mock.DEFAULT,
> -                                 sys=mock.DEFAULT) as mocked:
> +                                 utils=mock.DEFAULT) as mocked:
>              mocked['os'].path.exists.return_value = True
>              mocked['os'].path.isdir.return_value = True
>              mocked['os'].access.return_value = True
>  
> -            virt_bootstrap.bootstrap(src, dest, gid_map=gid_map)
> -            virt_bootstrap.bootstrap(src, dest, uid_map=uid_map)
> -            virt_bootstrap.bootstrap(src, dest,
> +            virt_bootstrap.bootstrap(src, dest, fmt=fmt, gid_map=gid_map)
> +            virt_bootstrap.bootstrap(src, dest, fmt=fmt, uid_map=uid_map)
> +            virt_bootstrap.bootstrap(src, dest, fmt=fmt,
>                                       uid_map=uid_map, gid_map=gid_map)
> -        mocked['mapping_uid_gid'].assert_has_calls(expected_calls)
> +
> +        mocked['utils'].mapping_uid_gid.assert_has_calls(expected_calls)
>  
>      ###################################
>      # Tests for: set_logging_conf()

ACK

--
Cedric




More information about the virt-tools-list mailing list