[lvm-devel] master - python-lvm: Improve unit test case(s)

tasleson tasleson at fedoraproject.org
Tue Jul 2 19:26:46 UTC 2013


Gitweb:        http://git.fedorahosted.org/git/?p=lvm2.git;a=commitdiff;h=04efe869a440590d99a886cda74aaf259b8067d7
Commit:        04efe869a440590d99a886cda74aaf259b8067d7
Parent:        50db109e2068d8d5645bd4117b6022acf4b7f8ba
Author:        Tony Asleson <tasleson at redhat.com>
AuthorDate:    Mon Jul 1 17:01:48 2013 -0400
Committer:     Tony Asleson <tasleson at redhat.com>
CommitterDate: Tue Jul 2 14:24:34 2013 -0500

python-lvm: Improve unit test case(s)

Create/remove PV
Create/remove snapshots (old type & thin)
PV lookups based on name and UUID
PV resize

Signed-off-by: Tony Asleson <tasleson at redhat.com>
---
 test/api/pytest.sh          |    7 +-
 test/api/python_lvm_unit.py |  332 +++++++++++++++++++++++++++++++++----------
 2 files changed, 259 insertions(+), 80 deletions(-)

diff --git a/test/api/pytest.sh b/test/api/pytest.sh
index ad99624..a1a3cb4 100644
--- a/test/api/pytest.sh
+++ b/test/api/pytest.sh
@@ -13,15 +13,16 @@
 
 . lib/test
 
-aux prepare_vg 1
-lvcreate -n test -l 5 $vg
+#If you change this change the unit test case too.
+aux prepare_pvs 6
 
 #Locate the python binding library to use.
 python_lib=`find $abs_top_builddir -name lvm.so`
 if [ "$python_lib" != "" ]
 then
 	export PYTHONPATH=`dirname $python_lib`:$PYTHONPATH
-	python_lvm_unit.py
+	python_lvm_unit.py -v
+	#nemiver python ../api/python_lvm_unit.py -v -f
 else
 	echo "Unable to test python bindings as library not available"
 fi
diff --git a/test/api/python_lvm_unit.py b/test/api/python_lvm_unit.py
index c4d7983..6491413 100755
--- a/test/api/python_lvm_unit.py
+++ b/test/api/python_lvm_unit.py
@@ -27,8 +27,130 @@ def rs(l=10):
 
 
 class TestLvm(unittest.TestCase):
+	(FULL_PV, THIN_PV_A, THIN_PV_B, RESIZE_PV) = (0, 1, 2, 3)
+
+
+	def _get_pv_devices(self):
+		rc = []
+		with lvm.listPvs() as pvs:
+			for p in pvs:
+				name = p.getName()
+				self.assertTrue(name is not None and len(name) > 0)
+				rc.append(name)
+				p = None
+		return rc
+
+	def _createThick(self, device_list):
+		vg = lvm.vgCreate('full_vg')
+
+		for d in device_list:
+			vg.extend(d)
+
+		new_extent = 1024 * 1024 * 2
+		vg.setExtentSize(new_extent)
+		self.assertEqual(vg.getExtentSize(), new_extent)
+
+		vg.createLvLinear('thick_lv', vg.getSize()/2)
+		vg.close()
+		vg = None
+
+	def _removeThick(self):
+		vg_name = 'full_vg'
+		vg = lvm.vgOpen(vg_name, 'w')
+
+		pvs = vg.listPVs()
+		lvs = vg.listLVs()
+
+		pe_devices = []
+
+		#Remove old snapshots first, then lv
+		for l in lvs:
+			attr = l.getAttr()
+			if attr[0] == 's':
+				l.remove()
+
+		for l in vg.listLVs():
+			l.remove()
+
+		for p in pvs:
+			pe_devices.append(p.getName())
+
+		for pv in pe_devices:
+			vg.reduce(pv)
+
+		vg.remove()
+		vg.close()
+
 	def setUp(self):
-		pass
+		device_list = self._get_pv_devices()
+
+		#Make sure our prepare script is doing as expected.
+		self.assertTrue(len(device_list) >= 4)
+
+		vg_names = lvm.listVgNames()
+
+		#If we don't have any volume groups lets setup one for
+		#those tests that are expecting one
+		if len(vg_names) == 0:
+			self._createThick([device_list[TestLvm.FULL_PV]])
+
+			vg = lvm.vgCreate('thin_vg')
+			vg.extend(device_list[TestLvm.THIN_PV_A])
+			vg.extend(device_list[TestLvm.THIN_PV_B])
+			vg.createLvThinpool('thin_pool', vg.getSize()/2, 0, 0,
+					lvm.THIN_DISCARDS_PASSDOWN, 1)
+			vg.createLvThin('thin_pool', 'thin_lv', vg.getSize()/3)
+			vg.close()
+			vg = None
+
+	def testPVresize(self):
+		with lvm.listPvs() as pvs:
+			pv = pvs[TestLvm.RESIZE_PV]
+			curr_size = pv.getSize()
+			dev_size = pv.getDevSize()
+			self.assertTrue(curr_size == dev_size)
+			pv.resize(curr_size/2)
+		with lvm.listPvs() as pvs:
+			pv = pvs[TestLvm.RESIZE_PV]
+			resized_size = pv.getSize()
+			self.assertTrue(resized_size != curr_size)
+			pv.resize(dev_size)
+
+	def testPVlifecycle(self):
+		"""
+		Test removing and re-creating a PV
+		"""
+		target = None
+
+		with lvm.listPvs() as pvs:
+			pv = pvs[TestLvm.RESIZE_PV]
+			target = pv.getName()
+			lvm.pvRemove(target)
+
+		with lvm.listPvs() as pvs:
+			for p in pvs:
+				self.assertTrue(p.getName() != target)
+
+		lvm.pvCreate(target, 0)
+
+		with lvm.listPvs() as pvs:
+			found = False
+			for p in pvs:
+				if p.getName() == target:
+					found = True
+
+		self.assertTrue(found)
+
+	def testPvMethods(self):
+		with lvm.listPvs() as pvs:
+			for p in pvs:
+				p.getName()
+				p.getUuid()
+				p.getMdaCount()
+				p.getSize()
+				p.getDevSize()
+				p.getFree()
+				p = None
 
 	def tearDown(self):
 		pass
@@ -49,26 +171,35 @@ class TestLvm(unittest.TestCase):
 			vg = lvm.vgOpen(i)
 			vg.close()
 
-	def _get_lv_test(self, mode='r'):
+	def _get_lv_test(self, lv_vol_type=None, lv_name=None):
 		vg_name_list = lvm.listVgNames()
 		for vgname in vg_name_list:
-			vg = lvm.vgOpen(vgname, mode)
+			vg = lvm.vgOpen(vgname, "w")
 			lvs = vg.listLVs()
-			if len(lvs):
-				return lvs[0]
-		return None
 
-	def _get_pv_test(self, mode='r'):
+			for l in lvs:
+				attr = l.getAttr()
+				if lv_vol_type or lv_name:
+					if lv_vol_type is not None and attr[0] == lv_vol_type:
+						return l, vg
+					elif lv_name is not None and lv_name == l.getName():
+						return l, vg
+				else:
+					return l, vg
+			vg.close()
+		return None, None
+
+	def _get_pv_test(self):
 		vg_name_list = lvm.listVgNames()
 		for vgname in vg_name_list:
-			vg = lvm.vgOpen(vgname, mode)
+			vg = lvm.vgOpen(vgname, "w")
 			pvs = vg.listPVs()
 			if len(pvs):
-				return pvs[0]
-		return None
+				return pvs[0], vg
+		return None, None
 
 	def testPvGetters(self):
-		pv = self._get_pv_test()
+		pv, vg = self._get_pv_test()
 
 		self.assertEqual(type(pv.getName()), str)
 		self.assertTrue(len(pv.getName()) > 0)
@@ -84,6 +215,8 @@ class TestLvm(unittest.TestCase):
 
 		self.assertTrue(type(pv.getFree()) == int or type(pv.getFree()) == long)
 
+		vg.close()
+
 	def _test_prop(self, prop_obj, prop, var_type, settable):
 		result = prop_obj.getProperty(prop)
 
@@ -92,7 +225,7 @@ class TestLvm(unittest.TestCase):
 		self.assertTrue(result[1] == settable)
 
 	def testPvSegs(self):
-		pv = self._get_pv_test("r")
+		pv, vg = self._get_pv_test()
 		pv_segs = pv.listPVsegs()
 
 		#LVsegs returns a tuple, (value, bool settable)
@@ -101,53 +234,96 @@ class TestLvm(unittest.TestCase):
 		for i in pv_segs:
 			self._test_prop(i, 'pvseg_start', long, False)
 
+		vg.close()
+
 	def testPvProperty(self):
-		pv = self._get_pv_test("r")
+		pv, vg = self._get_pv_test()
 		self._test_prop(pv, 'pv_mda_count', long, False)
+		vg.close()
 
 	def testLvProperty(self):
-		lv = self._get_lv_test("r")
+		lv, vg = self._get_lv_test()
 		self._test_prop(lv, 'seg_count', long, False)
+		vg.close()
 
 	def testLvTags(self):
-		lv = self._get_lv_test("w")
+		lv, vg = self._get_lv_test()
 		self._testTags(lv)
+		vg.close()
 
 	def testLvActiveInactive(self):
-		lv = self._get_lv_test("w")
+		lv, vg = self._get_lv_test()
 		lv.deactivate()
 		self.assertTrue(lv.isActive() == False)
 		lv.activate()
 		self.assertTrue(lv.isActive() == True)
+		vg.close()
 
 	def testLvRename(self):
-		lv = self._get_lv_test("w")
+		lv, vg = self._get_lv_test()
 
 		current_name = lv.getName()
 		new_name = rs()
 		lv.rename(new_name)
 		self.assertEqual(lv.getName(), new_name)
 		lv.rename(current_name)
+		vg.close()
+
+	def testLvSnapshot(self):
+
+		#Cleanup existing if already present
+		to_remove = [ 'thick_lv_snapshot', 'thin_lv_snapshot']
+
+		for ss in to_remove:
+			snap, vg = self._get_lv_test(None, ss)
+			if snap:
+				snap.remove()
+				vg.close()
+
+
+		thick_lv, vg = self._get_lv_test(None, 'thick_lv')
+
+		self.assertEqual('thick_lv', thick_lv.getName())
+
+		thick_lv.snapshot('thick_lv_snapshot', 1024*1024)
+		vg.close()
+
+		thin_lv, vg = self._get_lv_test(None, 'thin_lv')
+		thin_lv.snapshot('thin_lv_snapshot')
+
+		vg.close()
+
+		thin_ss, vg = self._get_lv_test(None, 'thin_lv_snapshot')
+		self.assertTrue(thin_ss is not None)
+
+		origin = thin_ss.getOrigin()
+		self.assertTrue('thin_lv', origin)
+
+		vg.close()
+
 
 	def testLvSuspend(self):
-		lv = self._get_lv_test("r")
+		lv, vg = self._get_lv_test()
 
 		result = lv.isSuspended()
-		self.assertTrue(type(result), bool)
+		self.assertTrue(type(result) == bool)
+		vg.close()
 
 	def testLvSize(self):
-		lv = self._get_lv_test("r")
+		lv, vg = self._get_lv_test()
 		result = lv.getSize()
-		self.assertTrue(type(result), bool)
+		self.assertTrue(type(result) == int or type(result)== long)
+		vg.close()
 
 	def testLvResize(self):
-		pass    #Not implemented!
-
-	def testPvResize(self):
-		pass    #Patch available, not committed
+		lv, vg = self._get_lv_test('V')
+		curr_size = lv.getSize()
+		lv.resize(curr_size+(1024*1024))
+		latest = lv.getSize()
+		self.assertTrue(curr_size != latest)
 
 	def testLvSeg(self):
-		lv = self._get_lv_test("r")
+		lv, vg = self._get_lv_test()
 
 		lv_segs = lv.listLVsegs()
 
@@ -157,62 +333,31 @@ class TestLvm(unittest.TestCase):
 		for i in lv_segs:
 			self._test_prop(i, 'seg_start_pe', long, False)
 
-	def testLvMisc(self):
-		#Need to look at lack of vg_write in vg create
-
-		#For this to work cleanly we will remove an existing lv & vg and then
-		#put it back so that the test framework can clean it up.
-		vg_name_list = lvm.listVgNames()
+		vg.close()
 
-		if len(vg_name_list):
-			vg_name = vg_name_list[0]
-
-			vg = lvm.vgOpen(vg_name, "w")
+	def testVGsetGetProp(self):
+		vg_name = 'full_vg'
+		vg = lvm.vgOpen(vg_name, 'w')
 
+		self.assertTrue(vg is not None)
+		if vg:
 			vg_mda_copies = vg.getProperty('vg_mda_copies')
 			vg.setProperty('vg_mda_copies', vg_mda_copies[0])
-
-			pvs = vg.listPVs()
-			lvs = vg.listLVs()
-
-			pe_devices = []
-			for p in pvs:
-				pe_devices.append(p.getName())
-
-			self.assertEquals(len(lvs), 1)
-
-			lv = lvs[0]
-
-			lv_name = lv.getName()
-			lv_size = lv.getSize()
-
-			lv.remove()
-			lv = None
-
-			vg.reduce(pe_devices[0])
-
-			vg.remove()
 			vg.close()
 
-			nvg = lvm.vgCreate(vg_name)
-			for p in pe_devices:
-				nvg.extend(p)
-
-			#2MiB extent size
-			new_extent = 1024 * 1024 * 2
-
-			nvg.setExtentSize(new_extent)
-			self.assertEqual(nvg.getExtentSize(), new_extent)
-
-			v = nvg.createLvLinear(lv_name, lv_size)
+	def testVGremoveRestore(self):
 
-			lv_find_name = nvg.lvFromName(lv_name)
-			lv_find_uuid = nvg.lvFromUuid(v.getUuid())
+		#Store off the list of physical devices
+		pe_devices = []
+		vg = lvm.vgOpen('full_vg', 'w')
 
-			self.assertTrue(lv_find_name.getName() == v.getName())
-			self.assertTrue(lv_find_uuid.getUuid() == v.getUuid())
+		pvs = vg.listPVs()
+		for p in pvs:
+			pe_devices.append(p.getName())
+		vg.close()
 
-			nvg.close()
+		self._removeThick()
+		self._createThick(pe_devices)
 
 	def testVgNames(self):
 		vg = lvm.listVgNames()
@@ -234,7 +379,8 @@ class TestLvm(unittest.TestCase):
 			if len(lvs):
 				lv = lvs[0]
 				lv_name = lv.getName()
-				self.assertRaises(lvm.LibLVMError, vg.createLvLinear, lv_name,
+				self.assertRaises(lvm.LibLVMError,
+					vg.createLvLinear, lv_name,
 					lv.getSize())
 
 	def testVgUuids(self):
@@ -249,15 +395,46 @@ class TestLvm(unittest.TestCase):
 		for vg_name in vgs_names:
 			vg = lvm.vgOpen(vg_name, "r")
 
-			#TODO Write/fix BUG, vg uuid don't match between lvm.listVgUuids
-			# and vg.getUuid()
+			#TODO Write/fix BUG, vg uuid don't match between
+			#lvm.listVgUuids and vg.getUuid()
 			vg_uuid_search = vg.getUuid().replace('-', '')
 
 			self.assertTrue(vg_uuid_search in vgs_uuids)
 			vgs_uuids.remove(vg_uuid_search)
+			vg.close()
 
 		self.assertTrue(len(vgs_uuids) == 0)
 
+	def testPvLookupFromVG(self):
+		vg_names = lvm.listVgNames()
+
+		for vg_name in vg_names:
+			vg = lvm.vgOpen(vg_name, 'w')
+			pvs = vg.listPVs()
+
+			for p in pvs:
+				name = p.getName()
+				uuid = p.getUuid()
+
+				pv_name_lookup = vg.pvFromName(name)
+				pv_uuid_lookup = vg.pvFromUuid(uuid)
+
+				self.assertTrue(pv_name_lookup.getName() == \
+						pv_uuid_lookup.getName())
+				self.assertTrue(pv_name_lookup.getUuid() == \
+						pv_uuid_lookup.getUuid())
+
+				self.assertTrue(name == pv_name_lookup.getName())
+				self.assertTrue(uuid == pv_uuid_lookup.getUuid())
+
+				pv_name_lookup = None
+				pv_uuid_lookup = None
+				p = None
+
+			pvs = None
+			vg.close()
+
+
 	def testPercentToFloat(self):
 		self.assertEqual(lvm.percentToFloat(0), 0.0)
 		self.assertEqual(lvm.percentToFloat(1000000), 1.0)
@@ -273,8 +450,9 @@ class TestLvm(unittest.TestCase):
 		self.assertEquals(lvm.configOverride("global.test = 1"), None)
 
 	def testConfigFindBool(self):
+		either_or = lvm.configFindBool("global/fallback_to_local_locking")
+		self.assertTrue(type(either_or) == bool)
 		self.assertTrue(lvm.configFindBool("global/locking_type"))
-		self.assertFalse(lvm.configFindBool("global/fallback_to_local_locking"))
 
 	def testVgFromPVLookups(self):
 		vgname_list = lvm.listVgNames()




More information about the lvm-devel mailing list