[lvm-devel] master - tests: move into generated file
Zdenek Kabelac
zkabelac at sourceware.org
Tue May 15 20:05:21 UTC 2018
Gitweb: https://sourceware.org/git/?p=lvm2.git;a=commitdiff;h=be154e30e895bf08e00a8cf32ceb79b7c0f4b5a6
Commit: be154e30e895bf08e00a8cf32ceb79b7c0f4b5a6
Parent: ad756bb7083743b612d3fc66b268b605576f448a
Author: Zdenek Kabelac <zkabelac at redhat.com>
AuthorDate: Tue May 15 21:40:32 2018 +0200
Committer: Zdenek Kabelac <zkabelac at redhat.com>
CommitterDate: Tue May 15 22:02:41 2018 +0200
tests: move into generated file
Since python path is evaluated and we cannot use anymore /usr/bin/env
switch to generated file.
---
configure | 3 +-
configure.ac | 1 +
test/api/python_lvm_unit.py | 1048 ----------------------------------------
test/api/python_lvm_unit.py.in | 1048 ++++++++++++++++++++++++++++++++++++++++
4 files changed, 1051 insertions(+), 1049 deletions(-)
diff --git a/configure b/configure
index ce70d92..7d945df 100755
--- a/configure
+++ b/configure
@@ -15559,7 +15559,7 @@ _ACEOF
################################################################################
-ac_config_files="$ac_config_files Makefile make.tmpl daemons/Makefile daemons/clvmd/Makefile daemons/cmirrord/Makefile daemons/dmeventd/Makefile daemons/dmeventd/libdevmapper-event.pc daemons/dmeventd/plugins/Makefile daemons/dmeventd/plugins/lvm2/Makefile daemons/dmeventd/plugins/raid/Makefile daemons/dmeventd/plugins/mirror/Makefile daemons/dmeventd/plugins/snapshot/Makefile daemons/dmeventd/plugins/thin/Makefile daemons/dmfilemapd/Makefile daemons/lvmdbusd/Makefile daemons/lvmdbusd/lvmdbusd daemons/lvmdbusd/lvmdb.py daemons/lvmdbusd/lvm_shell_proxy.py daemons/lvmdbusd/path.py daemons/lvmetad/Makefile daemons/lvmpolld/Makefile daemons/lvmlockd/Makefile device_mapper/Makefile conf/Makefile conf/example.conf conf/lvmlocal.conf conf/command_profile_template.profile conf/metadata_profile_template.profile include/.symlinks include/Makefile lib/Makefile lib/locking/Makefile include/lvm-version.h libdaemon/Makefile libdaemon/client/Makefile libdaemon/server/Makefile libdm/Makefile libdm/
libdevmapper.pc liblvm/Makefile liblvm/liblvm2app.pc man/Makefile po/Makefile python/Makefile python/setup.py scripts/blkdeactivate.sh scripts/blk_availability_init_red_hat scripts/blk_availability_systemd_red_hat.service scripts/clvmd_init_red_hat scripts/cmirrord_init_red_hat scripts/com.redhat.lvmdbus1.service scripts/dm_event_systemd_red_hat.service scripts/dm_event_systemd_red_hat.socket scripts/lvm2_cluster_activation_red_hat.sh scripts/lvm2_cluster_activation_systemd_red_hat.service scripts/lvm2_clvmd_systemd_red_hat.service scripts/lvm2_cmirrord_systemd_red_hat.service scripts/lvm2_lvmdbusd_systemd_red_hat.service scripts/lvm2_lvmetad_init_red_hat scripts/lvm2_lvmetad_systemd_red_hat.service scripts/lvm2_lvmetad_systemd_red_hat.socket scripts/lvm2_lvmpolld_init_red_hat scripts/lvm2_lvmpolld_systemd_red_hat.service scripts/lvm2_lvmpolld_systemd_red_hat.socket scripts/lvm2_lvmlockd_systemd_red_hat.service scripts/lvm2_lvmlocking_systemd_red_hat.service scripts/lvm2_monitoring_
init_red_hat scripts/lvm2_monitoring_systemd_red_hat.service scripts/lvm2_pvscan_systemd_red_hat at .service scripts/lvm2_tmpfiles_red_hat.conf scripts/lvmdump.sh scripts/Makefile test/Makefile test/api/Makefile test/unit/Makefile tools/Makefile udev/Makefile"
+ac_config_files="$ac_config_files Makefile make.tmpl daemons/Makefile daemons/clvmd/Makefile daemons/cmirrord/Makefile daemons/dmeventd/Makefile daemons/dmeventd/libdevmapper-event.pc daemons/dmeventd/plugins/Makefile daemons/dmeventd/plugins/lvm2/Makefile daemons/dmeventd/plugins/raid/Makefile daemons/dmeventd/plugins/mirror/Makefile daemons/dmeventd/plugins/snapshot/Makefile daemons/dmeventd/plugins/thin/Makefile daemons/dmfilemapd/Makefile daemons/lvmdbusd/Makefile daemons/lvmdbusd/lvmdbusd daemons/lvmdbusd/lvmdb.py daemons/lvmdbusd/lvm_shell_proxy.py daemons/lvmdbusd/path.py daemons/lvmetad/Makefile daemons/lvmpolld/Makefile daemons/lvmlockd/Makefile device_mapper/Makefile conf/Makefile conf/example.conf conf/lvmlocal.conf conf/command_profile_template.profile conf/metadata_profile_template.profile include/.symlinks include/Makefile lib/Makefile lib/locking/Makefile include/lvm-version.h libdaemon/Makefile libdaemon/client/Makefile libdaemon/server/Makefile libdm/Makefile libdm/
libdevmapper.pc liblvm/Makefile liblvm/liblvm2app.pc man/Makefile po/Makefile python/Makefile python/setup.py scripts/blkdeactivate.sh scripts/blk_availability_init_red_hat scripts/blk_availability_systemd_red_hat.service scripts/clvmd_init_red_hat scripts/cmirrord_init_red_hat scripts/com.redhat.lvmdbus1.service scripts/dm_event_systemd_red_hat.service scripts/dm_event_systemd_red_hat.socket scripts/lvm2_cluster_activation_red_hat.sh scripts/lvm2_cluster_activation_systemd_red_hat.service scripts/lvm2_clvmd_systemd_red_hat.service scripts/lvm2_cmirrord_systemd_red_hat.service scripts/lvm2_lvmdbusd_systemd_red_hat.service scripts/lvm2_lvmetad_init_red_hat scripts/lvm2_lvmetad_systemd_red_hat.service scripts/lvm2_lvmetad_systemd_red_hat.socket scripts/lvm2_lvmpolld_init_red_hat scripts/lvm2_lvmpolld_systemd_red_hat.service scripts/lvm2_lvmpolld_systemd_red_hat.socket scripts/lvm2_lvmlockd_systemd_red_hat.service scripts/lvm2_lvmlocking_systemd_red_hat.service scripts/lvm2_monitoring_
init_red_hat scripts/lvm2_monitoring_systemd_red_hat.service scripts/lvm2_pvscan_systemd_red_hat at .service scripts/lvm2_tmpfiles_red_hat.conf scripts/lvmdump.sh scripts/Makefile test/Makefile test/api/Makefile test/api/python_lvm_unit.py test/unit/Makefile tools/Makefile udev/Makefile"
cat >confcache <<\_ACEOF
# This file is a shell script that caches the results of configure
@@ -16327,6 +16327,7 @@ do
"scripts/Makefile") CONFIG_FILES="$CONFIG_FILES scripts/Makefile" ;;
"test/Makefile") CONFIG_FILES="$CONFIG_FILES test/Makefile" ;;
"test/api/Makefile") CONFIG_FILES="$CONFIG_FILES test/api/Makefile" ;;
+ "test/api/python_lvm_unit.py") CONFIG_FILES="$CONFIG_FILES test/api/python_lvm_unit.py" ;;
"test/unit/Makefile") CONFIG_FILES="$CONFIG_FILES test/unit/Makefile" ;;
"tools/Makefile") CONFIG_FILES="$CONFIG_FILES tools/Makefile" ;;
"udev/Makefile") CONFIG_FILES="$CONFIG_FILES udev/Makefile" ;;
diff --git a/configure.ac b/configure.ac
index 7b517ef..e427708 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2159,6 +2159,7 @@ scripts/lvmdump.sh
scripts/Makefile
test/Makefile
test/api/Makefile
+test/api/python_lvm_unit.py
test/unit/Makefile
tools/Makefile
udev/Makefile
diff --git a/test/api/python_lvm_unit.py b/test/api/python_lvm_unit.py
deleted file mode 100755
index 296ca14..0000000
--- a/test/api/python_lvm_unit.py
+++ /dev/null
@@ -1,1048 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
-#
-# This file is part of LVM2.
-#
-# This copyrighted material is made available to anyone wishing to use,
-# modify, copy, or redistribute it subject to the terms and conditions
-# of the GNU General Public License v.2.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import unittest
-import random
-import string
-import lvm
-import os
-import itertools
-import sys
-
-if sys.version_info[0] > 2:
- long = int
-
-# Set of basic unit tests for the python bindings.
-#
-# *** WARNING ***
-#
-# This test tries to only modify configuration for the list of allowed
-# PVs, but an error in it could potentially cause data loss if run on a
-# production system. Therefore it is strongly advised that this unit test
-# not be run on a system that contains data of value.
-
-fh = None
-
-
-def l(txt):
- if os.environ.get('PY_UNIT_LOG') is not None:
- global fh
- if fh is None:
- fh = open('/tmp/lvm_py_unit_test_' + rs(10), "a")
- fh.write(txt + "\n")
- fh.flush()
-
-
-def rs(rand_len=10):
- """
- Generate a random string
- """
- return ''.join(
- random.choice(string.ascii_uppercase)for x in range(rand_len))
-
-
-def _get_allowed_devices():
- rc = os.environ.get('PY_UNIT_PVS')
- if rc is not None:
- rc = rc.splitlines()
- rc.sort()
- return rc
-
-
-class AllowedPVS(object):
- """
- We are only allowed to muck with certain PV, filter to only
- the ones we can use.
- """
-
- def __init__(self):
- self.handle = None
- self.pvs_all = None
-
- def __enter__(self):
- rc = []
-
- allowed_dev = _get_allowed_devices()
-
- if allowed_dev:
- self.handle = lvm.listPvs()
- self.pvs_all = self.handle.open()
-
- for p in self.pvs_all:
- if p.getName() in allowed_dev:
- rc.append(p)
-
- #Sort them consistently
- rc.sort(key=lambda x: x.getName())
- return rc
-
- def __exit__(self, t_type, value, traceback):
- if self.handle:
- self.pvs_all = None
- self.handle.close()
-
-
-class TestLvm(unittest.TestCase):
-
- VG_P = os.environ.get('PREFIX')
-
- @staticmethod
- def _get_pv_device_names():
- rc = []
- with AllowedPVS() as pvs:
- for p in pvs:
- rc.append(p.getName())
- return rc
-
- @staticmethod
- def _create_thick_lv(device_list, name):
- vg = lvm.vgCreate(TestLvm.VG_P + "_" + name)
-
- for d in device_list:
- vg.extend(d)
-
- vg.createLvLinear(name, vg.getSize() / 2)
- vg.close()
- vg = None
-
- @staticmethod
- def _create_thin_pool(device_list, pool_name):
- vg = lvm.vgCreate(TestLvm.VG_P + "_" + pool_name)
-
- for d in device_list:
- vg.extend(d)
-
- vg.createLvThinpool(
- pool_name, vg.getSize() / 2, 0, 0, lvm.THIN_DISCARDS_PASSDOWN, 1)
- return vg
-
- @staticmethod
- def _create_thin_lv(pv_devices, name):
- thin_pool_name = 'thin_vg_pool_' + rs(4)
- vg = TestLvm._create_thin_pool(pv_devices, thin_pool_name)
- vg.createLvThin(thin_pool_name, name, vg.getSize() / 8)
- vg.close()
- vg = None
-
- @staticmethod
- def _vg_names():
- rc = []
- vg_names = lvm.listVgNames()
-
- for i in vg_names:
- if i[0:len(TestLvm.VG_P)] == TestLvm.VG_P:
- rc.append(i)
-
- return rc
-
- @staticmethod
- def _get_lv(lv_vol_type=None, lv_name=None):
- vg_name_list = TestLvm._vg_names()
- for vg_name in vg_name_list:
- vg = lvm.vgOpen(vg_name, "w")
- lvs = vg.listLVs()
-
- for lv in lvs:
- attr = lv.getAttr()
- if lv_vol_type or lv_name:
- if lv_vol_type is not None and attr[0] == lv_vol_type:
- return lv, vg
- elif lv_name is not None and lv_name == lv.getName():
- return lv, vg
- else:
- return lv, vg
- vg.close()
- return None, None
-
- @staticmethod
- def _remove_vg(vg_name):
- vg = lvm.vgOpen(vg_name, 'w')
-
- pvs = vg.listPVs()
-
- pe_devices = []
-
- #Remove old snapshots first, then lv
- for lv in vg.listLVs():
- attr = lv.getAttr()
- if attr[0] == 's':
- lv.remove()
-
- lvs = vg.listLVs()
-
- #Now remove any thin lVs
- for lv in vg.listLVs():
- attr = lv.getAttr()
- if attr[0] == 'V':
- lv.remove()
-
- #now remove the rest
- for lv in vg.listLVs():
- name = lv.getName()
-
- #Don't remove the hidden ones
- if '_tmeta' not in name and '_tdata' not in name:
- lv.remove()
-
- for p in pvs:
- pe_devices.append(p.getName())
-
- for pv in pe_devices[:-1]:
- vg.reduce(pv)
-
- vg.remove()
- vg.close()
-
- @staticmethod
- def _clean_up():
- #Clear out the testing PVs, but only if they contain stuff
- #this unit test created
- for vg_n in TestLvm._vg_names():
- TestLvm._remove_vg(vg_n)
-
- for d in TestLvm._get_pv_device_names():
- lvm.pvRemove(d)
- lvm.pvCreate(d)
-
- def setUp(self):
- device_list = TestLvm._get_pv_device_names()
-
- #Make sure we have an adequate number of PVs to use
- self.assertTrue(len(device_list) >= 4)
- TestLvm._clean_up()
-
- def tearDown(self):
- TestLvm._clean_up()
-
- def test_pv_resize(self):
- with AllowedPVS() as pvs:
- pv = pvs[0]
- curr_size = pv.getSize()
- dev_size = pv.getDevSize()
- self.assertTrue(curr_size == dev_size)
- pv.resize(curr_size / 2)
- with AllowedPVS() as pvs:
- pv = pvs[0]
- resized_size = pv.getSize()
- self.assertTrue(resized_size != curr_size)
- pv.resize(dev_size)
-
- def test_pv_life_cycle(self):
- """
- Test removing and re-creating a PV
- """
- target_name = None
-
- with AllowedPVS() as pvs:
- pv = pvs[0]
- target_name = pv.getName()
- lvm.pvRemove(target_name)
-
- with AllowedPVS() as pvs:
- for p in pvs:
- self.assertTrue(p.getName() != target_name)
-
- lvm.pvCreate(target_name, 0)
-
- with AllowedPVS() as pvs:
- found = False
- for p in pvs:
- if p.getName() == target_name:
- found = True
-
- self.assertTrue(found)
-
- @staticmethod
- def _test_pv_methods():
- with AllowedPVS() as pvs:
- for p in pvs:
- p.getName()
- p.getUuid()
- p.getMdaCount()
- p.getSize()
- p.getDevSize()
- p.getFree()
- p = None
-
- def test_version(self):
- version = lvm.getVersion()
- self.assertNotEquals(version, None)
- self.assertEquals(type(version), str)
- self.assertTrue(len(version) > 0)
-
- def test_pv_getters(self):
- with AllowedPVS() as pvs:
- pv = pvs[0]
- self.assertEqual(type(pv.getName()), str)
- self.assertTrue(len(pv.getName()) > 0)
-
- self.assertEqual(type(pv.getUuid()), str)
- self.assertTrue(len(pv.getUuid()) > 0)
-
- self.assertTrue(
- type(pv.getMdaCount()) == int or
- type(pv.getMdaCount()) == long)
-
- self.assertTrue(
- type(pv.getSize()) == int or
- type(pv.getSize()) == long)
-
- self.assertTrue(
- type(pv.getDevSize()) == int or
- type(pv.getSize()) == long)
-
- self.assertTrue(
- type(pv.getFree()) == int or
- type(pv.getFree()) == long)
-
- def _test_prop(self, prop_obj, prop, var_type, settable):
- result = prop_obj.getProperty(prop)
-
- #If we have no string value we can get a None type back
- if result[0] is not None:
- self.assertEqual(type(result[0]), var_type)
- else:
- self.assertTrue(str == var_type)
- self.assertEqual(type(result[1]), bool)
- self.assertTrue(result[1] == settable)
-
- def test_pv_segs(self):
- with AllowedPVS() as pvs:
- pv = pvs[0]
- pv_segs = pv.listPVsegs()
-
- #LVsegs returns a tuple, (value, bool settable)
- #TODO: Test other properties of pv_seg
- for i in pv_segs:
- self._test_prop(i, 'pvseg_start', long, False)
-
- def test_pv_property(self):
- with AllowedPVS() as pvs:
- pv = pvs[0]
- self._test_prop(pv, 'pv_mda_count', long, False)
-
- def test_lv_property(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- lv_seg_properties = [
- ('chunk_size', long, False), ('devices', str, False),
- ('discards', str, False), ('region_size', long, False),
- ('segtype', str, False), ('seg_pe_ranges', str, False),
- ('seg_size', long, False), ('seg_size_pe', long, False),
- ('seg_start', long, False), ('seg_start_pe', long, False),
- ('seg_tags', str, False), ('stripes', long, False),
- ('stripe_size', long, False), ('thin_count', long, False),
- ('transaction_id', long, False), ('zero', long, False)]
-
- lv_properties = [
- ('convert_lv', str, False), ('copy_percent', long, False),
- ('data_lv', str, False), ('lv_attr', str, False),
- ('lv_host', str, False), ('lv_kernel_major', long, False),
- ('lv_kernel_minor', long, False),
- ('lv_kernel_read_ahead', long, False),
- ('lv_major', long, False), ('lv_minor', long, False),
- ('lv_name', str, False), ('lv_path', str, False),
- ('lv_profile', str, False), ('lv_read_ahead', long, False),
- ('lv_size', long, False), ('lv_tags', str, False),
- ('lv_time', str, False), ('lv_uuid', str, False),
- ('metadata_lv', str, False), ('mirror_log', str, False),
- ('lv_modules', str, False), ('move_pv', str, False),
- ('origin', str, False), ('origin_size', long, False),
- ('pool_lv', str, False), ('raid_max_recovery_rate', long, False),
- ('raid_min_recovery_rate', long, False),
- ('raid_mismatch_count', long, False),
- ('raid_sync_action', str, False),
- ('raid_write_behind', long, False), ('seg_count', long, False),
- ('snap_percent', long, False), ('sync_percent', long, False)]
-
- # Generic test case, make sure we get what we expect
- for t in lv_properties:
- self._test_prop(lv, *t)
-
- segments = lv.listLVsegs()
- if segments and len(segments):
- for s in segments:
- for t in lv_seg_properties:
- self._test_prop(s, *t)
-
- # Test specific cases
- tag = 'hello_world'
- lv.addTag(tag)
- tags = lv.getProperty('lv_tags')
- self.assertTrue(tag in tags[0])
- vg.close()
-
- def test_lv_tags(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
- self._test_tags(lv)
- vg.close()
-
- def test_lv_active_inactive(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
- lv.deactivate()
- self.assertTrue(lv.isActive() is False)
- lv.activate()
- self.assertTrue(lv.isActive() is True)
- vg.close()
-
- def test_lv_rename(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- current_name = lv.getName()
- new_name = rs()
- lv.rename(new_name)
- self.assertEqual(lv.getName(), new_name)
- lv.rename(current_name)
- vg.close()
-
- def test_lv_persistence(self):
- # Make changes to the lv, close the vg and re-open to make sure that
- # the changes persist
- lv_name = 'lv_test_persist'
- TestLvm._create_thick_lv(TestLvm._get_pv_device_names(), lv_name)
-
- # Test rename
- lv, vg = TestLvm._get_lv(None, lv_name)
- current_name = lv.getName()
- new_name = rs()
- lv.rename(new_name)
-
- vg.close()
- vg = None
-
- lv, vg = TestLvm._get_lv(None, new_name)
-
- self.assertTrue(lv is not None)
-
- if lv and vg:
- lv.rename(lv_name)
- vg.close()
- vg = None
-
- # Test lv tag add
- tag = 'hello_world'
-
- lv, vg = TestLvm._get_lv(None, lv_name)
- lv.addTag(tag)
- vg.close()
- vg = None
-
- lv, vg = TestLvm._get_lv(None, lv_name)
- tags = lv.getTags()
-
- self.assertTrue(tag in tags)
- vg.close()
- vg = None
-
- # Test lv tag delete
- lv, vg = TestLvm._get_lv(None, lv_name)
- self.assertTrue(lv is not None and vg is not None)
-
- if lv and vg:
- tags = lv.getTags()
-
- for t in tags:
- lv.removeTag(t)
-
- vg.close()
- vg = None
-
- lv, vg = TestLvm._get_lv(None, lv_name)
- self.assertTrue(lv is not None and vg is not None)
-
- if lv and vg:
- tags = lv.getTags()
-
- if tags:
- self.assertEqual(len(tags), 0)
- vg.close()
- vg = None
-
- # Test lv deactivate
- lv, vg = TestLvm._get_lv(None, lv_name)
- self.assertTrue(lv is not None and vg is not None)
-
- if lv and vg:
- lv.deactivate()
- vg.close()
- vg = None
-
- lv, vg = TestLvm._get_lv(None, lv_name)
- self.assertTrue(lv is not None and vg is not None)
- if lv and vg:
- self.assertFalse(lv.isActive())
- vg.close()
- vg = None
-
- # Test lv activate
- lv, vg = TestLvm._get_lv(None, lv_name)
- self.assertTrue(lv is not None and vg is not None)
- if lv and vg:
- lv.activate()
- vg.close()
- vg = None
-
- lv, vg = TestLvm._get_lv(None, lv_name)
- self.assertTrue(lv is not None and vg is not None)
- if lv and vg:
- self.assertTrue(lv.isActive())
- vg.close()
- vg = None
-
- def test_lv_snapshot(self):
-
- thin_lv = 'thin_lv'
- thick_lv = 'thick_lv'
-
- device_names = TestLvm._get_pv_device_names()
-
- TestLvm._create_thin_lv(device_names[0:2], thin_lv)
- TestLvm._create_thick_lv(device_names[2:4], thick_lv)
-
- lv, vg = TestLvm._get_lv(None, thick_lv)
-# FIXME lv.snapshot('thick_snap_shot', 1024*1024)
- vg.close()
-
-# FIXME thick_ss, vg = TestLvm._get_lv(None, 'thick_snap_shot')
-# FIXME self.assertTrue(thick_ss is not None)
-# FIXME vg.close()
-
- thin_lv, vg = TestLvm._get_lv(None, thin_lv)
- thin_lv.snapshot('thin_snap_shot')
- vg.close()
-
- thin_ss, vg = TestLvm._get_lv(None, 'thin_snap_shot')
- self.assertTrue(thin_ss is not None)
-
- origin = thin_ss.getOrigin()
- self.assertTrue(thin_lv, origin)
-
- vg.close()
-
- def test_lv_suspend(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- result = lv.isSuspended()
- self.assertTrue(type(result) == bool)
- vg.close()
-
- def test_lv_size(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- result = lv.getSize()
- self.assertTrue(type(result) == int or type(result) == long)
- vg.close()
-
- def test_lv_resize(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- curr_size = lv.getSize()
- lv.resize(curr_size + (1024 * 1024))
- latest = lv.getSize()
- self.assertTrue(curr_size != latest)
-
- def test_lv_seg(self):
- lv_name = 'lv_test'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- lv_segs = lv.listLVsegs()
-
- #LVsegs returns a tuple, (value, bool settable)
- #TODO: Test other properties of lv_seg
- for i in lv_segs:
- self._test_prop(i, 'seg_start_pe', long, False)
-
- vg.close()
-
- def test_get_set_extend_size(self):
- thick_lv = 'get_set_prop'
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thick_lv(device_names[0:2], thick_lv)
- lv, vg = TestLvm._get_lv(None, thick_lv)
-
- new_extent = 1024 * 1024 * 4
-
- self.assertFalse(
- vg.getExtentSize() != new_extent,
- "Cannot determine if it works if they are the same")
-
- vg.setExtentSize(new_extent)
- self.assertEqual(vg.getExtentSize(), new_extent)
- vg.close()
-
- def test_vg_get_set_prop(self):
- thick_lv = 'get_set_prop'
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thick_lv(device_names[0:2], thick_lv)
- lv, vg = TestLvm._get_lv(None, thick_lv)
-
- self.assertTrue(vg is not None)
- if vg:
- vg_mda_copies = vg.getProperty('vg_mda_copies')
- vg.setProperty('vg_mda_copies', vg_mda_copies[0])
- vg.close()
-
- def test_vg_remove_restore(self):
- #Store off the list of physical devices
- pv_devices = []
-
- thick_lv = 'get_set_prop'
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thick_lv(device_names[0:2], thick_lv)
- lv, vg = TestLvm._get_lv(None, thick_lv)
-
- vg_name = vg.getName()
-
- pvs = vg.listPVs()
- for p in pvs:
- pv_devices.append(p.getName())
- vg.close()
-
- TestLvm._remove_vg(vg_name)
- self._create_thick_lv(pv_devices, thick_lv)
-
- def test_vg_names(self):
- vg = lvm.listVgNames()
- self.assertTrue(isinstance(vg, tuple))
-
- def test_dupe_lv_create(self):
- """
- Try to create a lv with the same name expecting a failure
- Note: This was causing a seg. fault previously
- """
- thick_lv = 'dupe_name'
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thick_lv(device_names[0:2], thick_lv)
- lv, vg = TestLvm._get_lv(None, thick_lv)
-
- self.assertTrue(vg is not None)
-
- if vg:
- lvs = vg.listLVs()
-
- if len(lvs):
- lv = lvs[0]
- lv_name = lv.getName()
- self.assertRaises(
- lvm.LibLVMError, vg.createLvLinear, lv_name, lv.getSize())
- vg.close()
-
- def test_vg_uuids(self):
-
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thin_lv(device_names[0:2], 'thin')
- TestLvm._create_thick_lv(device_names[2:4], 'thick')
-
- vgs_uuids = lvm.listVgUuids()
-
- self.assertTrue(len(vgs_uuids) > 0)
- self.assertTrue(isinstance(vgs_uuids, tuple))
-
- vgs_uuids = list(vgs_uuids)
- vgs_names = lvm.listVgNames()
-
- for vg_name in vgs_names:
- vg = lvm.vgOpen(vg_name, "r")
-
- #TODO Write/fix BUG, vg uuid don't match between
- #lvm.listVgUuids and vg.getUuid()
- vg_uuid_search = vg.getUuid().replace('-', '')
-
- self.assertTrue(vg_uuid_search in vgs_uuids)
- vgs_uuids.remove(vg_uuid_search)
- vg.close()
-
- self.assertTrue(len(vgs_uuids) == 0)
-
- def test_pv_lookup_from_vg(self):
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thin_lv(device_names[0:2], 'thin')
- TestLvm._create_thick_lv(device_names[2:4], 'thick')
-
- vg_names = TestLvm._vg_names()
-
- self.assertTrue(len(vg_names) > 0)
-
- for vg_name in vg_names:
- vg = lvm.vgOpen(vg_name, 'w')
- pvs = vg.listPVs()
-
- for p in pvs:
- name = p.getName()
- uuid = p.getUuid()
-
- pv_name_lookup = vg.pvFromName(name)
- pv_uuid_lookup = vg.pvFromUuid(uuid)
-
- self.assertTrue(
- pv_name_lookup.getName() == pv_uuid_lookup.getName())
- self.assertTrue(
- pv_name_lookup.getUuid() == pv_uuid_lookup.getUuid())
-
- self.assertTrue(name == pv_name_lookup.getName())
- self.assertTrue(uuid == pv_uuid_lookup.getUuid())
-
- pv_name_lookup = None
- pv_uuid_lookup = None
- p = None
-
- pvs = None
- vg.close()
-
- def test_percent_to_float(self):
- self.assertEqual(lvm.percentToFloat(0), 0.0)
- self.assertEqual(lvm.percentToFloat(1000000), 1.0)
- self.assertEqual(lvm.percentToFloat(1000000 / 2), 0.5)
-
- def test_scan(self):
- self.assertEqual(lvm.scan(), None)
-
- def test_config_reload(self):
- self.assertEqual(lvm.configReload(), None)
-
- def test_config_override(self):
- self.assertEquals(lvm.configOverride("global.test = 1"), None)
-
- def test_config_find_bool(self):
- either_or = lvm.configFindBool("global/fallback_to_local_locking")
- self.assertTrue(type(either_or) == bool)
- self.assertTrue(lvm.configFindBool("global/locking_type"))
-
- def test_vg_from_pv_lookups(self):
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thin_lv(device_names[0:2], 'thin')
- TestLvm._create_thick_lv(device_names[2:4], 'thick')
-
- vgname_list = TestLvm._vg_names()
-
- self.assertTrue(len(vgname_list) > 0)
-
- for vg_name in vgname_list:
- vg = lvm.vgOpen(vg_name, 'r')
-
- vg_name = vg.getName()
-
- pv_list = vg.listPVs()
- for pv in pv_list:
- vg_name_from_pv = lvm.vgNameFromPvid(pv.getUuid())
- self.assertEquals(vg_name, vg_name_from_pv)
- self.assertEqual(vg_name, lvm.vgNameFromDevice(pv.getName()))
- vg.close()
-
- def test_vg_get_name(self):
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thin_lv(device_names[0:2], 'thin')
- TestLvm._create_thick_lv(device_names[2:4], 'thick')
-
- vgname_list = TestLvm._vg_names()
-
- self.assertTrue(len(vgname_list) > 0)
-
- for vg_name in vgname_list:
- vg = lvm.vgOpen(vg_name, 'r')
- self.assertEqual(vg.getName(), vg_name)
- vg.close()
-
- def test_vg_get_uuid(self):
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thin_lv(device_names[0:2], 'thin')
- TestLvm._create_thick_lv(device_names[2:4], 'thick')
-
- vgname_list = TestLvm._vg_names()
-
- self.assertTrue(len(vgname_list) > 0)
-
- for vg_name in vgname_list:
- vg = lvm.vgOpen(vg_name, 'r')
- uuid = vg.getUuid()
- self.assertNotEqual(uuid, None)
- self.assertTrue(len(uuid) > 0)
- vg.close()
-
- RETURN_NUMERIC = [
- "getSeqno", "getSize", "getFreeSize", "getFreeSize",
- "getExtentSize", "getExtentCount", "getFreeExtentCount",
- "getPvCount", "getMaxPv", "getMaxLv"]
-
- def test_vg_getters(self):
- device_names = TestLvm._get_pv_device_names()
- TestLvm._create_thin_lv(device_names[0:2], 'thin')
- TestLvm._create_thick_lv(device_names[2:4], 'thick')
-
- vg_name_list = TestLvm._vg_names()
-
- self.assertTrue(len(vg_name_list) > 0)
-
- for vg_name in vg_name_list:
- vg = lvm.vgOpen(vg_name, 'r')
- self.assertTrue(type(vg.isClustered()) == bool)
- self.assertTrue(type(vg.isExported()) == bool)
- self.assertTrue(type(vg.isPartial()) == bool)
-
- #Loop through the list invoking the method
- for method_name in TestLvm.RETURN_NUMERIC:
- method = getattr(vg, method_name)
- result = method()
- self.assertTrue(type(result) == int or type(result) == long)
-
- vg.close()
-
- def _test_tags(self, tag_obj):
- existing_tags = tag_obj.getTags()
- self.assertTrue(type(existing_tags) == tuple)
-
- num_tags = random.randint(2, 40)
- created_tags = []
-
- for i in range(num_tags):
- tag_name = rs(random.randint(1, 128))
- tag_obj.addTag(tag_name)
- created_tags.append(tag_name)
-
- tags = tag_obj.getTags()
- self.assertTrue(len(existing_tags) + len(created_tags) == len(tags))
-
- num_remove = len(created_tags)
-
- for i in range(num_remove):
- tag_to_remove = created_tags[
- random.randint(0, len(created_tags) - 1)]
-
- created_tags.remove(tag_to_remove)
-
- tag_obj.removeTag(tag_to_remove)
-
- current_tags = tag_obj.getTags()
- self.assertFalse(tag_to_remove in current_tags)
-
- current_tags = tag_obj.getTags()
- self.assertTrue(len(current_tags) == len(existing_tags))
- for e in existing_tags:
- self.assertTrue(e in current_tags)
-
- def test_vg_tags(self):
- device_names = TestLvm._get_pv_device_names()
-
- i = 0
- for d in device_names:
- if i % 2 == 0:
- TestLvm._create_thin_lv([d], "thin_lv%d" % i)
- else:
- TestLvm._create_thick_lv([d], "thick_lv%d" % i)
- i += 1
-
- for vg_name in TestLvm._vg_names():
- vg = lvm.vgOpen(vg_name, 'w')
- self._test_tags(vg)
- vg.close()
-
- @staticmethod
- def _test_listing():
-
- env = os.environ
-
- for k, v in env.items():
- l("%s:%s" % (k, v))
-
- with lvm.listPvs() as pvs:
- for p in pvs:
- l('pv= %s' % p.getName())
-
- l('Checking for VG')
- for v in lvm.listVgNames():
- l('vg= %s' % v)
-
- def test_pv_empty_listing(self):
- #We had a bug where we would seg. fault if we had no PVs.
-
- l('testPVemptylisting entry')
-
- device_names = TestLvm._get_pv_device_names()
-
- for d in device_names:
- l("Removing %s" % d)
- lvm.pvRemove(d)
-
- count = 0
-
- with lvm.listPvs() as pvs:
- for p in pvs:
- count += 1
- l('pv= %s' % p.getName())
-
- self.assertTrue(count == 0)
-
- for d in device_names:
- lvm.pvCreate(d)
-
- def test_pv_create(self):
- size = [0, 1024 * 1024 * 8]
- pvmeta_copies = [0, 1, 2]
- pvmeta_size = [0, 255, 512, 1024]
- data_alignment = [0, 2048, 4096]
- zero = [0, 1]
-
- device_names = TestLvm._get_pv_device_names()
-
- for d in device_names:
- lvm.pvRemove(d)
-
- d = device_names[0]
-
- #Test some error cases
- self.assertRaises(TypeError, lvm.pvCreate, None)
- self.assertRaises(lvm.LibLVMError, lvm.pvCreate, '')
- self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 4)
- self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 4)
- self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 0, 0, 2 ** 34)
- self.assertRaises(
- lvm.LibLVMError, lvm.pvCreate, d, 0, 0, 0, 4096, 2 ** 34)
-
- #Try a number of combinations and permutations
- for s in size:
- for copies in pvmeta_copies:
- for pv_size in pvmeta_size:
- for align in data_alignment:
- for z in zero:
- lvm.pvCreate(d, s, copies, pv_size, align,
- align, z)
- lvm.pvRemove(d)
-
- #Restore
- for d in device_names:
- lvm.pvCreate(d)
-
- def test_vg_reduce(self):
- # Test the case where we try to reduce a vg where the last PV has
- # no metadata copies. In this case the reduce should fail.
- vg_name = TestLvm.VG_P + 'reduce_test'
-
- device_names = TestLvm._get_pv_device_names()
-
- for d in device_names:
- lvm.pvRemove(d)
-
- lvm.pvCreate(device_names[0], 0, 0) # Size all, pvmetadatacopies 0
- lvm.pvCreate(device_names[1])
- lvm.pvCreate(device_names[2])
- lvm.pvCreate(device_names[3])
-
- vg = lvm.vgCreate(vg_name)
-
- vg.extend(device_names[3])
- vg.extend(device_names[2])
- vg.extend(device_names[1])
- vg.extend(device_names[0])
- vg.close()
-
- vg = None
-
- vg = lvm.vgOpen(vg_name, 'w')
-
- vg.reduce(device_names[3])
- vg.reduce(device_names[2])
-
- self.assertRaises(lvm.LibLVMError, vg.reduce, device_names[1])
-
- vg.close()
- vg = None
-
- vg = lvm.vgOpen(vg_name, 'w')
- vg.remove()
- vg.close()
-
- @staticmethod
- def _test_valid_names(method):
- sample = 'azAZ09._-+'
-
- method('x' * 127)
- method('.X')
- method('..X')
-
- for i in range(1, 7):
- tests = (''.join(i) for i in itertools.product(sample, repeat=i))
- for t in tests:
- if t == '.' or t == '..':
- t += 'X'
- elif t.startswith('-'):
- t = 'H' + t
- method(t)
-
- def _test_bad_names(self, method, dupe_name):
- # Test for duplicate name
- self.assertRaises(lvm.LibLVMError, method, dupe_name)
-
- # Test for too long a name
- self.assertRaises(lvm.LibLVMError, method, ('x' * 128))
-
- # Test empty
- self.assertRaises(lvm.LibLVMError, method, '')
-
- # Invalid characters
- self.assertRaises(lvm.LibLVMError, method, '&invalid^char')
-
- # Cannot start with .. and no following characters
- self.assertRaises(lvm.LibLVMError, method, '..')
-
- # Cannot start with . and no following characters
- self.assertRaises(lvm.LibLVMError, method, '.')
-
- # Cannot start with a hyphen
- self.assertRaises(lvm.LibLVMError, method, '-not_good')
-
- def _lv_reserved_names(self, method):
- prefixes = ['snapshot', 'pvmove']
- reserved = [
- '_mlog', '_mimage', '_pmspare', '_rimage', '_rmeta',
- '_vorigin', '_tdata', '_tmeta']
-
- for p in prefixes:
- self.assertRaises(lvm.LibLVMError, method, p + rs(3))
-
- for r in reserved:
- self.assertRaises(lvm.LibLVMError, method, rs(3) + r + rs(1))
- self.assertRaises(lvm.LibLVMError, method, r + rs(1))
-
- def test_vg_lv_name_validate(self):
- lv_name = 'vg_lv_name_validate'
- TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
- lv, vg = TestLvm._get_lv(None, lv_name)
-
- self._test_bad_names(lvm.vgNameValidate, vg.getName())
- self._test_bad_names(vg.lvNameValidate, lv.getName())
-
- # Test good values
- TestLvm._test_valid_names(lvm.vgNameValidate)
- TestLvm._test_valid_names(vg.lvNameValidate)
- self._lv_reserved_names(vg.lvNameValidate)
-
- vg.close()
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/test/api/python_lvm_unit.py.in b/test/api/python_lvm_unit.py.in
new file mode 100755
index 0000000..2f9cbb5
--- /dev/null
+++ b/test/api/python_lvm_unit.py.in
@@ -0,0 +1,1048 @@
+#!@PYTHON@
+
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This file is part of LVM2.
+#
+# This copyrighted material is made available to anyone wishing to use,
+# modify, copy, or redistribute it subject to the terms and conditions
+# of the GNU General Public License v.2.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import unittest
+import random
+import string
+import lvm
+import os
+import itertools
+import sys
+
+if sys.version_info[0] > 2:
+ long = int
+
+# Set of basic unit tests for the python bindings.
+#
+# *** WARNING ***
+#
+# This test tries to only modify configuration for the list of allowed
+# PVs, but an error in it could potentially cause data loss if run on a
+# production system. Therefore it is strongly advised that this unit test
+# not be run on a system that contains data of value.
+
+fh = None
+
+
+def l(txt):
+ if os.environ.get('PY_UNIT_LOG') is not None:
+ global fh
+ if fh is None:
+ fh = open('/tmp/lvm_py_unit_test_' + rs(10), "a")
+ fh.write(txt + "\n")
+ fh.flush()
+
+
+def rs(rand_len=10):
+ """
+ Generate a random string
+ """
+ return ''.join(
+ random.choice(string.ascii_uppercase)for x in range(rand_len))
+
+
+def _get_allowed_devices():
+ rc = os.environ.get('PY_UNIT_PVS')
+ if rc is not None:
+ rc = rc.splitlines()
+ rc.sort()
+ return rc
+
+
+class AllowedPVS(object):
+ """
+ We are only allowed to muck with certain PV, filter to only
+ the ones we can use.
+ """
+
+ def __init__(self):
+ self.handle = None
+ self.pvs_all = None
+
+ def __enter__(self):
+ rc = []
+
+ allowed_dev = _get_allowed_devices()
+
+ if allowed_dev:
+ self.handle = lvm.listPvs()
+ self.pvs_all = self.handle.open()
+
+ for p in self.pvs_all:
+ if p.getName() in allowed_dev:
+ rc.append(p)
+
+ #Sort them consistently
+ rc.sort(key=lambda x: x.getName())
+ return rc
+
+ def __exit__(self, t_type, value, traceback):
+ if self.handle:
+ self.pvs_all = None
+ self.handle.close()
+
+
+class TestLvm(unittest.TestCase):
+
+ VG_P = os.environ.get('PREFIX')
+
+ @staticmethod
+ def _get_pv_device_names():
+ rc = []
+ with AllowedPVS() as pvs:
+ for p in pvs:
+ rc.append(p.getName())
+ return rc
+
+ @staticmethod
+ def _create_thick_lv(device_list, name):
+ vg = lvm.vgCreate(TestLvm.VG_P + "_" + name)
+
+ for d in device_list:
+ vg.extend(d)
+
+ vg.createLvLinear(name, vg.getSize() / 2)
+ vg.close()
+ vg = None
+
+ @staticmethod
+ def _create_thin_pool(device_list, pool_name):
+ vg = lvm.vgCreate(TestLvm.VG_P + "_" + pool_name)
+
+ for d in device_list:
+ vg.extend(d)
+
+ vg.createLvThinpool(
+ pool_name, vg.getSize() / 2, 0, 0, lvm.THIN_DISCARDS_PASSDOWN, 1)
+ return vg
+
+ @staticmethod
+ def _create_thin_lv(pv_devices, name):
+ thin_pool_name = 'thin_vg_pool_' + rs(4)
+ vg = TestLvm._create_thin_pool(pv_devices, thin_pool_name)
+ vg.createLvThin(thin_pool_name, name, vg.getSize() / 8)
+ vg.close()
+ vg = None
+
+ @staticmethod
+ def _vg_names():
+ rc = []
+ vg_names = lvm.listVgNames()
+
+ for i in vg_names:
+ if i[0:len(TestLvm.VG_P)] == TestLvm.VG_P:
+ rc.append(i)
+
+ return rc
+
+ @staticmethod
+ def _get_lv(lv_vol_type=None, lv_name=None):
+ vg_name_list = TestLvm._vg_names()
+ for vg_name in vg_name_list:
+ vg = lvm.vgOpen(vg_name, "w")
+ lvs = vg.listLVs()
+
+ for lv in lvs:
+ attr = lv.getAttr()
+ if lv_vol_type or lv_name:
+ if lv_vol_type is not None and attr[0] == lv_vol_type:
+ return lv, vg
+ elif lv_name is not None and lv_name == lv.getName():
+ return lv, vg
+ else:
+ return lv, vg
+ vg.close()
+ return None, None
+
+ @staticmethod
+ def _remove_vg(vg_name):
+ vg = lvm.vgOpen(vg_name, 'w')
+
+ pvs = vg.listPVs()
+
+ pe_devices = []
+
+ #Remove old snapshots first, then lv
+ for lv in vg.listLVs():
+ attr = lv.getAttr()
+ if attr[0] == 's':
+ lv.remove()
+
+ lvs = vg.listLVs()
+
+ #Now remove any thin lVs
+ for lv in vg.listLVs():
+ attr = lv.getAttr()
+ if attr[0] == 'V':
+ lv.remove()
+
+ #now remove the rest
+ for lv in vg.listLVs():
+ name = lv.getName()
+
+ #Don't remove the hidden ones
+ if '_tmeta' not in name and '_tdata' not in name:
+ lv.remove()
+
+ for p in pvs:
+ pe_devices.append(p.getName())
+
+ for pv in pe_devices[:-1]:
+ vg.reduce(pv)
+
+ vg.remove()
+ vg.close()
+
+ @staticmethod
+ def _clean_up():
+ #Clear out the testing PVs, but only if they contain stuff
+ #this unit test created
+ for vg_n in TestLvm._vg_names():
+ TestLvm._remove_vg(vg_n)
+
+ for d in TestLvm._get_pv_device_names():
+ lvm.pvRemove(d)
+ lvm.pvCreate(d)
+
+ def setUp(self):
+ device_list = TestLvm._get_pv_device_names()
+
+ #Make sure we have an adequate number of PVs to use
+ self.assertTrue(len(device_list) >= 4)
+ TestLvm._clean_up()
+
+ def tearDown(self):
+ TestLvm._clean_up()
+
+ def test_pv_resize(self):
+ with AllowedPVS() as pvs:
+ pv = pvs[0]
+ curr_size = pv.getSize()
+ dev_size = pv.getDevSize()
+ self.assertTrue(curr_size == dev_size)
+ pv.resize(curr_size / 2)
+ with AllowedPVS() as pvs:
+ pv = pvs[0]
+ resized_size = pv.getSize()
+ self.assertTrue(resized_size != curr_size)
+ pv.resize(dev_size)
+
+ def test_pv_life_cycle(self):
+ """
+ Test removing and re-creating a PV
+ """
+ target_name = None
+
+ with AllowedPVS() as pvs:
+ pv = pvs[0]
+ target_name = pv.getName()
+ lvm.pvRemove(target_name)
+
+ with AllowedPVS() as pvs:
+ for p in pvs:
+ self.assertTrue(p.getName() != target_name)
+
+ lvm.pvCreate(target_name, 0)
+
+ with AllowedPVS() as pvs:
+ found = False
+ for p in pvs:
+ if p.getName() == target_name:
+ found = True
+
+ self.assertTrue(found)
+
+ @staticmethod
+ def _test_pv_methods():
+ with AllowedPVS() as pvs:
+ for p in pvs:
+ p.getName()
+ p.getUuid()
+ p.getMdaCount()
+ p.getSize()
+ p.getDevSize()
+ p.getFree()
+ p = None
+
+ def test_version(self):
+ version = lvm.getVersion()
+ self.assertNotEquals(version, None)
+ self.assertEquals(type(version), str)
+ self.assertTrue(len(version) > 0)
+
+ def test_pv_getters(self):
+ with AllowedPVS() as pvs:
+ pv = pvs[0]
+ self.assertEqual(type(pv.getName()), str)
+ self.assertTrue(len(pv.getName()) > 0)
+
+ self.assertEqual(type(pv.getUuid()), str)
+ self.assertTrue(len(pv.getUuid()) > 0)
+
+ self.assertTrue(
+ type(pv.getMdaCount()) == int or
+ type(pv.getMdaCount()) == long)
+
+ self.assertTrue(
+ type(pv.getSize()) == int or
+ type(pv.getSize()) == long)
+
+ self.assertTrue(
+ type(pv.getDevSize()) == int or
+ type(pv.getSize()) == long)
+
+ self.assertTrue(
+ type(pv.getFree()) == int or
+ type(pv.getFree()) == long)
+
+ def _test_prop(self, prop_obj, prop, var_type, settable):
+ result = prop_obj.getProperty(prop)
+
+ #If we have no string value we can get a None type back
+ if result[0] is not None:
+ self.assertEqual(type(result[0]), var_type)
+ else:
+ self.assertTrue(str == var_type)
+ self.assertEqual(type(result[1]), bool)
+ self.assertTrue(result[1] == settable)
+
+ def test_pv_segs(self):
+ with AllowedPVS() as pvs:
+ pv = pvs[0]
+ pv_segs = pv.listPVsegs()
+
+ #LVsegs returns a tuple, (value, bool settable)
+ #TODO: Test other properties of pv_seg
+ for i in pv_segs:
+ self._test_prop(i, 'pvseg_start', long, False)
+
+ def test_pv_property(self):
+ with AllowedPVS() as pvs:
+ pv = pvs[0]
+ self._test_prop(pv, 'pv_mda_count', long, False)
+
+ def test_lv_property(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ lv_seg_properties = [
+ ('chunk_size', long, False), ('devices', str, False),
+ ('discards', str, False), ('region_size', long, False),
+ ('segtype', str, False), ('seg_pe_ranges', str, False),
+ ('seg_size', long, False), ('seg_size_pe', long, False),
+ ('seg_start', long, False), ('seg_start_pe', long, False),
+ ('seg_tags', str, False), ('stripes', long, False),
+ ('stripe_size', long, False), ('thin_count', long, False),
+ ('transaction_id', long, False), ('zero', long, False)]
+
+ lv_properties = [
+ ('convert_lv', str, False), ('copy_percent', long, False),
+ ('data_lv', str, False), ('lv_attr', str, False),
+ ('lv_host', str, False), ('lv_kernel_major', long, False),
+ ('lv_kernel_minor', long, False),
+ ('lv_kernel_read_ahead', long, False),
+ ('lv_major', long, False), ('lv_minor', long, False),
+ ('lv_name', str, False), ('lv_path', str, False),
+ ('lv_profile', str, False), ('lv_read_ahead', long, False),
+ ('lv_size', long, False), ('lv_tags', str, False),
+ ('lv_time', str, False), ('lv_uuid', str, False),
+ ('metadata_lv', str, False), ('mirror_log', str, False),
+ ('lv_modules', str, False), ('move_pv', str, False),
+ ('origin', str, False), ('origin_size', long, False),
+ ('pool_lv', str, False), ('raid_max_recovery_rate', long, False),
+ ('raid_min_recovery_rate', long, False),
+ ('raid_mismatch_count', long, False),
+ ('raid_sync_action', str, False),
+ ('raid_write_behind', long, False), ('seg_count', long, False),
+ ('snap_percent', long, False), ('sync_percent', long, False)]
+
+ # Generic test case, make sure we get what we expect
+ for t in lv_properties:
+ self._test_prop(lv, *t)
+
+ segments = lv.listLVsegs()
+ if segments and len(segments):
+ for s in segments:
+ for t in lv_seg_properties:
+ self._test_prop(s, *t)
+
+ # Test specific cases
+ tag = 'hello_world'
+ lv.addTag(tag)
+ tags = lv.getProperty('lv_tags')
+ self.assertTrue(tag in tags[0])
+ vg.close()
+
+ def test_lv_tags(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self._test_tags(lv)
+ vg.close()
+
+ def test_lv_active_inactive(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ lv.deactivate()
+ self.assertTrue(lv.isActive() is False)
+ lv.activate()
+ self.assertTrue(lv.isActive() is True)
+ vg.close()
+
+ def test_lv_rename(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ current_name = lv.getName()
+ new_name = rs()
+ lv.rename(new_name)
+ self.assertEqual(lv.getName(), new_name)
+ lv.rename(current_name)
+ vg.close()
+
+ def test_lv_persistence(self):
+ # Make changes to the lv, close the vg and re-open to make sure that
+ # the changes persist
+ lv_name = 'lv_test_persist'
+ TestLvm._create_thick_lv(TestLvm._get_pv_device_names(), lv_name)
+
+ # Test rename
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ current_name = lv.getName()
+ new_name = rs()
+ lv.rename(new_name)
+
+ vg.close()
+ vg = None
+
+ lv, vg = TestLvm._get_lv(None, new_name)
+
+ self.assertTrue(lv is not None)
+
+ if lv and vg:
+ lv.rename(lv_name)
+ vg.close()
+ vg = None
+
+ # Test lv tag add
+ tag = 'hello_world'
+
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ lv.addTag(tag)
+ vg.close()
+ vg = None
+
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ tags = lv.getTags()
+
+ self.assertTrue(tag in tags)
+ vg.close()
+ vg = None
+
+ # Test lv tag delete
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self.assertTrue(lv is not None and vg is not None)
+
+ if lv and vg:
+ tags = lv.getTags()
+
+ for t in tags:
+ lv.removeTag(t)
+
+ vg.close()
+ vg = None
+
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self.assertTrue(lv is not None and vg is not None)
+
+ if lv and vg:
+ tags = lv.getTags()
+
+ if tags:
+ self.assertEqual(len(tags), 0)
+ vg.close()
+ vg = None
+
+ # Test lv deactivate
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self.assertTrue(lv is not None and vg is not None)
+
+ if lv and vg:
+ lv.deactivate()
+ vg.close()
+ vg = None
+
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self.assertTrue(lv is not None and vg is not None)
+ if lv and vg:
+ self.assertFalse(lv.isActive())
+ vg.close()
+ vg = None
+
+ # Test lv activate
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self.assertTrue(lv is not None and vg is not None)
+ if lv and vg:
+ lv.activate()
+ vg.close()
+ vg = None
+
+ lv, vg = TestLvm._get_lv(None, lv_name)
+ self.assertTrue(lv is not None and vg is not None)
+ if lv and vg:
+ self.assertTrue(lv.isActive())
+ vg.close()
+ vg = None
+
+ def test_lv_snapshot(self):
+
+ thin_lv = 'thin_lv'
+ thick_lv = 'thick_lv'
+
+ device_names = TestLvm._get_pv_device_names()
+
+ TestLvm._create_thin_lv(device_names[0:2], thin_lv)
+ TestLvm._create_thick_lv(device_names[2:4], thick_lv)
+
+ lv, vg = TestLvm._get_lv(None, thick_lv)
+# FIXME lv.snapshot('thick_snap_shot', 1024*1024)
+ vg.close()
+
+# FIXME thick_ss, vg = TestLvm._get_lv(None, 'thick_snap_shot')
+# FIXME self.assertTrue(thick_ss is not None)
+# FIXME vg.close()
+
+ thin_lv, vg = TestLvm._get_lv(None, thin_lv)
+ thin_lv.snapshot('thin_snap_shot')
+ vg.close()
+
+ thin_ss, vg = TestLvm._get_lv(None, 'thin_snap_shot')
+ self.assertTrue(thin_ss is not None)
+
+ origin = thin_ss.getOrigin()
+ self.assertTrue(thin_lv, origin)
+
+ vg.close()
+
+ def test_lv_suspend(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ result = lv.isSuspended()
+ self.assertTrue(type(result) == bool)
+ vg.close()
+
+ def test_lv_size(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ result = lv.getSize()
+ self.assertTrue(type(result) == int or type(result) == long)
+ vg.close()
+
+ def test_lv_resize(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ curr_size = lv.getSize()
+ lv.resize(curr_size + (1024 * 1024))
+ latest = lv.getSize()
+ self.assertTrue(curr_size != latest)
+
+ def test_lv_seg(self):
+ lv_name = 'lv_test'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ lv_segs = lv.listLVsegs()
+
+ #LVsegs returns a tuple, (value, bool settable)
+ #TODO: Test other properties of lv_seg
+ for i in lv_segs:
+ self._test_prop(i, 'seg_start_pe', long, False)
+
+ vg.close()
+
+ def test_get_set_extend_size(self):
+ thick_lv = 'get_set_prop'
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thick_lv(device_names[0:2], thick_lv)
+ lv, vg = TestLvm._get_lv(None, thick_lv)
+
+ new_extent = 1024 * 1024 * 4
+
+ self.assertFalse(
+ vg.getExtentSize() != new_extent,
+ "Cannot determine if it works if they are the same")
+
+ vg.setExtentSize(new_extent)
+ self.assertEqual(vg.getExtentSize(), new_extent)
+ vg.close()
+
+ def test_vg_get_set_prop(self):
+ thick_lv = 'get_set_prop'
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thick_lv(device_names[0:2], thick_lv)
+ lv, vg = TestLvm._get_lv(None, thick_lv)
+
+ self.assertTrue(vg is not None)
+ if vg:
+ vg_mda_copies = vg.getProperty('vg_mda_copies')
+ vg.setProperty('vg_mda_copies', vg_mda_copies[0])
+ vg.close()
+
+ def test_vg_remove_restore(self):
+ #Store off the list of physical devices
+ pv_devices = []
+
+ thick_lv = 'get_set_prop'
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thick_lv(device_names[0:2], thick_lv)
+ lv, vg = TestLvm._get_lv(None, thick_lv)
+
+ vg_name = vg.getName()
+
+ pvs = vg.listPVs()
+ for p in pvs:
+ pv_devices.append(p.getName())
+ vg.close()
+
+ TestLvm._remove_vg(vg_name)
+ self._create_thick_lv(pv_devices, thick_lv)
+
+ def test_vg_names(self):
+ vg = lvm.listVgNames()
+ self.assertTrue(isinstance(vg, tuple))
+
+ def test_dupe_lv_create(self):
+ """
+ Try to create a lv with the same name expecting a failure
+ Note: This was causing a seg. fault previously
+ """
+ thick_lv = 'dupe_name'
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thick_lv(device_names[0:2], thick_lv)
+ lv, vg = TestLvm._get_lv(None, thick_lv)
+
+ self.assertTrue(vg is not None)
+
+ if vg:
+ lvs = vg.listLVs()
+
+ if len(lvs):
+ lv = lvs[0]
+ lv_name = lv.getName()
+ self.assertRaises(
+ lvm.LibLVMError, vg.createLvLinear, lv_name, lv.getSize())
+ vg.close()
+
+ def test_vg_uuids(self):
+
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thin_lv(device_names[0:2], 'thin')
+ TestLvm._create_thick_lv(device_names[2:4], 'thick')
+
+ vgs_uuids = lvm.listVgUuids()
+
+ self.assertTrue(len(vgs_uuids) > 0)
+ self.assertTrue(isinstance(vgs_uuids, tuple))
+
+ vgs_uuids = list(vgs_uuids)
+ vgs_names = lvm.listVgNames()
+
+ for vg_name in vgs_names:
+ vg = lvm.vgOpen(vg_name, "r")
+
+ #TODO Write/fix BUG, vg uuid don't match between
+ #lvm.listVgUuids and vg.getUuid()
+ vg_uuid_search = vg.getUuid().replace('-', '')
+
+ self.assertTrue(vg_uuid_search in vgs_uuids)
+ vgs_uuids.remove(vg_uuid_search)
+ vg.close()
+
+ self.assertTrue(len(vgs_uuids) == 0)
+
+ def test_pv_lookup_from_vg(self):
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thin_lv(device_names[0:2], 'thin')
+ TestLvm._create_thick_lv(device_names[2:4], 'thick')
+
+ vg_names = TestLvm._vg_names()
+
+ self.assertTrue(len(vg_names) > 0)
+
+ for vg_name in vg_names:
+ vg = lvm.vgOpen(vg_name, 'w')
+ pvs = vg.listPVs()
+
+ for p in pvs:
+ name = p.getName()
+ uuid = p.getUuid()
+
+ pv_name_lookup = vg.pvFromName(name)
+ pv_uuid_lookup = vg.pvFromUuid(uuid)
+
+ self.assertTrue(
+ pv_name_lookup.getName() == pv_uuid_lookup.getName())
+ self.assertTrue(
+ pv_name_lookup.getUuid() == pv_uuid_lookup.getUuid())
+
+ self.assertTrue(name == pv_name_lookup.getName())
+ self.assertTrue(uuid == pv_uuid_lookup.getUuid())
+
+ pv_name_lookup = None
+ pv_uuid_lookup = None
+ p = None
+
+ pvs = None
+ vg.close()
+
+ def test_percent_to_float(self):
+ self.assertEqual(lvm.percentToFloat(0), 0.0)
+ self.assertEqual(lvm.percentToFloat(1000000), 1.0)
+ self.assertEqual(lvm.percentToFloat(1000000 / 2), 0.5)
+
+ def test_scan(self):
+ self.assertEqual(lvm.scan(), None)
+
+ def test_config_reload(self):
+ self.assertEqual(lvm.configReload(), None)
+
+ def test_config_override(self):
+ self.assertEquals(lvm.configOverride("global.test = 1"), None)
+
+ def test_config_find_bool(self):
+ either_or = lvm.configFindBool("global/fallback_to_local_locking")
+ self.assertTrue(type(either_or) == bool)
+ self.assertTrue(lvm.configFindBool("global/locking_type"))
+
+ def test_vg_from_pv_lookups(self):
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thin_lv(device_names[0:2], 'thin')
+ TestLvm._create_thick_lv(device_names[2:4], 'thick')
+
+ vgname_list = TestLvm._vg_names()
+
+ self.assertTrue(len(vgname_list) > 0)
+
+ for vg_name in vgname_list:
+ vg = lvm.vgOpen(vg_name, 'r')
+
+ vg_name = vg.getName()
+
+ pv_list = vg.listPVs()
+ for pv in pv_list:
+ vg_name_from_pv = lvm.vgNameFromPvid(pv.getUuid())
+ self.assertEquals(vg_name, vg_name_from_pv)
+ self.assertEqual(vg_name, lvm.vgNameFromDevice(pv.getName()))
+ vg.close()
+
+ def test_vg_get_name(self):
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thin_lv(device_names[0:2], 'thin')
+ TestLvm._create_thick_lv(device_names[2:4], 'thick')
+
+ vgname_list = TestLvm._vg_names()
+
+ self.assertTrue(len(vgname_list) > 0)
+
+ for vg_name in vgname_list:
+ vg = lvm.vgOpen(vg_name, 'r')
+ self.assertEqual(vg.getName(), vg_name)
+ vg.close()
+
+ def test_vg_get_uuid(self):
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thin_lv(device_names[0:2], 'thin')
+ TestLvm._create_thick_lv(device_names[2:4], 'thick')
+
+ vgname_list = TestLvm._vg_names()
+
+ self.assertTrue(len(vgname_list) > 0)
+
+ for vg_name in vgname_list:
+ vg = lvm.vgOpen(vg_name, 'r')
+ uuid = vg.getUuid()
+ self.assertNotEqual(uuid, None)
+ self.assertTrue(len(uuid) > 0)
+ vg.close()
+
+ RETURN_NUMERIC = [
+ "getSeqno", "getSize", "getFreeSize", "getFreeSize",
+ "getExtentSize", "getExtentCount", "getFreeExtentCount",
+ "getPvCount", "getMaxPv", "getMaxLv"]
+
+ def test_vg_getters(self):
+ device_names = TestLvm._get_pv_device_names()
+ TestLvm._create_thin_lv(device_names[0:2], 'thin')
+ TestLvm._create_thick_lv(device_names[2:4], 'thick')
+
+ vg_name_list = TestLvm._vg_names()
+
+ self.assertTrue(len(vg_name_list) > 0)
+
+ for vg_name in vg_name_list:
+ vg = lvm.vgOpen(vg_name, 'r')
+ self.assertTrue(type(vg.isClustered()) == bool)
+ self.assertTrue(type(vg.isExported()) == bool)
+ self.assertTrue(type(vg.isPartial()) == bool)
+
+ #Loop through the list invoking the method
+ for method_name in TestLvm.RETURN_NUMERIC:
+ method = getattr(vg, method_name)
+ result = method()
+ self.assertTrue(type(result) == int or type(result) == long)
+
+ vg.close()
+
+ def _test_tags(self, tag_obj):
+ existing_tags = tag_obj.getTags()
+ self.assertTrue(type(existing_tags) == tuple)
+
+ num_tags = random.randint(2, 40)
+ created_tags = []
+
+ for i in range(num_tags):
+ tag_name = rs(random.randint(1, 128))
+ tag_obj.addTag(tag_name)
+ created_tags.append(tag_name)
+
+ tags = tag_obj.getTags()
+ self.assertTrue(len(existing_tags) + len(created_tags) == len(tags))
+
+ num_remove = len(created_tags)
+
+ for i in range(num_remove):
+ tag_to_remove = created_tags[
+ random.randint(0, len(created_tags) - 1)]
+
+ created_tags.remove(tag_to_remove)
+
+ tag_obj.removeTag(tag_to_remove)
+
+ current_tags = tag_obj.getTags()
+ self.assertFalse(tag_to_remove in current_tags)
+
+ current_tags = tag_obj.getTags()
+ self.assertTrue(len(current_tags) == len(existing_tags))
+ for e in existing_tags:
+ self.assertTrue(e in current_tags)
+
+ def test_vg_tags(self):
+ device_names = TestLvm._get_pv_device_names()
+
+ i = 0
+ for d in device_names:
+ if i % 2 == 0:
+ TestLvm._create_thin_lv([d], "thin_lv%d" % i)
+ else:
+ TestLvm._create_thick_lv([d], "thick_lv%d" % i)
+ i += 1
+
+ for vg_name in TestLvm._vg_names():
+ vg = lvm.vgOpen(vg_name, 'w')
+ self._test_tags(vg)
+ vg.close()
+
+ @staticmethod
+ def _test_listing():
+
+ env = os.environ
+
+ for k, v in env.items():
+ l("%s:%s" % (k, v))
+
+ with lvm.listPvs() as pvs:
+ for p in pvs:
+ l('pv= %s' % p.getName())
+
+ l('Checking for VG')
+ for v in lvm.listVgNames():
+ l('vg= %s' % v)
+
+ def test_pv_empty_listing(self):
+ #We had a bug where we would seg. fault if we had no PVs.
+
+ l('testPVemptylisting entry')
+
+ device_names = TestLvm._get_pv_device_names()
+
+ for d in device_names:
+ l("Removing %s" % d)
+ lvm.pvRemove(d)
+
+ count = 0
+
+ with lvm.listPvs() as pvs:
+ for p in pvs:
+ count += 1
+ l('pv= %s' % p.getName())
+
+ self.assertTrue(count == 0)
+
+ for d in device_names:
+ lvm.pvCreate(d)
+
+ def test_pv_create(self):
+ size = [0, 1024 * 1024 * 8]
+ pvmeta_copies = [0, 1, 2]
+ pvmeta_size = [0, 255, 512, 1024]
+ data_alignment = [0, 2048, 4096]
+ zero = [0, 1]
+
+ device_names = TestLvm._get_pv_device_names()
+
+ for d in device_names:
+ lvm.pvRemove(d)
+
+ d = device_names[0]
+
+ #Test some error cases
+ self.assertRaises(TypeError, lvm.pvCreate, None)
+ self.assertRaises(lvm.LibLVMError, lvm.pvCreate, '')
+ self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 4)
+ self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 4)
+ self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 0, 0, 2 ** 34)
+ self.assertRaises(
+ lvm.LibLVMError, lvm.pvCreate, d, 0, 0, 0, 4096, 2 ** 34)
+
+ #Try a number of combinations and permutations
+ for s in size:
+ for copies in pvmeta_copies:
+ for pv_size in pvmeta_size:
+ for align in data_alignment:
+ for z in zero:
+ lvm.pvCreate(d, s, copies, pv_size, align,
+ align, z)
+ lvm.pvRemove(d)
+
+ #Restore
+ for d in device_names:
+ lvm.pvCreate(d)
+
+ def test_vg_reduce(self):
+ # Test the case where we try to reduce a vg where the last PV has
+ # no metadata copies. In this case the reduce should fail.
+ vg_name = TestLvm.VG_P + 'reduce_test'
+
+ device_names = TestLvm._get_pv_device_names()
+
+ for d in device_names:
+ lvm.pvRemove(d)
+
+ lvm.pvCreate(device_names[0], 0, 0) # Size all, pvmetadatacopies 0
+ lvm.pvCreate(device_names[1])
+ lvm.pvCreate(device_names[2])
+ lvm.pvCreate(device_names[3])
+
+ vg = lvm.vgCreate(vg_name)
+
+ vg.extend(device_names[3])
+ vg.extend(device_names[2])
+ vg.extend(device_names[1])
+ vg.extend(device_names[0])
+ vg.close()
+
+ vg = None
+
+ vg = lvm.vgOpen(vg_name, 'w')
+
+ vg.reduce(device_names[3])
+ vg.reduce(device_names[2])
+
+ self.assertRaises(lvm.LibLVMError, vg.reduce, device_names[1])
+
+ vg.close()
+ vg = None
+
+ vg = lvm.vgOpen(vg_name, 'w')
+ vg.remove()
+ vg.close()
+
+ @staticmethod
+ def _test_valid_names(method):
+ sample = 'azAZ09._-+'
+
+ method('x' * 127)
+ method('.X')
+ method('..X')
+
+ for i in range(1, 7):
+ tests = (''.join(i) for i in itertools.product(sample, repeat=i))
+ for t in tests:
+ if t == '.' or t == '..':
+ t += 'X'
+ elif t.startswith('-'):
+ t = 'H' + t
+ method(t)
+
+ def _test_bad_names(self, method, dupe_name):
+ # Test for duplicate name
+ self.assertRaises(lvm.LibLVMError, method, dupe_name)
+
+ # Test for too long a name
+ self.assertRaises(lvm.LibLVMError, method, ('x' * 128))
+
+ # Test empty
+ self.assertRaises(lvm.LibLVMError, method, '')
+
+ # Invalid characters
+ self.assertRaises(lvm.LibLVMError, method, '&invalid^char')
+
+ # Cannot start with .. and no following characters
+ self.assertRaises(lvm.LibLVMError, method, '..')
+
+ # Cannot start with . and no following characters
+ self.assertRaises(lvm.LibLVMError, method, '.')
+
+ # Cannot start with a hyphen
+ self.assertRaises(lvm.LibLVMError, method, '-not_good')
+
+ def _lv_reserved_names(self, method):
+ prefixes = ['snapshot', 'pvmove']
+ reserved = [
+ '_mlog', '_mimage', '_pmspare', '_rimage', '_rmeta',
+ '_vorigin', '_tdata', '_tmeta']
+
+ for p in prefixes:
+ self.assertRaises(lvm.LibLVMError, method, p + rs(3))
+
+ for r in reserved:
+ self.assertRaises(lvm.LibLVMError, method, rs(3) + r + rs(1))
+ self.assertRaises(lvm.LibLVMError, method, r + rs(1))
+
+ def test_vg_lv_name_validate(self):
+ lv_name = 'vg_lv_name_validate'
+ TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
+ lv, vg = TestLvm._get_lv(None, lv_name)
+
+ self._test_bad_names(lvm.vgNameValidate, vg.getName())
+ self._test_bad_names(vg.lvNameValidate, lv.getName())
+
+ # Test good values
+ TestLvm._test_valid_names(lvm.vgNameValidate)
+ TestLvm._test_valid_names(vg.lvNameValidate)
+ self._lv_reserved_names(vg.lvNameValidate)
+
+ vg.close()
+
+if __name__ == "__main__":
+ unittest.main()
More information about the lvm-devel
mailing list