diff options
author | Kuldeep Kashyap <kashyapk@vmware.com> | 2020-07-01 15:13:08 +0200 |
---|---|---|
committer | Kuldeep Kashyap <kashyapk@vmware.com> | 2020-07-22 08:12:55 +0200 |
commit | 3c334c391524159d944ffb2abf841d5f07df9721 (patch) | |
tree | 3f561d530b58bd625241c71c959d3e65f2ba7d56 /tests/topotests/lib/bgp.py | |
parent | Merge pull request #6780 from chiragshah6/evpn_dev2 (diff) | |
download | frr-3c334c391524159d944ffb2abf841d5f07df9721.tar.xz frr-3c334c391524159d944ffb2abf841d5f07df9721.zip |
tests: Adding framework support for EVPN-Type5 automation
1. Added APIs to create evpn related config.
2. Added APIs to verify evpn config and routes.
Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r-- | tests/topotests/lib/bgp.py | 761 |
1 files changed, 745 insertions, 16 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 971bbd0f3..5d49cfd09 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -27,6 +27,8 @@ import sys from lib import topotest from lib.topolog import logger +from lib.topogen import TopoRouter, get_topogen + # Import common_config to use commomnly used APIs from lib.common_config import ( create_common_configuration, @@ -166,6 +168,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True ipv4_data = bgp_addr_data.setdefault("ipv4", {}) ipv6_data = bgp_addr_data.setdefault("ipv6", {}) + l2vpn_data = bgp_addr_data.setdefault("l2vpn", {}) neigh_unicast = ( True @@ -174,6 +177,8 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True else False ) + l2vpn_evpn = True if l2vpn_data.setdefault("evpn", {}) else False + if neigh_unicast: data_all_bgp = __create_bgp_unicast_neighbor( tgen, @@ -184,6 +189,11 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True config_data=data_all_bgp, ) + if l2vpn_evpn: + data_all_bgp = __create_l2vpn_evpn_address_family( + tgen, topo, bgp_data, router, config_data=data_all_bgp + ) + try: result = create_common_configuration( tgen, router, data_all_bgp, "bgp", build, load_config @@ -467,6 +477,166 @@ def __create_bgp_unicast_neighbor( return config_data +def __create_l2vpn_evpn_address_family( + tgen, topo, input_dict, router, config_data=None +): + """ + Helper API to create configuration for l2vpn evpn address-family + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : json file data + * `input_dict` : Input dict data, required when configuring + from testcase + * `router` : router id to be configured. + * `build` : Only for initial setup phase this is set as True. + """ + + result = False + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + bgp_data = input_dict["address_family"] + + for family_type, family_dict in bgp_data.iteritems(): + if family_type != "l2vpn": + continue + + family_data = family_dict["evpn"] + if family_data: + config_data.append("address-family l2vpn evpn") + + advertise_data = family_data.setdefault("advertise", {}) + neighbor_data = family_data.setdefault("neighbor", {}) + advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None) + rd_data = family_data.setdefault("rd", None) + no_rd_data = family_data.setdefault("no rd", False) + route_target_data = family_data.setdefault("route-target", {}) + + if advertise_data: + for address_type, unicast_type in advertise_data.items(): + + if isinstance(unicast_type, dict): + for key, value in unicast_type.items(): + cmd = "advertise {} {}".format(address_type, key) + + if value: + route_map = value.setdefault("route-map", {}) + advertise_del_action = value.setdefault("delete", None) + + if route_map: + cmd = "{} route-map {}".format(cmd, route_map) + + if advertise_del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + if neighbor_data: + for neighbor, neighbor_data in neighbor_data.items(): + ipv4_neighbor = neighbor_data.setdefault("ipv4", {}) + ipv6_neighbor = neighbor_data.setdefault("ipv6", {}) + + if ipv4_neighbor: + for neighbor_name, action in ipv4_neighbor.items(): + neighbor_ip = topo[neighbor]["links"][neighbor_name][ + "ipv4" + ].split("/")[0] + + if isinstance(action, dict): + next_hop_self = action.setdefault("next_hop_self", None) + route_maps = action.setdefault("route_maps", {}) + + if next_hop_self is not None: + if next_hop_self is True: + config_data.append( + "neighbor {} " + "next-hop-self".format(neighbor_ip) + ) + elif next_hop_self is False: + config_data.append( + "no neighbor {} " + "next-hop-self".format(neighbor_ip) + ) + + if route_maps: + for route_map in route_maps: + name = route_map.setdefault("name", {}) + direction = route_map.setdefault("direction", "in") + del_action = route_map.setdefault("delete", False) + + if not name: + logger.info( + "Router %s: 'name' " + "not present in " + "input_dict for BGP " + "neighbor route name", + router, + ) + else: + cmd = "neighbor {} route-map {} " "{}".format( + neighbor_ip, name, direction + ) + + if del_action: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + else: + if action == "activate": + cmd = "neighbor {} activate".format(neighbor_ip) + elif action == "deactivate": + cmd = "no neighbor {} activate".format(neighbor_ip) + + config_data.append(cmd) + + if ipv6_neighbor: + for neighbor_name, action in ipv4_neighbor.items(): + neighbor_ip = topo[neighbor]["links"][neighbor_name][ + "ipv6" + ].split("/")[0] + if action == "activate": + cmd = "neighbor {} activate".format(neighbor_ip) + elif action == "deactivate": + cmd = "no neighbor {} activate".format(neighbor_ip) + + config_data.append(cmd) + + if advertise_all_vni_data == True: + cmd = "advertise-all-vni" + config_data.append(cmd) + elif advertise_all_vni_data == False: + cmd = "no advertise-all-vni" + config_data.append(cmd) + + if rd_data: + cmd = "rd {}".format(rd_data) + config_data.append(cmd) + + if no_rd_data: + cmd = "no rd {}".format(no_rd_data) + config_data.append(cmd) + + if route_target_data: + for rt_type, rt_dict in route_target_data.items(): + for _rt_dict in rt_dict: + rt_value = _rt_dict.setdefault("value", None) + del_rt = _rt_dict.setdefault("delete", None) + + if rt_value: + cmd = "route-target {} {}".format(rt_type, rt_value) + if del_rt: + cmd = "no {}".format(cmd) + + config_data.append(cmd) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return config_data + + def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): """ Helper API to create neighbor specific configuration @@ -489,7 +659,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): for dest_link, peer in peer_dict["dest_link"].iteritems(): nh_details = topo[name] - if "vrfs" in topo[router]: + if "vrfs" in topo[router] or type(nh_details["bgp"]) is list: remote_as = nh_details["bgp"][0]["local_as"] else: remote_as = nh_details["bgp"]["local_as"] @@ -905,7 +1075,10 @@ def verify_bgp_convergence(tgen, topo, dut=None): errormsg(str) or True """ - logger.debug("Entering lib API: verify_bgp_convergence()") + result = False + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + tgen = get_topogen() for router, rnode in tgen.routers().iteritems(): if "bgp" not in topo["routers"][router]: continue @@ -968,13 +1141,14 @@ def verify_bgp_convergence(tgen, topo, dut=None): ) return errormsg - l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][ - "peers" - ] - nh_state = l2VpnEvpn_data[neighbor_ip]["state"] + if "l2VpnEvpn" in show_bgp_json[vrf]: + l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][ + "peers" + ] + nh_state = l2VpnEvpn_data[neighbor_ip]["state"] - if nh_state == "Established": - no_of_evpn_peer += 1 + if nh_state == "Established": + no_of_evpn_peer += 1 if no_of_evpn_peer == total_evpn_peer: logger.info( @@ -982,6 +1156,7 @@ def verify_bgp_convergence(tgen, topo, dut=None): router, vrf, ) + result = True else: errormsg = ( "[DUT: %s] VRF: %s, BGP is not converged " @@ -989,22 +1164,22 @@ def verify_bgp_convergence(tgen, topo, dut=None): ) return errormsg else: + total_peer = 0 for addr_type in bgp_addr_type.keys(): if not check_address_types(addr_type): continue - total_peer = 0 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] for bgp_neighbor in bgp_neighbors: total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) + no_of_peer = 0 for addr_type in bgp_addr_type.keys(): if not check_address_types(addr_type): continue bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] - no_of_peer = 0 for bgp_neighbor, peer_data in bgp_neighbors.items(): for dest_link in peer_data["dest_link"].keys(): data = topo["routers"][bgp_neighbor]["links"] @@ -1015,8 +1190,11 @@ def verify_bgp_convergence(tgen, topo, dut=None): "neighbor_type" in peer_details and peer_details["neighbor_type"] == "link-local" ): - neighbor_ip = get_ipv6_linklocal_address( - topo["routers"], bgp_neighbor, dest_link + intf = topo["routers"][bgp_neighbor]["links"][ + dest_link + ]["interface"] + neighbor_ip = get_frr_ipv6_linklocal( + tgen, bgp_neighbor, intf ) elif "source_link" in peer_details: neighbor_ip = topo["routers"][bgp_neighbor][ @@ -1036,7 +1214,7 @@ def verify_bgp_convergence(tgen, topo, dut=None): 0 ] nh_state = None - + neighbor_ip = neighbor_ip.lower() if addr_type == "ipv4": ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][ "peers" @@ -1046,7 +1224,8 @@ def verify_bgp_convergence(tgen, topo, dut=None): ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][ "peers" ] - nh_state = ipv6_data[neighbor_ip]["state"] + if neighbor_ip in ipv6_data: + nh_state = ipv6_data[neighbor_ip]["state"] if nh_state == "Established": no_of_peer += 1 @@ -1059,8 +1238,9 @@ def verify_bgp_convergence(tgen, topo, dut=None): (router, vrf, addr_type)) return errormsg - logger.debug("Exiting API: verify_bgp_convergence()") - return True + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return result @retry(attempts=3, wait=4, return_is_str=True) @@ -3463,3 +3643,552 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut): return errormsg logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_attributes_for_evpn_routes( + tgen, + topo, + dut, + input_dict, + rd=None, + rt=None, + ethTag=None, + ipLen=None, + rd_peer=None, + rt_peer=None, +): + """ + API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `topo` : json file data + * `dut` : device under test + * `input_dict`: having details like - for which route, rd value + needs to be verified + * `rd` : route distinguisher + * `rt` : route target + * `ethTag` : Ethernet Tag + * `ipLen` : IP prefix length + * `rd_peer` : Peer name from which RD will be auto-generated + * `rt_peer` : Peer name from which RT will be auto-generated + + Usage + ----- + input_dict_1 = { + "r1": { + "static_routes": [{ + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + "vrf": "RED" + }] + } + } + + result = verify_attributes_for_evpn_routes(tgen, topo, + input_dict, rd = "10.0.0.33:1") + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + for router in input_dict.keys(): + rnode = tgen.routers()[dut] + + if "static_routes" in input_dict[router]: + for static_route in input_dict[router]["static_routes"]: + network = static_route["network"] + + if "vrf" in static_route: + vrf = static_route["vrf"] + + if type(network) is not list: + network = [network] + + for route in network: + route = route.split("/")[0] + _addr_type = validate_ip_address(route) + if "v4" in _addr_type: + input_afi = "v4" + elif "v6" in _addr_type: + input_afi = "v6" + + cmd = "show bgp l2vpn evpn {} json".format(route) + evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True) + if not bool(evpn_rd_value_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if rd is not None and rd != "auto": + logger.info( + "[DUT: %s]: Verifying rd value for " "evpn route %s:", + dut, + route, + ) + + if rd in evpn_rd_value_json: + rd_value_json = evpn_rd_value_json[rd] + if rd_value_json["rd"] != rd: + errormsg = ( + "[DUT: %s] Failed: Verifying" + " RD value for EVPN route: %s" + "[FAILED]!!, EXPECTED : %s " + " FOUND : %s" + % (dut, route, rd, rd_value_json["rd"]) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying RD value for" + " EVPN route: %s [PASSED]|| " + "Found Exprected: %s", + dut, + route, + rd, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD : %s is not present" + " in cli json output" % (dut, rd) + ) + return errormsg + + if rd == "auto": + logger.info( + "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:", + dut, + route, + ) + + if rd_peer: + index = 1 + vni_dict = {} + + rnode = tgen.routers()[rd_peer] + vrfs = topo["routers"][rd_peer]["vrfs"] + for vrf_dict in vrfs: + vni_dict[vrf_dict["name"]] = index + index += 1 + + show_bgp_json = run_frr_cmd( + rnode, "show bgp vrf all summary json", isjson=True + ) + + # Verifying output dictionary show_bgp_json is empty + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + show_bgp_json_vrf = show_bgp_json[vrf] + for afi, afi_data in show_bgp_json_vrf.items(): + if input_afi not in afi: + continue + router_id = afi_data["routerId"] + + rd = "{}:{}".format(router_id, vni_dict[vrf]) + if rd in evpn_rd_value_json: + rd_value_json = evpn_rd_value_json[rd] + if rd_value_json["rd"] != rd: + errormsg = ( + "[DUT: %s] Failed: Verifying" + " RD value for EVPN route: %s" + "[FAILED]!!, EXPECTED : %s " + " FOUND : %s" + % (dut, route, rd, rd_value_json["rd"]) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying RD value for" + " EVPN route: %s [PASSED]|| " + "Found Exprected: %s", + dut, + route, + rd, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD : %s is not present" + " in cli json output" % (dut, rd) + ) + return errormsg + + if rt == "auto": + logger.info( + "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:", + dut, + route, + ) + + if rt_peer: + vni_dict = {} + + rnode = tgen.routers()[rt_peer] + show_bgp_json = run_frr_cmd( + rnode, "show bgp vrf all summary json", isjson=True + ) + + # Verifying output dictionary show_bgp_json is empty + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + + show_bgp_json_vrf = show_bgp_json[vrf] + for afi, afi_data in show_bgp_json_vrf.items(): + if input_afi not in afi: + continue + as_num = afi_data["as"] + + show_vrf_vni_json = run_frr_cmd( + rnode, "show vrf vni json", isjson=True + ) + + vrfs = show_vrf_vni_json["vrfs"] + for vrf_dict in vrfs: + if vrf_dict["vrf"] == vrf: + vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"]) + + # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI + # for auto derived RT value. + if as_num > 65535: + as_bin = bin(as_num) + as_bin = as_bin[-16:] + as_num = int(as_bin, 2) + + rt = "{}:{}".format(str(as_num), vni_dict[vrf]) + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + for rt_data in route_data["paths"]: + if vni_dict[vrf] == rt_data["VNI"]: + rt_string = rt_data["extendedCommunity"][ + "string" + ] + rt_input = "RT:{}".format(rt) + if rt_input not in rt_string: + errormsg = ( + "[DUT: %s] Failed:" + " Verifying RT " + "value for EVPN " + " route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % (dut, route, rt_input, rt_string) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying " + "RT value for EVPN " + "route: %s [PASSED]||" + "Found Exprected: %s", + dut, + route, + rt_input, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Route : %s is not" + " present in cli json output" % (dut, route) + ) + return errormsg + + if rt is not None and rt != "auto": + logger.info( + "[DUT: %s]: Verifying rt value for " "evpn route %s:", + dut, + route, + ) + + if type(rt) is not list: + rt = [rt] + + for _rt in rt: + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + for rt_data in route_data["paths"]: + rt_string = rt_data["extendedCommunity"][ + "string" + ] + rt_input = "RT:{}".format(_rt) + if rt_input not in rt_string: + errormsg = ( + "[DUT: %s] Failed: " + "Verifying RT value " + "for EVPN route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % (dut, route, rt_input, rt_string) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying RT" + " value for EVPN route:" + " %s [PASSED]|| " + "Found Exprected: %s", + dut, + route, + rt_input, + ) + return True + + else: + errormsg = ( + "[DUT: %s] Route : %s is not" + " present in cli json output" % (dut, route) + ) + return errormsg + + if ethTag is not None: + logger.info( + "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut + ) + + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + if route_data["ethTag"] != ethTag: + errormsg = ( + "[DUT: %s] RD: %s, Failed: " + "Verifying ethTag value " + "for EVPN route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % ( + dut, + _rd, + route, + ethTag, + route_data["ethTag"], + ) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: RD: %s, Verifying " + "ethTag value for EVPN route:" + " %s [PASSED]|| " + "Found Exprected: %s", + dut, + _rd, + route, + ethTag, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD: %s, Route : %s " + "is not present in cli json " + "output" % (dut, _rd, route) + ) + return errormsg + + if ipLen is not None: + logger.info( + "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut + ) + + for _rd, route_data in evpn_rd_value_json.items(): + if route_data["ip"] == route: + if route_data["ipLen"] != int(ipLen): + errormsg = ( + "[DUT: %s] RD: %s, Failed: " + "Verifying ipLen value " + "for EVPN route: %s" + "[FAILED]!!," + " EXPECTED : %s " + " FOUND : %s" + % (dut, _rd, route, ipLen, route_data["ipLen"]) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: RD: %s, Verifying " + "ipLen value for EVPN route:" + " %s [PASSED]|| " + "Found Exprected: %s", + dut, + _rd, + route, + ipLen, + ) + return True + + else: + errormsg = ( + "[DUT: %s] RD: %s, Route : %s " + "is not present in cli json " + "output " % (dut, route) + ) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return False + + +@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) +def verify_evpn_routes( + tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None +): + """ + API to verify evpn routes using "sh bgp l2vpn evpn" + command. + + Parameters + ---------- + * `tgen`: topogen object + * `topo` : json file data + * `dut` : device under test + * `input_dict`: having details like - for which route, rd value + needs to be verified + * `route_type` : Route type 5 is supported as of now + * `EthTag` : Ethernet tag, by-default is 0 + * `next_hop` : Prefered nexthop for the evpn routes + + Usage + ----- + input_dict_1 = { + "r1": { + "static_routes": [{ + "network": [NETWORK1_1[addr_type]], + "next_hop": NEXT_HOP_IP[addr_type], + "vrf": "RED" + }] + } + } + result = verify_evpn_routes(tgen, topo, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + for router in input_dict.keys(): + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying evpn routes: ", dut) + + if "static_routes" in input_dict[router]: + for static_route in input_dict[router]["static_routes"]: + network = static_route["network"] + + if type(network) is not list: + network = [network] + + missing_routes = {} + for route in network: + rd_keys = 0 + ip_len = route.split("/")[1] + route = route.split("/")[0] + + prefix = "[{}]:[{}]:[{}]:[{}]".format( + routeType, EthTag, ip_len, route + ) + + cmd = "show bgp l2vpn evpn route json" + evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True) + + if not bool(evpn_value_json): + errormsg = "No output for '{}' cli".format(cmd) + return errormsg + + if evpn_value_json["numPrefix"] == 0: + errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut) + return errormsg + + for key, route_data_json in evpn_value_json.items(): + if isinstance(route_data_json, dict): + rd_keys += 1 + if prefix not in route_data_json: + missing_routes[key] = prefix + + if rd_keys == len(missing_routes.keys()): + errormsg = ( + "[DUT: %s]: " + "Missing EVPN routes: " + "%s [FAILED]!!" % (dut, list(set(missing_routes.values()))) + ) + return errormsg + + for key, route_data_json in evpn_value_json.items(): + if isinstance(route_data_json, dict): + if prefix not in route_data_json: + continue + + for paths in route_data_json[prefix]["paths"]: + for path in paths: + if path["routeType"] != routeType: + errormsg = ( + "[DUT: %s]: " + "Verifying routeType " + "for EVPN route: %s " + "[FAILED]!! " + "Expected: %s, " + "Found: %s" + % ( + dut, + prefix, + routeType, + path["routeType"], + ) + ) + return errormsg + + elif next_hop: + for nh_dict in path["nexthops"]: + if nh_dict["ip"] != next_hop: + errormsg = ( + "[DUT: %s]: " + "Verifying " + "nexthop for " + "EVPN route: %s" + "[FAILED]!! " + "Expected: %s," + " Found: %s" + % ( + dut, + prefix, + next_hop, + nh_dict["ip"], + ) + ) + return errormsg + + else: + logger.info( + "[DUT %s]: Verifying " + "EVPN route : %s, " + "routeType: %s is " + "installed " + "[PASSED]|| ", + dut, + prefix, + routeType, + ) + return True + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + + return False |