[libvirt] [libvirt-test-API][PATCH 2/2] Add securitylabel test case

jiahu jiahu at redhat.com
Sun Feb 15 08:12:14 UTC 2015


2 new APIs securityLabel and securityLabelList will be covered in
securitylabel.py
---
 repos/domain/securitylabel.py | 170 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 170 insertions(+)
 create mode 100644 repos/domain/securitylabel.py

diff --git a/repos/domain/securitylabel.py b/repos/domain/securitylabel.py
new file mode 100644
index 0000000..cf4aaf3
--- /dev/null
+++ b/repos/domain/securitylabel.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+# test securityLabel() and securityLabelList() API for libvirt
+
+import libvirt
+
+from libvirt import libvirtError
+from src import sharedmod
+from utils import utils
+
+required_params = ('guestname',)
+optional_params = {}
+
+def check_qemu_conf(logger):
+    """
+       If security_driver is not equal to "selinux", report an error
+    """
+    GREP = "grep \"^security_driver\" /etc/libvirt/qemu.conf"
+    status, output = utils.exec_cmd(GREP, shell=True)
+    if status:
+        return True
+    else:
+        if "selinux" in output[0]:
+            return True
+        else:
+            logger.error("Not a default setting in qemu.conf")
+            return False
+
+def get_security_policy(logger):
+    """
+       get selinux type from host OS
+    """
+    SELINUX = "getenforce"
+    status, output = utils.exec_cmd(SELINUX, shell=True)
+    if not status:
+        if output[0] == "Enforcing":
+            sevalue = True
+        elif output[0] == "Permissive":
+            sevalue = False
+        elif output[0] == "Disabled":
+            sevalue = False
+        else:
+            logger.error("Can not find any results")
+    else:
+        logger.error("\"" + SELINUX + "\"" + "error")
+        logger.error(output)
+        return False
+    return sevalue
+
+def get_pid(name,logger):
+    """
+       get process id of specified domain.
+    """
+    PID = "ps aux |grep -v grep | grep \" -name %s\" \
+           |awk '{print $2}'"
+    status, output = utils.exec_cmd(PID % name, shell=True)
+    if not status:
+        pass
+    else:
+        logger.error("\"" + PID + "\"" + "error")
+        logger.error(output)
+        return False
+    return output[0]
+
+def get_pid_context(domain,logger):
+    """
+       return context of domain's pid
+    """
+    pid = get_pid(domain,logger)
+    CONTEXT = "ls -nZd /proc/%s"
+    status, output = utils.exec_cmd(CONTEXT % pid, shell=True)
+    if not status:
+        pass
+    else:
+        logger.error("\"" + CONTEXT + "\"" + "error")
+        logger.error(output)
+        return False
+    return pid,output[0]
+
+def check_selinux_label(api,domain,logger):
+    """
+       check vaules in selinux mode
+    """
+    pid,context = get_pid_context(domain,logger)
+    logger.debug("The context of %d is %s" % (int(pid), context))
+    get_enforce = get_security_policy(logger)
+    if api[0] in context:
+        if api[1] == get_enforce:
+            logger.debug("PASS: '%s'" % api)
+            return True
+        else:
+            logger.debug("Fail: '%s'" % api[1])
+            return False
+    else:
+         logger.debug("Fail: '%s'" % api[0])
+         return False
+
+def check_DAC_label(api,domain,logger):
+    """
+       check vaules in DAC mode
+    """
+    tmp = []
+    pid,context = get_pid_context(domain,logger)
+    logger.debug("The context of %d is %s" % (int(pid), context))
+    #enforcing is always false in DAC mode
+    for item in api:
+         tmp.append(item)
+    get_enforce = False
+    tmp1 = tmp[0].strip().replace("+","")
+    tmp[0] = tmp1.split(':')
+    tmp1 = context.split()
+    context = str(tmp1.pop(1) +" "+ tmp1.pop(1)).split()
+    if tmp[0] == context:
+        if tmp[1] == get_enforce:
+            logger.debug("PASS: '%s'" % api)
+            return True
+        else:
+            logger.debug("Fail: '%s'" % api[1])
+            return False
+    else:
+         logger.debug("Fail: '%s'" % api[0])
+         return False
+
+def securitylabel(params):
+    """
+       test APIs for securityLabel and securityLabelList in class virDomain
+    """
+    logger = params['logger']
+    domain_name = params['guestname']
+    if not check_qemu_conf(logger):
+       return 1
+    try:
+        conn = sharedmod.libvirtobj['conn']
+
+        if conn.lookupByName(domain_name):
+           dom = conn.lookupByName(domain_name)
+        else:
+           logger.error("Domain %s is not exist" % domain_name)
+           return 1
+        if not dom.isActive():
+           logger.error("Domain %s is not running" % domain_name)
+           return 1
+
+        first_label_api = dom.securityLabel()
+        logger.info("The first lable is %s" % first_label_api)
+
+        if check_selinux_label(first_label_api, domain_name, logger):
+            logger.info("PASS, %s" % first_label_api)
+        else:
+            logger.error("FAIL, %s" % first_label_api)
+            return 1
+
+        all_label_api = dom.securityLabelList()
+        logger.info("The all lable is %s" % all_label_api)
+        if check_selinux_label(all_label_api[0], domain_name, logger):
+            logger.info("PASS, %s" % all_label_api[0])
+        else:
+            logger.error("FAIL, %s" % all_label_api[0])
+            return 1
+
+        if check_DAC_label(all_label_api[1], domain_name, logger):
+            logger.info("PASS, %s" % all_label_api[1])
+        else:
+            logger.error("FAIL, %s" % all_label_api[1])
+            return 1
+
+    except libvirtError, e:
+        logger.error("API error message: %s" % e.message)
+        return 1
+
+    return 0
-- 
1.8.3.1




More information about the libvir-list mailing list