summaryrefslogtreecommitdiffstats
path: root/tests/topotests/lib/bgp.py
diff options
context:
space:
mode:
authorKuldeep Kashyap <kashyapk@vmware.com>2020-07-01 15:13:08 +0200
committerKuldeep Kashyap <kashyapk@vmware.com>2020-07-22 08:12:55 +0200
commit3c334c391524159d944ffb2abf841d5f07df9721 (patch)
tree3f561d530b58bd625241c71c959d3e65f2ba7d56 /tests/topotests/lib/bgp.py
parentMerge pull request #6780 from chiragshah6/evpn_dev2 (diff)
downloadfrr-3c334c391524159d944ffb2abf841d5f07df9721.tar.xz
frr-3c334c391524159d944ffb2abf841d5f07df9721.zip
tests: Adding framework support for EVPN-Type5 automation
1. Added APIs to create evpn related config. 2. Added APIs to verify evpn config and routes. Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
Diffstat (limited to 'tests/topotests/lib/bgp.py')
-rw-r--r--tests/topotests/lib/bgp.py761
1 files changed, 745 insertions, 16 deletions
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 971bbd0f3..5d49cfd09 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -27,6 +27,8 @@ import sys
from lib import topotest
from lib.topolog import logger
+from lib.topogen import TopoRouter, get_topogen
+
# Import common_config to use commomnly used APIs
from lib.common_config import (
create_common_configuration,
@@ -166,6 +168,7 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {})
+ l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})
neigh_unicast = (
True
@@ -174,6 +177,8 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
else False
)
+ l2vpn_evpn = True if l2vpn_data.setdefault("evpn", {}) else False
+
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
tgen,
@@ -184,6 +189,11 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
config_data=data_all_bgp,
)
+ if l2vpn_evpn:
+ data_all_bgp = __create_l2vpn_evpn_address_family(
+ tgen, topo, bgp_data, router, config_data=data_all_bgp
+ )
+
try:
result = create_common_configuration(
tgen, router, data_all_bgp, "bgp", build, load_config
@@ -467,6 +477,166 @@ def __create_bgp_unicast_neighbor(
return config_data
+def __create_l2vpn_evpn_address_family(
+ tgen, topo, input_dict, router, config_data=None
+):
+ """
+ Helper API to create configuration for l2vpn evpn address-family
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : Input dict data, required when configuring
+ from testcase
+ * `router` : router id to be configured.
+ * `build` : Only for initial setup phase this is set as True.
+ """
+
+ result = False
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ bgp_data = input_dict["address_family"]
+
+ for family_type, family_dict in bgp_data.iteritems():
+ if family_type != "l2vpn":
+ continue
+
+ family_data = family_dict["evpn"]
+ if family_data:
+ config_data.append("address-family l2vpn evpn")
+
+ advertise_data = family_data.setdefault("advertise", {})
+ neighbor_data = family_data.setdefault("neighbor", {})
+ advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
+ rd_data = family_data.setdefault("rd", None)
+ no_rd_data = family_data.setdefault("no rd", False)
+ route_target_data = family_data.setdefault("route-target", {})
+
+ if advertise_data:
+ for address_type, unicast_type in advertise_data.items():
+
+ if isinstance(unicast_type, dict):
+ for key, value in unicast_type.items():
+ cmd = "advertise {} {}".format(address_type, key)
+
+ if value:
+ route_map = value.setdefault("route-map", {})
+ advertise_del_action = value.setdefault("delete", None)
+
+ if route_map:
+ cmd = "{} route-map {}".format(cmd, route_map)
+
+ if advertise_del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ if neighbor_data:
+ for neighbor, neighbor_data in neighbor_data.items():
+ ipv4_neighbor = neighbor_data.setdefault("ipv4", {})
+ ipv6_neighbor = neighbor_data.setdefault("ipv6", {})
+
+ if ipv4_neighbor:
+ for neighbor_name, action in ipv4_neighbor.items():
+ neighbor_ip = topo[neighbor]["links"][neighbor_name][
+ "ipv4"
+ ].split("/")[0]
+
+ if isinstance(action, dict):
+ next_hop_self = action.setdefault("next_hop_self", None)
+ route_maps = action.setdefault("route_maps", {})
+
+ if next_hop_self is not None:
+ if next_hop_self is True:
+ config_data.append(
+ "neighbor {} "
+ "next-hop-self".format(neighbor_ip)
+ )
+ elif next_hop_self is False:
+ config_data.append(
+ "no neighbor {} "
+ "next-hop-self".format(neighbor_ip)
+ )
+
+ if route_maps:
+ for route_map in route_maps:
+ name = route_map.setdefault("name", {})
+ direction = route_map.setdefault("direction", "in")
+ del_action = route_map.setdefault("delete", False)
+
+ if not name:
+ logger.info(
+ "Router %s: 'name' "
+ "not present in "
+ "input_dict for BGP "
+ "neighbor route name",
+ router,
+ )
+ else:
+ cmd = "neighbor {} route-map {} " "{}".format(
+ neighbor_ip, name, direction
+ )
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ else:
+ if action == "activate":
+ cmd = "neighbor {} activate".format(neighbor_ip)
+ elif action == "deactivate":
+ cmd = "no neighbor {} activate".format(neighbor_ip)
+
+ config_data.append(cmd)
+
+ if ipv6_neighbor:
+ for neighbor_name, action in ipv4_neighbor.items():
+ neighbor_ip = topo[neighbor]["links"][neighbor_name][
+ "ipv6"
+ ].split("/")[0]
+ if action == "activate":
+ cmd = "neighbor {} activate".format(neighbor_ip)
+ elif action == "deactivate":
+ cmd = "no neighbor {} activate".format(neighbor_ip)
+
+ config_data.append(cmd)
+
+ if advertise_all_vni_data == True:
+ cmd = "advertise-all-vni"
+ config_data.append(cmd)
+ elif advertise_all_vni_data == False:
+ cmd = "no advertise-all-vni"
+ config_data.append(cmd)
+
+ if rd_data:
+ cmd = "rd {}".format(rd_data)
+ config_data.append(cmd)
+
+ if no_rd_data:
+ cmd = "no rd {}".format(no_rd_data)
+ config_data.append(cmd)
+
+ if route_target_data:
+ for rt_type, rt_dict in route_target_data.items():
+ for _rt_dict in rt_dict:
+ rt_value = _rt_dict.setdefault("value", None)
+ del_rt = _rt_dict.setdefault("delete", None)
+
+ if rt_value:
+ cmd = "route-target {} {}".format(rt_type, rt_value)
+ if del_rt:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return config_data
+
+
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
"""
Helper API to create neighbor specific configuration
@@ -489,7 +659,7 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
for dest_link, peer in peer_dict["dest_link"].iteritems():
nh_details = topo[name]
- if "vrfs" in topo[router]:
+ if "vrfs" in topo[router] or type(nh_details["bgp"]) is list:
remote_as = nh_details["bgp"][0]["local_as"]
else:
remote_as = nh_details["bgp"]["local_as"]
@@ -905,7 +1075,10 @@ def verify_bgp_convergence(tgen, topo, dut=None):
errormsg(str) or True
"""
- logger.debug("Entering lib API: verify_bgp_convergence()")
+ result = False
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ tgen = get_topogen()
for router, rnode in tgen.routers().iteritems():
if "bgp" not in topo["routers"][router]:
continue
@@ -968,13 +1141,14 @@ def verify_bgp_convergence(tgen, topo, dut=None):
)
return errormsg
- l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][
- "peers"
- ]
- nh_state = l2VpnEvpn_data[neighbor_ip]["state"]
+ if "l2VpnEvpn" in show_bgp_json[vrf]:
+ l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][
+ "peers"
+ ]
+ nh_state = l2VpnEvpn_data[neighbor_ip]["state"]
- if nh_state == "Established":
- no_of_evpn_peer += 1
+ if nh_state == "Established":
+ no_of_evpn_peer += 1
if no_of_evpn_peer == total_evpn_peer:
logger.info(
@@ -982,6 +1156,7 @@ def verify_bgp_convergence(tgen, topo, dut=None):
router,
vrf,
)
+ result = True
else:
errormsg = (
"[DUT: %s] VRF: %s, BGP is not converged "
@@ -989,22 +1164,22 @@ def verify_bgp_convergence(tgen, topo, dut=None):
)
return errormsg
else:
+ total_peer = 0
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
- total_peer = 0
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+ no_of_peer = 0
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
- no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.items():
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
@@ -1015,8 +1190,11 @@ def verify_bgp_convergence(tgen, topo, dut=None):
"neighbor_type" in peer_details
and peer_details["neighbor_type"] == "link-local"
):
- neighbor_ip = get_ipv6_linklocal_address(
- topo["routers"], bgp_neighbor, dest_link
+ intf = topo["routers"][bgp_neighbor]["links"][
+ dest_link
+ ]["interface"]
+ neighbor_ip = get_frr_ipv6_linklocal(
+ tgen, bgp_neighbor, intf
)
elif "source_link" in peer_details:
neighbor_ip = topo["routers"][bgp_neighbor][
@@ -1036,7 +1214,7 @@ def verify_bgp_convergence(tgen, topo, dut=None):
0
]
nh_state = None
-
+ neighbor_ip = neighbor_ip.lower()
if addr_type == "ipv4":
ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][
"peers"
@@ -1046,7 +1224,8 @@ def verify_bgp_convergence(tgen, topo, dut=None):
ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][
"peers"
]
- nh_state = ipv6_data[neighbor_ip]["state"]
+ if neighbor_ip in ipv6_data:
+ nh_state = ipv6_data[neighbor_ip]["state"]
if nh_state == "Established":
no_of_peer += 1
@@ -1059,8 +1238,9 @@ def verify_bgp_convergence(tgen, topo, dut=None):
(router, vrf, addr_type))
return errormsg
- logger.debug("Exiting API: verify_bgp_convergence()")
- return True
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return result
@retry(attempts=3, wait=4, return_is_str=True)
@@ -3463,3 +3643,552 @@ def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut):
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_attributes_for_evpn_routes(
+ tgen,
+ topo,
+ dut,
+ input_dict,
+ rd=None,
+ rt=None,
+ ethTag=None,
+ ipLen=None,
+ rd_peer=None,
+ rt_peer=None,
+):
+ """
+ API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
+ command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `input_dict`: having details like - for which route, rd value
+ needs to be verified
+ * `rd` : route distinguisher
+ * `rt` : route target
+ * `ethTag` : Ethernet Tag
+ * `ipLen` : IP prefix length
+ * `rd_peer` : Peer name from which RD will be auto-generated
+ * `rt_peer` : Peer name from which RT will be auto-generated
+
+ Usage
+ -----
+ input_dict_1 = {
+ "r1": {
+ "static_routes": [{
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ "vrf": "RED"
+ }]
+ }
+ }
+
+ result = verify_attributes_for_evpn_routes(tgen, topo,
+ input_dict, rd = "10.0.0.33:1")
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+ for router in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ if "static_routes" in input_dict[router]:
+ for static_route in input_dict[router]["static_routes"]:
+ network = static_route["network"]
+
+ if "vrf" in static_route:
+ vrf = static_route["vrf"]
+
+ if type(network) is not list:
+ network = [network]
+
+ for route in network:
+ route = route.split("/")[0]
+ _addr_type = validate_ip_address(route)
+ if "v4" in _addr_type:
+ input_afi = "v4"
+ elif "v6" in _addr_type:
+ input_afi = "v6"
+
+ cmd = "show bgp l2vpn evpn {} json".format(route)
+ evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True)
+ if not bool(evpn_rd_value_json):
+ errormsg = "No output for '{}' cli".format(cmd)
+ return errormsg
+
+ if rd is not None and rd != "auto":
+ logger.info(
+ "[DUT: %s]: Verifying rd value for " "evpn route %s:",
+ dut,
+ route,
+ )
+
+ if rd in evpn_rd_value_json:
+ rd_value_json = evpn_rd_value_json[rd]
+ if rd_value_json["rd"] != rd:
+ errormsg = (
+ "[DUT: %s] Failed: Verifying"
+ " RD value for EVPN route: %s"
+ "[FAILED]!!, EXPECTED : %s "
+ " FOUND : %s"
+ % (dut, route, rd, rd_value_json["rd"])
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: Verifying RD value for"
+ " EVPN route: %s [PASSED]|| "
+ "Found Exprected: %s",
+ dut,
+ route,
+ rd,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] RD : %s is not present"
+ " in cli json output" % (dut, rd)
+ )
+ return errormsg
+
+ if rd == "auto":
+ logger.info(
+ "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
+ dut,
+ route,
+ )
+
+ if rd_peer:
+ index = 1
+ vni_dict = {}
+
+ rnode = tgen.routers()[rd_peer]
+ vrfs = topo["routers"][rd_peer]["vrfs"]
+ for vrf_dict in vrfs:
+ vni_dict[vrf_dict["name"]] = index
+ index += 1
+
+ show_bgp_json = run_frr_cmd(
+ rnode, "show bgp vrf all summary json", isjson=True
+ )
+
+ # Verifying output dictionary show_bgp_json is empty
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ show_bgp_json_vrf = show_bgp_json[vrf]
+ for afi, afi_data in show_bgp_json_vrf.items():
+ if input_afi not in afi:
+ continue
+ router_id = afi_data["routerId"]
+
+ rd = "{}:{}".format(router_id, vni_dict[vrf])
+ if rd in evpn_rd_value_json:
+ rd_value_json = evpn_rd_value_json[rd]
+ if rd_value_json["rd"] != rd:
+ errormsg = (
+ "[DUT: %s] Failed: Verifying"
+ " RD value for EVPN route: %s"
+ "[FAILED]!!, EXPECTED : %s "
+ " FOUND : %s"
+ % (dut, route, rd, rd_value_json["rd"])
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: Verifying RD value for"
+ " EVPN route: %s [PASSED]|| "
+ "Found Exprected: %s",
+ dut,
+ route,
+ rd,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] RD : %s is not present"
+ " in cli json output" % (dut, rd)
+ )
+ return errormsg
+
+ if rt == "auto":
+ logger.info(
+ "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
+ dut,
+ route,
+ )
+
+ if rt_peer:
+ vni_dict = {}
+
+ rnode = tgen.routers()[rt_peer]
+ show_bgp_json = run_frr_cmd(
+ rnode, "show bgp vrf all summary json", isjson=True
+ )
+
+ # Verifying output dictionary show_bgp_json is empty
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ show_bgp_json_vrf = show_bgp_json[vrf]
+ for afi, afi_data in show_bgp_json_vrf.items():
+ if input_afi not in afi:
+ continue
+ as_num = afi_data["as"]
+
+ show_vrf_vni_json = run_frr_cmd(
+ rnode, "show vrf vni json", isjson=True
+ )
+
+ vrfs = show_vrf_vni_json["vrfs"]
+ for vrf_dict in vrfs:
+ if vrf_dict["vrf"] == vrf:
+ vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"])
+
+ # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
+ # for auto derived RT value.
+ if as_num > 65535:
+ as_bin = bin(as_num)
+ as_bin = as_bin[-16:]
+ as_num = int(as_bin, 2)
+
+ rt = "{}:{}".format(str(as_num), vni_dict[vrf])
+ for _rd, route_data in evpn_rd_value_json.items():
+ if route_data["ip"] == route:
+ for rt_data in route_data["paths"]:
+ if vni_dict[vrf] == rt_data["VNI"]:
+ rt_string = rt_data["extendedCommunity"][
+ "string"
+ ]
+ rt_input = "RT:{}".format(rt)
+ if rt_input not in rt_string:
+ errormsg = (
+ "[DUT: %s] Failed:"
+ " Verifying RT "
+ "value for EVPN "
+ " route: %s"
+ "[FAILED]!!,"
+ " EXPECTED : %s "
+ " FOUND : %s"
+ % (dut, route, rt_input, rt_string)
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: Verifying "
+ "RT value for EVPN "
+ "route: %s [PASSED]||"
+ "Found Exprected: %s",
+ dut,
+ route,
+ rt_input,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] Route : %s is not"
+ " present in cli json output" % (dut, route)
+ )
+ return errormsg
+
+ if rt is not None and rt != "auto":
+ logger.info(
+ "[DUT: %s]: Verifying rt value for " "evpn route %s:",
+ dut,
+ route,
+ )
+
+ if type(rt) is not list:
+ rt = [rt]
+
+ for _rt in rt:
+ for _rd, route_data in evpn_rd_value_json.items():
+ if route_data["ip"] == route:
+ for rt_data in route_data["paths"]:
+ rt_string = rt_data["extendedCommunity"][
+ "string"
+ ]
+ rt_input = "RT:{}".format(_rt)
+ if rt_input not in rt_string:
+ errormsg = (
+ "[DUT: %s] Failed: "
+ "Verifying RT value "
+ "for EVPN route: %s"
+ "[FAILED]!!,"
+ " EXPECTED : %s "
+ " FOUND : %s"
+ % (dut, route, rt_input, rt_string)
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: Verifying RT"
+ " value for EVPN route:"
+ " %s [PASSED]|| "
+ "Found Exprected: %s",
+ dut,
+ route,
+ rt_input,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] Route : %s is not"
+ " present in cli json output" % (dut, route)
+ )
+ return errormsg
+
+ if ethTag is not None:
+ logger.info(
+ "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
+ )
+
+ for _rd, route_data in evpn_rd_value_json.items():
+ if route_data["ip"] == route:
+ if route_data["ethTag"] != ethTag:
+ errormsg = (
+ "[DUT: %s] RD: %s, Failed: "
+ "Verifying ethTag value "
+ "for EVPN route: %s"
+ "[FAILED]!!,"
+ " EXPECTED : %s "
+ " FOUND : %s"
+ % (
+ dut,
+ _rd,
+ route,
+ ethTag,
+ route_data["ethTag"],
+ )
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: RD: %s, Verifying "
+ "ethTag value for EVPN route:"
+ " %s [PASSED]|| "
+ "Found Exprected: %s",
+ dut,
+ _rd,
+ route,
+ ethTag,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] RD: %s, Route : %s "
+ "is not present in cli json "
+ "output" % (dut, _rd, route)
+ )
+ return errormsg
+
+ if ipLen is not None:
+ logger.info(
+ "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
+ )
+
+ for _rd, route_data in evpn_rd_value_json.items():
+ if route_data["ip"] == route:
+ if route_data["ipLen"] != int(ipLen):
+ errormsg = (
+ "[DUT: %s] RD: %s, Failed: "
+ "Verifying ipLen value "
+ "for EVPN route: %s"
+ "[FAILED]!!,"
+ " EXPECTED : %s "
+ " FOUND : %s"
+ % (dut, _rd, route, ipLen, route_data["ipLen"])
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: RD: %s, Verifying "
+ "ipLen value for EVPN route:"
+ " %s [PASSED]|| "
+ "Found Exprected: %s",
+ dut,
+ _rd,
+ route,
+ ipLen,
+ )
+ return True
+
+ else:
+ errormsg = (
+ "[DUT: %s] RD: %s, Route : %s "
+ "is not present in cli json "
+ "output " % (dut, route)
+ )
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return False
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_evpn_routes(
+ tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None
+):
+ """
+ API to verify evpn routes using "sh bgp l2vpn evpn"
+ command.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo` : json file data
+ * `dut` : device under test
+ * `input_dict`: having details like - for which route, rd value
+ needs to be verified
+ * `route_type` : Route type 5 is supported as of now
+ * `EthTag` : Ethernet tag, by-default is 0
+ * `next_hop` : Prefered nexthop for the evpn routes
+
+ Usage
+ -----
+ input_dict_1 = {
+ "r1": {
+ "static_routes": [{
+ "network": [NETWORK1_1[addr_type]],
+ "next_hop": NEXT_HOP_IP[addr_type],
+ "vrf": "RED"
+ }]
+ }
+ }
+ result = verify_evpn_routes(tgen, topo, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ for router in input_dict.keys():
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying evpn routes: ", dut)
+
+ if "static_routes" in input_dict[router]:
+ for static_route in input_dict[router]["static_routes"]:
+ network = static_route["network"]
+
+ if type(network) is not list:
+ network = [network]
+
+ missing_routes = {}
+ for route in network:
+ rd_keys = 0
+ ip_len = route.split("/")[1]
+ route = route.split("/")[0]
+
+ prefix = "[{}]:[{}]:[{}]:[{}]".format(
+ routeType, EthTag, ip_len, route
+ )
+
+ cmd = "show bgp l2vpn evpn route json"
+ evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if not bool(evpn_value_json):
+ errormsg = "No output for '{}' cli".format(cmd)
+ return errormsg
+
+ if evpn_value_json["numPrefix"] == 0:
+ errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut)
+ return errormsg
+
+ for key, route_data_json in evpn_value_json.items():
+ if isinstance(route_data_json, dict):
+ rd_keys += 1
+ if prefix not in route_data_json:
+ missing_routes[key] = prefix
+
+ if rd_keys == len(missing_routes.keys()):
+ errormsg = (
+ "[DUT: %s]: "
+ "Missing EVPN routes: "
+ "%s [FAILED]!!" % (dut, list(set(missing_routes.values())))
+ )
+ return errormsg
+
+ for key, route_data_json in evpn_value_json.items():
+ if isinstance(route_data_json, dict):
+ if prefix not in route_data_json:
+ continue
+
+ for paths in route_data_json[prefix]["paths"]:
+ for path in paths:
+ if path["routeType"] != routeType:
+ errormsg = (
+ "[DUT: %s]: "
+ "Verifying routeType "
+ "for EVPN route: %s "
+ "[FAILED]!! "
+ "Expected: %s, "
+ "Found: %s"
+ % (
+ dut,
+ prefix,
+ routeType,
+ path["routeType"],
+ )
+ )
+ return errormsg
+
+ elif next_hop:
+ for nh_dict in path["nexthops"]:
+ if nh_dict["ip"] != next_hop:
+ errormsg = (
+ "[DUT: %s]: "
+ "Verifying "
+ "nexthop for "
+ "EVPN route: %s"
+ "[FAILED]!! "
+ "Expected: %s,"
+ " Found: %s"
+ % (
+ dut,
+ prefix,
+ next_hop,
+ nh_dict["ip"],
+ )
+ )
+ return errormsg
+
+ else:
+ logger.info(
+ "[DUT %s]: Verifying "
+ "EVPN route : %s, "
+ "routeType: %s is "
+ "installed "
+ "[PASSED]|| ",
+ dut,
+ prefix,
+ routeType,
+ )
+ return True
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+
+ return False