diff options
-rw-r--r-- | tests/topotests/lib/bgp.py | 761 | ||||
-rw-r--r-- | tests/topotests/lib/common_config.py | 577 |
2 files changed, 1321 insertions, 17 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 971bbd0f3..5d49cfd09 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -27,6 +27,8 @@ import sys from lib import topotest from lib.topolog import logger +from lib.topogen import TopoRouter, get_topogen + # Import common_config to use commomnly used APIs from lib.common_config import ( create_common_configuration, @@ -166,6 +168,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True ipv4_data = bgp_addr_data.setdefault("ipv4", {}) ipv6_data = bgp_addr_data.setdefault("ipv6", {}) + l2vpn_data = bgp_addr_data.setdefault("l2vpn", {}) neigh_unicast = ( True @@ -174,6 +177,8 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True else False ) + l2vpn_evpn = True if l2vpn_data.setdefault("evpn", {}) else False + if neigh_unicast: data_all_bgp = __create_bgp_unicast_neighbor( tgen, @@ -184,6 +189,11 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True config_data=data_all_bgp, ) + if l2vpn_evpn: + data_all_bgp = __create_l2vpn_evpn_address_family( + tgen, topo, bgp_data, router, config_data=data_all_bgp + ) + try: result = create_common_configuration( tgen, router, data_all_bgp, "bgp", build, load_config @@ -467,6 +477,166 @@ def __create_bgp_unicast_neighbor( return config_data +def __create_l2vpn_evpn_address_family( + tgen, topo, input_dict, router, config_data=None +): + """ + Helper API to create configuration for l2vpn evpn address-family + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring + from testcase + * `router` : router id to be configured. + * `build` : Only for initial setup phase this is set as True. + """ + + result = False + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + bgp_data = input_dict["address_family"] + + for family_type, family_dict in bgp_data.iteritems(): + if family_type != "l2vpn": + continue + + family_data = family_dict["evpn"] + if family_data: + config_data.append("address-family l2vpn evpn") + + advertise_data = family_data.setdefault("advertise", {}) + neighbor_data = family_data.setdefault("neighbor", {}) + advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None) + rd_data = family_data.setdefault("rd", None) + no_rd_data = family_data.setdefault("no rd", False) + route_target_data = family_data.setdefault("route-target", {}) + + if advertise_data: + for address_type, unicast_type in advertise_data.items(): + + if isinstance(unicast_type, dict): + for key, value in unicast_type.items(): + cmd = "advertise {} {}".format(address_type, key) + + if value: + route_map = value.setdefault("route-map", {}) + advertise_del_action = value.setdefault("delete", None) + + if route_map: + cmd = "{} route-map {}".format(cmd, route_map) + + if advertise_del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + if neighbor_data: + for neighbor, neighbor_data in neighbor_data.items(): + ipv4_neighbor = neighbor_data.setdefault("ipv4", {}) + ipv6_neighbor = neighbor_data.setdefault("ipv6", {}) + + if ipv4_neighbor: + for neighbor_name, action in ipv4_neighbor.items(): + neighbor_ip = topo[neighbor]["links"][neighbor_name][ + "ipv4" + ].split("/")[0] + + if isinstance(action, dict): + next_hop_self = action.setdefault("next_hop_self", None) + route_maps = action.setdefault("route_maps", {}) + + if next_hop_self is not None: + if next_hop_self is True: + config_data.append( + "neighbor {} " + "next-hop-self".format(neighbor_ip) + ) + elif next_hop_self is False: + config_data.append( + "no neighbor {} " + "next-hop-self".format(neighbor_ip) + ) + + if route_maps: + for route_map in route_maps: + name = route_map.setdefault("name", {}) + direction = route_map.setdefault("direction", "in") + del_action = route_map.setdefault("delete", False) + + if not name: + logger.info( + "Router %s: 'name' " + "not present in " + "input_dict for BGP " + "neighbor route name", + router, + ) + else: + cmd = "neighbor {} route-map {} " "{}".format( + neighbor_ip, name, direction + ) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + else: + if action == "activate": + cmd = "neighbor {} activate".format(neighbor_ip) + elif action == "deactivate": + cmd = "no neighbor {} activate".format(neighbor_ip) + + config_data.append(cmd) + + if ipv6_neighbor: + for neighbor_name, action in ipv4_neighbor.items(): + neighbor_ip = topo[neighbor]["links"][neighbor_name][ + "ipv6" + ].split("/")[0] + if action == "activate": + cmd = "neighbor {} activate".format(neighbor_ip) + elif action == "deactivate": + cmd = "no neighbor {} activate".format(neighbor_ip) + + config_data.append(cmd) + + if advertise_all_vni_data == True: + cmd = "advertise-all-vni" + config_data.append(cmd) + elif advertise_all_vni_data == False: + cmd = "no advertise-all-vni" + config_data.append(cmd) + + if rd_data: + cmd = "rd {}".format(rd_data) + config_data.append(cmd) + + if no_rd_data: + cmd = "no rd {}".format(no_rd_data) + config_data.append(cmd) + + if route_target_data: + for rt_type, rt_dict in route_target_data.items(): + for _rt_dict in rt_dict: + rt_value = _rt_dict.setdefault("value", None) + del_rt = _rt_dict.setdefault("delete", None) + + if rt_value: + cmd = "route-target {} {}".format(rt_type, rt_value) + if del_rt: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return config_data + + def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): """ Helper API to create neighbor specific configuration @@ -489,7 +659,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): for dest_link, peer in peer_dict["dest_link"].iteritems(): nh_details = topo[name] - if "vrfs" in topo[router]: + if "vrfs" in topo[router] or type(nh_details["bgp"]) is list: remote_as = nh_details["bgp"][0]["local_as"] else: remote_as = nh_details["bgp"]["local_as"] @@ -905,7 +1075,10 @@ def verify_bgp_convergence(tgen, topo, dut=None): errormsg(str) or True """ - logger.debug("Entering lib API: verify_bgp_convergence()") + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + tgen = get_topogen() for router, rnode in tgen.routers().iteritems(): if "bgp" not in topo["routers"][router]: continue @@ -968,13 +1141,14 @@ def verify_bgp_convergence(tgen, topo, dut=None): ) return errormsg - l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][ - "peers" - ] - nh_state = l2VpnEvpn_data[neighbor_ip]["state"] + if "l2VpnEvpn" in show_bgp_json[vrf]: + l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][ + "peers" + ] + nh_state = l2VpnEvpn_data[neighbor_ip]["state"] - if nh_state == "Established": - no_of_evpn_peer += 1 + if nh_state == "Established": + no_of_evpn_peer += 1 if no_of_evpn_peer == total_evpn_peer: logger.info( @@ -982,6 +1156,7 @@ def verify_bgp_convergence(tgen, topo, dut=None): router, vrf, ) + result = True else: errormsg = ( "[DUT: %s] VRF: %s, BGP is not converged " @@ -989,22 +1164,22 @@ def verify_bgp_convergence(tgen, topo, dut=None): ) return errormsg else: + total_peer = 0 for addr_type in bgp_addr_type.keys(): if not check_address_types(addr_type): continue - total_peer = 0 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor in bgp_neighbors: total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) + no_of_peer = 0 for addr_type in bgp_addr_type.keys(): if not check_address_types(addr_type): continue bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] - no_of_peer = 0 for bgp_neighbor, peer_data in bgp_neighbors.items(): for dest_link in peer_data["dest_link"].keys(): data = topo["routers"][bgp_neighbor]["links"] @@ -1015,8 +1190,11 @@ def verify_bgp_convergence(tgen, topo, dut=None): "neighbor_type" in peer_details and peer_details["neighbor_type"] == "link-local" ): - neighbor_ip = get_ipv6_linklocal_address( - topo["routers"], bgp_neighbor, dest_link + intf = topo["routers"][bgp_neighbor]["links"][ + dest_link + ]["interface"] + neighbor_ip = get_frr_ipv6_linklocal( + tgen, bgp_neighbor, intf ) elif "source_link" in peer_details: neighbor_ip = topo["routers"][bgp_neighbor][ @@ -1036,7 +1214,7 @@ def verify_bgp_convergence(tgen, topo, dut=None): 0 ] nh_state = None - + neighbor_ip = neighbor_ip.lower() if addr_type == "ipv4": ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][ "peers" @@ -1046,7 +1224,8 @@ def verify_bgp_convergence(tgen, topo, dut=None): ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][ "peers" ] - nh_state = ipv6_data[neighbor_ip]["state"] + if neighbor_ip in ipv6_data: + nh_state = ipv6_data[neighbor_ip]["state"] if nh_state == "Established": no_of_peer += 1 @@ -1059,8 +1238,9 @@ def verify_bgp_convergence(tgen, topo, dut=None): (router, vrf, addr_type)) return errormsg - logger.debug("Exiting API: verify_bgp_convergence()") - return True + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return result @retry(attempts=3, wait=4, return_is_str=True) @@ -3463,3 +3643,552 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut): return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_attributes_for_evpn_routes( + tgen, + topo, + dut, + input_dict, + rd=None, + rt=None, + ethTag=None, + ipLen=None, + rd_peer=None, + rt_peer=None, +): + """ + API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `topo` : json file data + * `dut` : device under test + * `input_dict`: having details like - for which route, rd value + needs to be verified + * `rd` : route distinguisher + * `rt` : route target + * `ethTag` : Ethernet Tag + * `ipLen` : IP prefix length + * `rd_peer` : Peer name from which RD will be auto-generated + * `rt_peer` : Peer name from which RT will be auto-generated + + Usage + ----- + input_dict_1 = { + "r1": { + "static_routes": [{ + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + "vrf": "RED" + }] + } + } + + result = verify_attributes_for_evpn_routes(tgen, topo, + input_dict, rd = "10.0.0.33:1") + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for router in input_dict.keys(): + rnode = tgen.routers()[dut] + + if "static_routes" in input_dict[router]: + for static_route in input_dict[router]["static_routes"]: + network = static_route["network"] + + if "vrf" in static_route: + vrf = static_route["vrf"] + + if type(network) is not list: + network = [network] + + for route in network: + route = route.split("/")[0] + _addr_type = validate_ip_address(route) + if "v4" in _addr_type: + input_afi = "v4" + elif "v6" in _addr_type: + input_afi = "v6" + + cmd = "show bgp l2vpn evpn {} json".format(route) + evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True) + if not bool(evpn_rd_value_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if rd is not None and rd != "auto": + logger.info( + "[DUT: %s]: Verifying rd value for " "evpn route %s:", + dut, + route, + ) + + if rd in evpn_rd_value_json: + rd_value_json = evpn_rd_value_json[rd] + if rd_value_json["rd"] != rd: + errormsg = ( + "[DUT: %s] Failed: Verifying" + " RD value for EVPN route: %s" + "[FAILED]!!, EXPECTED : %s " + " FOUND : %s" + % (dut, route, rd, rd_value_json["rd"]) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying RD value for" + " EVPN route: %s [PASSED]|| " + "Found Exprected: %s", + dut, + route, + rd, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD : %s is not present" + " in cli json output" % (dut, rd) + ) + return errormsg + + if rd == "auto": + logger.info( + "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:", + dut, + route, + ) + + if rd_peer: + index = 1 + vni_dict = {} + + rnode = tgen.routers()[rd_peer] + vrfs = topo["routers"][rd_peer]["vrfs"] + for vrf_dict in vrfs: + vni_dict[vrf_dict["name"]] = index + index += 1 + + show_bgp_json = run_frr_cmd( + rnode, "show bgp vrf all summary json", isjson=True + ) + + # Verifying output dictionary show_bgp_json is empty + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + show_bgp_json_vrf = show_bgp_json[vrf] + for afi, afi_data in show_bgp_json_vrf.items(): + if input_afi not in afi: + continue + router_id = afi_data["routerId"] + + rd = "{}:{}".format(router_id, vni_dict[vrf]) + if rd in evpn_rd_value_json: + rd_value_json = evpn_rd_value_json[rd] + if rd_value_json["rd"] != rd: + errormsg = ( + "[DUT: %s] Failed: Verifying" + " RD value for EVPN route: %s" + "[FAILED]!!, EXPECTED : %s " + " FOUND : %s" + % (dut, route, rd, rd_value_json["rd"]) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying RD value for" + " EVPN route: %s [PASSED]|| " + "Found Exprected: %s", + dut, + route, + rd, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD : %s is not present" + " in cli json output" % (dut, rd) + ) + return errormsg + + if rt == "auto": + logger.info( + "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:", + dut, + route, + ) + + if rt_peer: + vni_dict = {} + + rnode = tgen.routers()[rt_peer] + show_bgp_json = run_frr_cmd( + rnode, "show bgp vrf all summary json", isjson=True + ) + + # Verifying output dictionary show_bgp_json is empty + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + show_bgp_json_vrf = show_bgp_json[vrf] + for afi, afi_data in show_bgp_json_vrf.items(): + if input_afi not in afi: + continue + as_num = afi_data["as"] + + show_vrf_vni_json = run_frr_cmd( + rnode, "show vrf vni json", isjson=True + ) + + vrfs = show_vrf_vni_json["vrfs"] + for vrf_dict in vrfs: + if vrf_dict["vrf"] == vrf: + vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"]) + + # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI + # for auto derived RT value. + if as_num > 65535: + as_bin = bin(as_num) + as_bin = as_bin[-16:] + as_num = int(as_bin, 2) + + rt = "{}:{}".format(str(as_num), vni_dict[vrf]) + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + for rt_data in route_data["paths"]: + if vni_dict[vrf] == rt_data["VNI"]: + rt_string = rt_data["extendedCommunity"][ + "string" + ] + rt_input = "RT:{}".format(rt) + if rt_input not in rt_string: + errormsg = ( + "[DUT: %s] Failed:" + " Verifying RT " + "value for EVPN " + " route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % (dut, route, rt_input, rt_string) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying " + "RT value for EVPN " + "route: %s [PASSED]||" + "Found Exprected: %s", + dut, + route, + rt_input, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Route : %s is not" + " present in cli json output" % (dut, route) + ) + return errormsg + + if rt is not None and rt != "auto": + logger.info( + "[DUT: %s]: Verifying rt value for " "evpn route %s:", + dut, + route, + ) + + if type(rt) is not list: + rt = [rt] + + for _rt in rt: + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + for rt_data in route_data["paths"]: + rt_string = rt_data["extendedCommunity"][ + "string" + ] + rt_input = "RT:{}".format(_rt) + if rt_input not in rt_string: + errormsg = ( + "[DUT: %s] Failed: " + "Verifying RT value " + "for EVPN route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % (dut, route, rt_input, rt_string) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying RT" + " value for EVPN route:" + " %s [PASSED]|| " + "Found Exprected: %s", + dut, + route, + rt_input, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Route : %s is not" + " present in cli json output" % (dut, route) + ) + return errormsg + + if ethTag is not None: + logger.info( + "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut + ) + + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + if route_data["ethTag"] != ethTag: + errormsg = ( + "[DUT: %s] RD: %s, Failed: " + "Verifying ethTag value " + "for EVPN route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % ( + dut, + _rd, + route, + ethTag, + route_data["ethTag"], + ) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: RD: %s, Verifying " + "ethTag value for EVPN route:" + " %s [PASSED]|| " + "Found Exprected: %s", + dut, + _rd, + route, + ethTag, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD: %s, Route : %s " + "is not present in cli json " + "output" % (dut, _rd, route) + ) + return errormsg + + if ipLen is not None: + logger.info( + "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut + ) + + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + if route_data["ipLen"] != int(ipLen): + errormsg = ( + "[DUT: %s] RD: %s, Failed: " + "Verifying ipLen value " + "for EVPN route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % (dut, _rd, route, ipLen, route_data["ipLen"]) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: RD: %s, Verifying " + "ipLen value for EVPN route:" + " %s [PASSED]|| " + "Found Exprected: %s", + dut, + _rd, + route, + ipLen, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD: %s, Route : %s " + "is not present in cli json " + "output " % (dut, route) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_evpn_routes( + tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None +): + """ + API to verify evpn routes using "sh bgp l2vpn evpn" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `topo` : json file data + * `dut` : device under test + * `input_dict`: having details like - for which route, rd value + needs to be verified + * `route_type` : Route type 5 is supported as of now + * `EthTag` : Ethernet tag, by-default is 0 + * `next_hop` : Prefered nexthop for the evpn routes + + Usage + ----- + input_dict_1 = { + "r1": { + "static_routes": [{ + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + "vrf": "RED" + }] + } + } + result = verify_evpn_routes(tgen, topo, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + for router in input_dict.keys(): + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying evpn routes: ", dut) + + if "static_routes" in input_dict[router]: + for static_route in input_dict[router]["static_routes"]: + network = static_route["network"] + + if type(network) is not list: + network = [network] + + missing_routes = {} + for route in network: + rd_keys = 0 + ip_len = route.split("/")[1] + route = route.split("/")[0] + + prefix = "[{}]:[{}]:[{}]:[{}]".format( + routeType, EthTag, ip_len, route + ) + + cmd = "show bgp l2vpn evpn route json" + evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True) + + if not bool(evpn_value_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if evpn_value_json["numPrefix"] == 0: + errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut) + return errormsg + + for key, route_data_json in evpn_value_json.items(): + if isinstance(route_data_json, dict): + rd_keys += 1 + if prefix not in route_data_json: + missing_routes[key] = prefix + + if rd_keys == len(missing_routes.keys()): + errormsg = ( + "[DUT: %s]: " + "Missing EVPN routes: " + "%s [FAILED]!!" % (dut, list(set(missing_routes.values()))) + ) + return errormsg + + for key, route_data_json in evpn_value_json.items(): + if isinstance(route_data_json, dict): + if prefix not in route_data_json: + continue + + for paths in route_data_json[prefix]["paths"]: + for path in paths: + if path["routeType"] != routeType: + errormsg = ( + "[DUT: %s]: " + "Verifying routeType " + "for EVPN route: %s " + "[FAILED]!! " + "Expected: %s, " + "Found: %s" + % ( + dut, + prefix, + routeType, + path["routeType"], + ) + ) + return errormsg + + elif next_hop: + for nh_dict in path["nexthops"]: + if nh_dict["ip"] != next_hop: + errormsg = ( + "[DUT: %s]: " + "Verifying " + "nexthop for " + "EVPN route: %s" + "[FAILED]!! " + "Expected: %s," + " Found: %s" + % ( + dut, + prefix, + next_hop, + nh_dict["ip"], + ) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying " + "EVPN route : %s, " + "routeType: %s is " + "installed " + "[PASSED]|| ", + dut, + prefix, + routeType, + ) + return True + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return False diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index d72d0aa22..2b1f269e6 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -933,6 +933,16 @@ def create_vrf_cfg(tgen, topo, input_dict=None, build=False): ) rnode.run(cmd) + if vni: + config_data.append("vrf {}".format(vrf["name"])) + cmd = "vni {}".format(vni) + config_data.append(cmd) + + if del_vni: + config_data.append("vrf {}".format(vrf["name"])) + cmd = "no vni {}".format(del_vni) + config_data.append(cmd) + result = create_common_configuration( tgen, c_router, config_data, "vrf", build=build ) @@ -984,6 +994,34 @@ def create_interface_in_kernel( rnode.run(cmd) +def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False): + """ + Cretae interfaces in kernel for ipv4/ipv6 + Config is done in Linux Kernel: + + Parameters + ---------- + * `tgen` : Topogen object + * `dut` : Device for which interfaces to be added + * `intf_name` : interface name + * `ifaceaction` : False to shutdown and True to bringup the + ineterface + """ + + rnode = tgen.routers()[dut] + + cmd = "ip link set dev" + if ifaceaction: + action = "up" + cmd = "{} {} {}".format(cmd, intf_name, action) + else: + action = "down" + cmd = "{} {} {}".format(cmd, intf_name, action) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + def validate_ip_address(ip_address): """ Validates the type of ip address @@ -1042,7 +1080,7 @@ def check_address_types(addr_type=None): return addr_types if addr_type not in addr_types: - logger.error( + logger.debug( "{} not in supported/configured address types {}".format( addr_type, addr_types ) @@ -1732,6 +1770,7 @@ def create_route_maps(tgen, input_dict, build=False): set_action = set_data.setdefault("set_action", None) nexthop = set_data.setdefault("nexthop", None) origin = set_data.setdefault("origin", None) + ext_comm_list = set_data.setdefault("extcommunity", {}) # Local Preference if local_preference: @@ -1796,6 +1835,19 @@ def create_route_maps(tgen, input_dict, build=False): logger.error("In large_comm_list 'id' not" " provided") return False + if ext_comm_list: + rt = ext_comm_list.setdefault("rt", None) + del_comm = ext_comm_list.setdefault("delete", None) + if rt: + cmd = "set extcommunity rt {}".format(rt) + if del_comm: + cmd = "{} delete".format(cmd) + + rmap_data.append(cmd) + else: + logger.debug("In ext_comm_list 'rt' not" " provided") + return False + # Weight if weight: rmap_data.append("set weight {}".format(weight)) @@ -2151,6 +2203,249 @@ def addKernelRoute( return True +def configure_vxlan(tgen, input_dict): + """ + Add and configure vxlan + + * `tgen`: tgen onject + * `input_dict` : data for vxlan config + + Usage: + ------ + input_dict= { + "dcg2":{ + "vxlan":[{ + "vxlan_name": "vxlan75100", + "vxlan_id": "75100", + "dstport": 4789, + "local_addr": "120.0.0.1", + "learning": "no", + "delete": True + }] + } + } + + configure_vxlan(tgen, input_dict) + + Returns: + ------- + True or errormsg + + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + if "vxlan" in input_dict[dut]: + for vxlan_dict in input_dict[dut]["vxlan"]: + cmd = "ip link " + + del_vxlan = vxlan_dict.setdefault("delete", None) + vxlan_names = vxlan_dict.setdefault("vxlan_name", []) + vxlan_ids = vxlan_dict.setdefault("vxlan_id", []) + dstport = vxlan_dict.setdefault("dstport", None) + local_addr = vxlan_dict.setdefault("local_addr", None) + learning = vxlan_dict.setdefault("learning", None) + + config_data = [] + if vxlan_names and vxlan_ids: + for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids): + cmd = "ip link" + + if del_vxlan: + cmd = "{} del {} type vxlan id {}".format( + cmd, vxlan_name, vxlan_id + ) + else: + cmd = "{} add {} type vxlan id {}".format( + cmd, vxlan_name, vxlan_id + ) + + if dstport: + cmd = "{} dstport {}".format(cmd, dstport) + + if local_addr: + ip_cmd = "ip addr add {} dev {}".format( + local_addr, vxlan_name + ) + if del_vxlan: + ip_cmd = "ip addr del {} dev {}".format( + local_addr, vxlan_name + ) + + config_data.append(ip_cmd) + + cmd = "{} local {}".format(cmd, local_addr) + + if learning == "no": + cmd = "{} {} learning".format(cmd, learning) + + elif learning == "yes": + cmd = "{} learning".format(cmd) + + config_data.append(cmd) + + try: + for _cmd in config_data: + logger.info("[DUT: %s]: Running command: %s", dut, _cmd) + rnode.run(_cmd) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return True + + +def configure_brctl(tgen, topo, input_dict): + """ + Add and configure brctl + + * `tgen`: tgen onject + * `input_dict` : data for brctl config + + Usage: + ------ + input_dict= { + dut:{ + "brctl": [{ + "brctl_name": "br100", + "addvxlan": "vxlan75100", + "vrf": "RED", + "stp": "off" + }] + } + } + + configure_brctl(tgen, topo, input_dict) + + Returns: + ------- + True or errormsg + + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + if "brctl" in input_dict[dut]: + for brctl_dict in input_dict[dut]["brctl"]: + + brctl_names = brctl_dict.setdefault("brctl_name", []) + addvxlans = brctl_dict.setdefault("addvxlan", []) + stp_values = brctl_dict.setdefault("stp", []) + vrfs = brctl_dict.setdefault("vrf", []) + + ip_cmd = "ip link set" + for brctl_name, vxlan, vrf, stp in zip( + brctl_names, addvxlans, vrfs, stp_values + ): + + ip_cmd_list = [] + cmd = "brctl addbr {}".format(brctl_name) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name)) + + if vxlan: + cmd = "brctl addif {} {}".format(brctl_name, vxlan) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan)) + + if stp: + cmd = "brctl stp {} {}".format(brctl_name, stp) + + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + rnode.run(cmd) + + if vrf: + ip_cmd_list.append( + "{} dev {} master {}".format(ip_cmd, brctl_name, vrf) + ) + + for intf_name, data in topo["routers"][dut]["links"].items(): + if "vrf" not in data: + continue + + if data["vrf"] == vrf: + ip_cmd_list.append( + "{} up dev {}".format(ip_cmd, data["interface"]) + ) + + try: + for _ip_cmd in ip_cmd_list: + logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd) + rnode.run(_ip_cmd) + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +def configure_interface_mac(tgen, input_dict): + """ + Add and configure brctl + + * `tgen`: tgen onject + * `input_dict` : data for mac config + + input_mac= { + "edge1":{ + "br75100": "00:80:48:BA:d1:00, + "br75200": "00:80:48:BA:d1:00 + } + } + + configure_interface_mac(tgen, input_mac) + + Returns: + ------- + True or errormsg + + """ + + router_list = tgen.routers() + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + for intf, mac in input_dict[dut].items(): + cmd = "ifconfig {} hw ether {}".format(intf, mac) + logger.info("[DUT: %s]: Running command: %s", dut, cmd) + + try: + result = rnode.run(cmd) + if len(result) != 0: + return result + + except InvalidCLIError: + # Traceback + errormsg = traceback.format_exc() + logger.error(errormsg) + return errormsg + + return True + + ############################################# # Verification APIs ############################################# @@ -2875,3 +3170,283 @@ def verify_create_community_list(tgen, input_dict): logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return True + + +def verify_cli_json(tgen, input_dict): + """ + API to verify if JSON is available for clis + command. + + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: CLIs for which JSON needs to be verified + Usage + ----- + input_dict = { + "edge1":{ + "cli": ["show evpn vni detail", show evpn rmac vni all] + } + } + + result = verify_cli_json(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + for cli in input_dict[dut]["cli"]: + logger.info( + "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli + ) + + test_cli = "{} json".format(cli) + ret_json = rnode.vtysh_cmd(test_cli, isjson=True) + if not bool(ret_json): + errormsg = "CLI: %s, JSON format is not available" % (cli) + return errormsg + elif "unknown" in ret_json or "Unknown" in ret_json: + errormsg = "CLI: %s, JSON format is not available" % (cli) + return errormsg + else: + logger.info( + "CLI : %s JSON format is available: " "\n %s", cli, ret_json + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return True + + +@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2) +def verify_evpn_vni(tgen, input_dict): + """ + API to verify evpn vni details using "show evpn vni detail json" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: having details like - for which router, evpn details + needs to be verified + Usage + ----- + input_dict = { + "edge1":{ + "vni": [ + { + "75100":{ + "vrf": "RED", + "vxlanIntf": "vxlan75100", + "localVtepIp": "120.1.1.1", + "sviIntf": "br100" + } + } + ] + } + } + + result = verify_evpn_vni(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying evpn vni details :", dut) + + cmd = "show evpn vni detail json" + evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True) + if not bool(evpn_all_vni_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if "vni" in input_dict[dut]: + for vni_dict in input_dict[dut]["vni"]: + found = False + vni = vni_dict["name"] + for evpn_vni_json in evpn_all_vni_json: + if "vni" in evpn_vni_json: + if evpn_vni_json["vni"] != int(vni): + continue + + for attribute in vni_dict.keys(): + if vni_dict[attribute] != evpn_vni_json[attribute]: + errormsg = ( + "[DUT: %s] Verifying " + "%s for VNI: %s [FAILED]||" + ", EXPECTED : %s " + " FOUND : %s" + % ( + dut, + attribute, + vni, + vni_dict[attribute], + evpn_vni_json[attribute], + ) + ) + return errormsg + + else: + found = True + logger.info( + "[DUT: %s] Verifying" + " %s for VNI: %s , " + "Found Expected : %s ", + dut, + attribute, + vni, + evpn_vni_json[attribute], + ) + + if evpn_vni_json["state"] != "Up": + errormsg = ( + "[DUT: %s] Failed: Verifying" + " State for VNI: %s is not Up" % (dut, vni) + ) + return errormsg + + else: + errormsg = ( + "[DUT: %s] Failed:" + " VNI: %s is not present in JSON" % (dut, vni) + ) + return errormsg + + if found: + logger.info( + "[DUT %s]: Verifying VNI : %s " + "details and state is Up [PASSED]!!", + dut, + vni, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Failed:" " vni details are not present in input data" % (dut) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False + + +@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2) +def verify_vrf_vni(tgen, input_dict): + """ + API to verify vrf vni details using "show vrf vni json" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `input_dict`: having details like - for which router, evpn details + needs to be verified + Usage + ----- + input_dict = { + "edge1":{ + "vrfs": [ + { + "RED":{ + "vni": 75000, + "vxlanIntf": "vxlan75100", + "sviIntf": "br100", + "routerMac": "00:80:48:ba:d1:00", + "state": "Up" + } + } + ] + } + } + + result = verify_vrf_vni(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for dut in input_dict.keys(): + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying vrf vni details :", dut) + + cmd = "show vrf vni json" + vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True) + if not bool(vrf_all_vni_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if "vrfs" in input_dict[dut]: + for vrfs in input_dict[dut]["vrfs"]: + for vrf, vrf_dict in vrfs.items(): + found = False + for vrf_vni_json in vrf_all_vni_json["vrfs"]: + if "vrf" in vrf_vni_json: + if vrf_vni_json["vrf"] != vrf: + continue + + for attribute in vrf_dict.keys(): + if vrf_dict[attribute] == vrf_vni_json[attribute]: + found = True + logger.info( + "[DUT %s]: VRF: %s, " + "verifying %s " + ", Found Expected: %s " + "[PASSED]!!", + dut, + vrf, + attribute, + vrf_vni_json[attribute], + ) + else: + errormsg = ( + "[DUT: %s] VRF: %s, " + "verifying %s [FAILED!!] " + ", EXPECTED : %s " + ", FOUND : %s" + % ( + dut, + vrf, + attribute, + vrf_dict[attribute], + vrf_vni_json[attribute], + ) + ) + return errormsg + + else: + errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % ( + dut, + vrf, + ) + return errormsg + + if found: + logger.info( + "[DUT %s] Verifying VRF: %s " " details [PASSED]!!", + dut, + vrf, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False |