[libvirt] [libvirt-test-API][PATCH] Add network_dhcp_leases test case

jiahu jiahu at redhat.com
Wed Jan 21 13:15:33 UTC 2015


The network_dhcp_leases.py uses DHCPLeases() to validate new API
virNetworkGetDHCPLeases of libvirt.
---
 repos/network/network_dhcp_leases.py | 277 +++++++++++++++++++++++++++++++++++
 1 file changed, 277 insertions(+)
 create mode 100644 repos/network/network_dhcp_leases.py

diff --git a/repos/network/network_dhcp_leases.py b/repos/network/network_dhcp_leases.py
new file mode 100644
index 0000000..29ee529
--- /dev/null
+++ b/repos/network/network_dhcp_leases.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python
+#test DHCPLeases() API for libvirt
+
+import os
+import time
+import libvirt
+from libvirt import libvirtError
+from utils import utils
+from src import sharedmod
+
+required_params = ('networkname',)
+optional_params = {'macaddr': ''}
+
+LEASE_FILE = "/var/lib/libvirt/dnsmasq/"
+
+def check_ip(ipaddr, logger):
+    """
+       return a string according to ip address type, return 'ipv4' for ipv4,
+       return 'ipv6' for ipv6, return False for others
+    """
+    addr4 = ipaddr.strip().split('.')
+    addr6 = ipaddr.strip().split(':')
+    if len(addr4) == 4:
+        iptype = "ipv4"
+    elif len(addr6) == 6:
+        iptype = "ipv6"
+    else:
+        return False
+    return iptype
+
+def get_network_type(ipaddr,logger):
+    """
+       return 0 or 1 for ipv4/ipv6, this function will be used in
+       check_ipv4_values()/check_ipv6_values()
+    """
+    if check_ip(ipaddr, logger) == "ipv4":
+        return 0
+    elif check_ip(ipaddr, logger) == "ipv6":
+        return 1
+
+def get_bridge_name(network,logger):
+    """
+       get bridge name under specified network from specified network conf
+    """
+    CONF_NETWORK = LEASE_FILE + network + ".conf"
+    GREP_BRIDGE = "grep \"^interface=\" %s | awk -F\"=\" '{print $2}'"
+    status, output = utils.exec_cmd(GREP_BRIDGE % CONF_NETWORK, shell=True)
+    if not status:
+        pass
+    else:
+        logger.error("\"" + GREP_BRIDGE + "\"" + "error")
+        logger.error(output)
+        return False
+    return output[0]
+
+def get_ip_prefix(network, iptype, logger):
+    """
+       get ip prefix according to IP type
+    """
+    br = get_bridge_name(network, logger)
+    PREFIX = "ip -4 -o ad show %s | awk '{print $4}'|awk -F\"/\" '{print $2}'"
+    PREFIX_6 = "ip -6 -o ad show %s|awk '{print $4}'|awk -F\"/\" '{print $2}'"
+    if iptype == "ipv4":
+        status, output = utils.exec_cmd(PREFIX % br, shell=True)
+    elif iptype == "ipv6":
+        status, output = utils.exec_cmd(PREFIX_6 % br, shell=True)
+    if not status:
+        pass
+    else:
+        logger.error("\"" + GREP_BRIDGE + "\"" + "error")
+        logger.error(output)
+        return False
+    return output[0]
+
+def get_info_from_dnsmasq(network,macaddr,logger):
+    """
+       generate dict for lease info from virtual network's lease file
+    """
+    title = ['expirytime','mac','ipaddr','hostname','clientid']
+    output_list = []
+    lease_dnsmasq = []
+    temp = []
+    remove_list = []
+    GREP_MAC = "grep -w %s" + " " + LEASE_FILE_DNSMASQ
+    CAT_FILE = "cat" + " " + LEASE_FILE_DNSMASQ
+
+    status, output = utils.exec_cmd(CAT_FILE, shell=True)
+    if not status:
+        for i in range(0, len(output)):
+            output_list = []
+            output_str = output[i]
+            for item in output_str.split(" "):
+                output_list.append(item)
+            lease_dnsmasq.append(dict(zip(title,output_list)))
+
+        #due to no mac field in IPv6 line, so do nothing here temporarily.
+        if macaddr != None:
+             pass
+
+        #remove bridge duid line
+        for i in range(0, len(lease_dnsmasq)):
+            if lease_dnsmasq[i]['expirytime'] == 'duid':
+                remove_list.append(lease_dnsmasq[i])
+
+        for i in range(0, len(remove_list)):
+                lease_dnsmasq.remove(remove_list[i])
+
+        #remove expiry leases
+        for i in range(0, len(lease_dnsmasq)):
+            temp = int(lease_dnsmasq[i]['expirytime'])
+            lease_dnsmasq[i]['expirytime'] = temp
+
+        remove_list = []
+        for i in range(0, len(lease_dnsmasq)):
+            if time.time() >= int(lease_dnsmasq[i]['expirytime']):
+                remove_list.append(lease_dnsmasq[i])
+
+        for i in range(0, len(remove_list)):
+                lease_dnsmasq.remove(remove_list[i])
+
+        #replace * to None
+        for i in range(0, len(lease_dnsmasq)):
+            if lease_dnsmasq[i]['hostname'] == "*":
+                lease_dnsmasq[i]['hostname'] = None
+            if lease_dnsmasq[i]['clientid'] == "*":
+                lease_dnsmasq[i]['clientid'] = None
+
+        return lease_dnsmasq
+    else:
+        logger.error("\"" + CAT_FILE + "\"" + "error")
+        logger.error(output)
+        return False
+
+def compare_values(op1, op2, network, iptype, logger):
+    """
+       check all printed values from API
+    """
+    dnsmasq = op1
+    api = op2
+    temp = int(api['expirytime'])
+    api['expirytime'] = temp
+
+    for j in range(0,len(dnsmasq)):
+        if dnsmasq[j]['hostname'] == api['hostname'] and \
+           dnsmasq[j]['expirytime'] == api['expirytime']:
+            if dnsmasq[j]['ipaddr'] == api['ipaddr'] and \
+               dnsmasq[j]['clientid'] == api['clientid']:
+
+                if iptype == "ipv4":
+                    logger.debug("PASS: hostname: %s expirytime: %s ipaddr: %s" \
+                           % (api['hostname'],api['expirytime'],api['ipaddr']))
+                    logger.debug("Unsupported: clientid: %s in IPv4" \
+                            % (api['clientid']))
+                elif iptype == "ipv6":
+                    logger.debug("PASS: hostname: %s expirytime: %s ipaddr: %s \
+clientid: %s" % (api['hostname'],api['expirytime'],api['ipaddr'],\
+api['clientid']))
+
+                if iptype == "ipv4" and api['mac'] == dnsmasq[j]['mac']:
+                    logger.debug("PASS: mac: %s" % api['mac'])
+                elif iptype == "ipv6" and api['iaid'] == dnsmasq[j]['mac']:
+                    logger.debug("PASS: iaid: %s" % api['iaid'])
+                else:
+                    logger.error("Fail: mac/iaid: %s/%s" % (api['mac'], \
+                                 api['iaid']))
+                    return False
+
+                break
+            else:
+                if j == len(dnsmasq) - 1:
+                    logger.debug("Last loop %d, FAIL: %s" % (j,api))
+                    logger.debug("failed on ipaddr or clientid")
+                    return False
+                else:
+                    logger.debug("Skipped loop %d,Warning: ipaddr: %s \
+clientid: %s" % (j,api['ipaddr'],api['clientid']))
+                    continue
+        else:
+            if j == len(dnsmasq) - 1:
+                logger.error("Fail: hostname: %s expirytime: %s ipaddr: %s \
+clientid: %s"  % (api['hostname'],api['expirytime'],api['ipaddr'], \
+api['clientid']))
+                logger.error("Last loop %d, FAIL: %s" % (j,api))
+                return False
+            else:
+                logger.debug("Skipped loop %d,Warning: hostname: \
+%s expirytime: %s" % (j,api['hostname'],api['expirytime']))
+                continue
+    if not api['iface'] == get_bridge_name(network,logger):
+        logger.error("FAIL: iface: %s" % api['iface'])
+        return False
+    else:
+        logger.debug("PASS: iface: %s" % api['iface'])
+    if not api['type'] == get_network_type(api['ipaddr'],logger):
+        logger.error("FAIL: type: %s" % api['type'])
+        return False
+    else:
+        logger.debug("PASS: type: %s" % api['type'])
+
+    if not api['prefix'] == int(get_ip_prefix(network, iptype ,logger)):
+        logger.error("FAIL: prefix: %s" % api['prefix'])
+        logger.error("FAIL: %s" % api)
+        return False
+    else:
+        logger.debug("PASS: prefix: %s" % api['prefix'])
+    if iptype == "ipv4":
+        if not api['iaid'] == None:
+            logger.error("FAIL: iaid: %s" % api['iaid'])
+            return False
+        else:
+            logger.debug("Unsupported: iaid: %s in IPv4" % api['iaid'])
+            logger.debug("PASS: %s" % api)
+    elif iptype == "ipv6":
+         logger.debug("Ignoring mac checking on IPv6 line %s" % api['mac'])
+         logger.debug("PASS: %s" % api)
+
+    return True
+
+def check_values(op1, op2, network, logger):
+     """
+        check each line accorting to ip type, if ipv4 go to check_ipv4_values
+        if ipv6, go to check_ipv6_values.
+     """
+     networkname = network
+     dnsmasq = op1
+     api = op2
+
+     for i in range(0, len(api)):
+         if check_ip(api[i]['ipaddr'],logger) == "ipv4":
+             if not compare_values(dnsmasq,api[i],networkname,"ipv4",logger):
+                 return False
+         elif check_ip(api[i]['ipaddr'],logger) == "ipv6":
+             if not compare_values(dnsmasq,api[i],networkname,"ipv6",logger):
+                 return False
+         else:
+             logger.error("invalid list element for ipv4 and ipv6")
+             return False
+     return True
+
+def network_dhcp_leases(params):
+    """
+       test API for DHCPLeases in class virNetwork
+    """
+    global LEASE_FILE_DNSMASQ
+    logger = params['logger']
+    networkname = params['networkname']
+    LEASE_FILE_DNSMASQ = "/var/lib/libvirt/dnsmasq/" + networkname + ".leases"
+    mac_value = params.get('macaddr', None)
+    conn = sharedmod.libvirtobj['conn']
+    logger.info("The given mac is %s" % (mac_value))
+
+    if not os.path.exists(LEASE_FILE_DNSMASQ):
+        logger.error("leases file for %s is not exist" % networkname)
+        logger.error("%s" % LEASE_FILE_DNSMASQ)
+        return 1
+    dhcp_lease_dns = get_info_from_dnsmasq(networkname, mac_value, logger)
+    logger.info("From dnsmasq: %s" % (dhcp_lease_dns))
+    if not dhcp_lease_dns:
+        return 1
+
+    netobj = conn.networkLookupByName(networkname)
+
+    try:
+        dhcp_lease_api = netobj.DHCPLeases(mac_value,0)
+        if not dhcp_lease_api and dhcp_lease_dns:
+            logger.info("From API: %s" % (dhcp_lease_api))
+            return 1 
+        logger.info("From API: %s" % (dhcp_lease_api))
+        if not check_values(dhcp_lease_dns,dhcp_lease_api,networkname,logger):
+           return 1
+
+    except libvirtError, e:
+        logger.error("API error message: %s, error code is %s" \
+                     % (e.message, e.get_error_code()))
+        return 1
+
+    return 0
-- 
1.8.3.1




More information about the libvir-list mailing list