diff options
author | Kuldeep Kashyap <kashyapk@vmware.com> | 2020-04-01 08:56:54 +0200 |
---|---|---|
committer | Kuldeep Kashyap <kashyapk@vmware.com> | 2020-04-08 14:27:23 +0200 |
commit | 38c3547ac8400a2995a0ce98a93545a3da38bc2d (patch) | |
tree | 97bb0f618383a0af84e52f2c8c85b717bce1979f /tests | |
parent | tests: Added new tests to bgp-basic-functionality-topo1 (diff) | |
download | frr-38c3547ac8400a2995a0ce98a93545a3da38bc2d.tar.xz frr-38c3547ac8400a2995a0ce98a93545a3da38bc2d.zip |
tests: Adding new test suite bgp_communities_topo1
1. Added 1 test case to verify NO-ADVERTISE Community functionality
2. Enhanced bgp.py to exclude routers from verification, if doesn't have bgp config
Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/topotests/bgp_communities_topo1/bgp_communities.json | 175 | ||||
-rw-r--r-- | tests/topotests/bgp_communities_topo1/test_bgp_communities.py | 635 | ||||
-rw-r--r-- | tests/topotests/lib/bgp.py | 168 | ||||
-rw-r--r-- | tests/topotests/lib/common_config.py | 2 |
4 files changed, 905 insertions, 75 deletions
diff --git a/tests/topotests/bgp_communities_topo1/bgp_communities.json b/tests/topotests/bgp_communities_topo1/bgp_communities.json new file mode 100644 index 000000000..da6aec239 --- /dev/null +++ b/tests/topotests/bgp_communities_topo1/bgp_communities.json @@ -0,0 +1,175 @@ +{ + "address_types": [ + "ipv4", + "ipv6" + ], + "ipv4base": "10.0.0.0", + "ipv4mask": 30, + "ipv6base": "fd00::", + "ipv6mask": 64, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 30, + "ipv6": "fd00::", + "v6mask": 64 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32, + "ipv6": "2001:db8:f::", + "v6mask": 128 + }, + "routers": { + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r0": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "100", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r1": {} + } + } + } + } + } + } + } + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + }, + "r3": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "200", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": {} + } + }, + "r3": { + "dest_link": { + "r2": {} + } + } + } + } + } + } + } + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r2": { + "ipv4": "auto", + "ipv6": "auto" + } + }, + "bgp": { + "local_as": "300", + "address_family": { + "ipv4": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {} + } + } + } + } + }, + "ipv6": { + "unicast": { + "neighbor": { + "r2": { + "dest_link": { + "r3": {} + } + } + } + } + } + } + } + }, + "r0": { + "links": { + "lo": { + "ipv4": "auto", + "ipv6": "auto", + "type": "loopback" + }, + "r1": { + "ipv4": "auto", + "ipv6": "auto" + } + } + } + } +}
\ No newline at end of file diff --git a/tests/topotests/bgp_communities_topo1/test_bgp_communities.py b/tests/topotests/bgp_communities_topo1/test_bgp_communities.py new file mode 100644 index 000000000..7d960d691 --- /dev/null +++ b/tests/topotests/bgp_communities_topo1/test_bgp_communities.py @@ -0,0 +1,635 @@ +#!/usr/bin/python + +# +# Copyright (c) 2020 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, +# Inc. ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + +""" +Following tests are covered to test bgp community functionality: +- Verify routes are not advertised when NO-ADVERTISE Community is applied + +""" + +import os +import sys +import time +import json +import pytest + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from mininet.topo import Topo +from lib.topogen import Topogen, get_topogen + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, + write_test_header, + write_test_footer, + reset_config_on_routers, + verify_rib, + create_static_routes, + check_address_types, + step, + create_route_maps, + create_prefix_lists, + create_route_maps, +) +from lib.topolog import logger +from lib.bgp import ( + verify_bgp_convergence, + create_router_bgp, + clear_bgp_and_verify, + verify_bgp_rib, +) +from lib.topojson import build_topo_from_json, build_config_from_json +from copy import deepcopy + +# Reading the data from JSON File for topology creation +jsonFile = "{}/bgp_communities.json".format(CWD) +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +# Global variables +BGP_CONVERGENCE = False +ADDR_TYPES = check_address_types() +NETWORK = {"ipv4": "2.2.2.2/32", "ipv6": "22:22::2/128"} +NEXT_HOP_IP = {} + + +class BGPCOMMUNITIES(Topo): + """ + Test BGPCOMMUNITIES - topology 1 + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + """Build function""" + tgen = get_topogen(self) + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(BGPCOMMUNITIES, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Checking BGP convergence + global BGP_CONVERGENCE + global ADDR_TYPES + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # Api call verify whether BGP is converged + BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo) + assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format( + BGP_CONVERGENCE + ) + + logger.info("Running setup_module() done") + + +def teardown_module(mod): + """ + Teardown the pytest environment + + * `mod`: module name + """ + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + # Stop toplogy and Remove tmp files + tgen.stop_topology() + + logger.info( + "Testsuite end time: {}".format(time.asctime(time.localtime(time.time()))) + ) + logger.info("=" * 40) + + +##################################################### +# +# Tests starting +# +##################################################### + + +def test_bgp_no_advertise_community_p0(request): + """ + Verify routes are not advertised when NO-ADVERTISE Community is applied + + """ + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + reset_config_on_routers(tgen) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + NEXT_HOP_IP = { + "ipv4": topo["routers"]["r0"]["links"]["r1"]["ipv4"].split("/")[0], + "ipv6": topo["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0], + } + + # configure static routes + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + input_dict = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static and connected in Router BGP " "in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "BGP neighbors are up, static and connected route advertised from" + " R1 are present on R2 BGP table and RIB using show ip bgp and " + " show ip route" + ) + step( + "Static and connected route advertised from R1 are present on R3" + " BGP table and RIB using show ip bgp and show ip route" + ) + + dut = "r3" + protocol = "bgp" + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure prefix list P1 on R2 to permit route coming from R1") + # Create ip prefix list + input_dict_2 = { + "r2": { + "prefix_lists": { + addr_type: { + "pf_list_1_{}".format(addr_type): [ + {"seqid": 10, "network": "any", "action": "permit"} + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + # Create route map + input_dict_3 = { + "r2": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [ + { + "action": "permit", + "seq_id": "5", + "match": { + addr_type: {"prefix_lists": "pf_list_1_" + addr_type} + }, + "set": {"community": {"num": "no-advertise"}}, + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + step( + "Apply route-map RM1 on R2, R2 to R3 BGP neighbor with no" + " advertise community" + ) + # Configure neighbor for route map + input_dict_4 = { + "r2": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": { + "route_maps": [ + { + "name": "rmap_match_pf_1_" + + addr_type, + "direction": "in", + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "After advertising no advertise community to BGP neighbor " + "static and connected router got removed from R3 verify using " + "show ip bgp & show ip route" + ) + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False) + assert result is not True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + result = verify_rib( + tgen, addr_type, dut, input_dict, protocol=protocol, expected=False + ) + assert result is not True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + step("Remove and Add no advertise community") + # Configure neighbor for route map + input_dict_4 = { + "r2": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": { + "route_maps": [ + { + "name": "rmap_match_pf_1_" + + addr_type, + "direction": "in", + "delete": True, + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "After removing no advertise community from BGP neighbor " + "static and connected router got advertised to R3 and " + "removing route-map, verify route using show ip bgp" + " and show ip route" + ) + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + step("Repeat above steps when IBGP nbr configured between R1, R2 & R2, R3") + topo1 = deepcopy(topo) + + topo1["routers"]["r1"]["bgp"]["local_as"] = "100" + topo1["routers"]["r2"]["bgp"]["local_as"] = "100" + topo1["routers"]["r3"]["bgp"]["local_as"] = "100" + + for rtr in ["r1", "r2", "r3"]: + if "bgp" in topo1["routers"][rtr].keys(): + delete_bgp = {rtr: {"bgp": {"delete": True}}} + result = create_router_bgp(tgen, topo1, delete_bgp) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + config_bgp = { + rtr: {"bgp": {"local_as": topo1["routers"][rtr]["bgp"]["local_as"]}} + } + result = create_router_bgp(tgen, topo1, config_bgp) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + build_config_from_json(tgen, topo1, save_bkup=False) + + step("verify bgp convergence before starting test case") + + bgp_convergence = verify_bgp_convergence(tgen, topo1) + assert bgp_convergence is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, bgp_convergence + ) + + # configure static routes + dut = "r3" + protocol = "bgp" + + for addr_type in ADDR_TYPES: + # Enable static routes + input_dict = { + "r1": { + "static_routes": [ + {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]} + ] + } + } + + logger.info("Configure static routes") + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("configure redistribute static and connected in Router " "BGP in R1") + + input_dict_2 = { + "r1": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"}, + ] + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "BGP neighbors are up, static and connected route advertised from" + " R1 are present on R2 BGP table and RIB using show ip bgp and " + " show ip route" + ) + step( + "Static and connected route advertised from R1 are present on R3" + " BGP table and RIB using show ip bgp and show ip route" + ) + + dut = "r2" + protocol = "bgp" + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step("Configure prefix list P1 on R2 to permit route coming from R1") + # Create ip prefix list + input_dict_2 = { + "r2": { + "prefix_lists": { + addr_type: { + "pf_list_1_{}".format(addr_type): [ + {"seqid": 10, "network": "any", "action": "permit"} + ] + } + } + } + } + result = create_prefix_lists(tgen, input_dict_2) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + # Create route map + input_dict_3 = { + "r2": { + "route_maps": { + "rmap_match_pf_1_{}".format(addr_type): [ + { + "action": "permit", + "seq_id": "5", + "match": { + addr_type: {"prefix_lists": "pf_list_1_" + addr_type} + }, + "set": {"community": {"num": "no-advertise"}}, + } + ] + } + } + } + result = create_route_maps(tgen, input_dict_3) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + step( + "Apply route-map RM1 on R2, R2 to R3 BGP neighbor with no" + " advertise community" + ) + + # Configure neighbor for route map + input_dict_4 = { + "r2": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": { + "route_maps": [ + { + "name": "rmap_match_pf_1_" + + addr_type, + "direction": "in", + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "After advertising no advertise community to BGP neighbor " + "static and connected router got removed from R3 verify using " + "show ip bgp & show ip route" + ) + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + step("Remove and Add no advertise community") + # Configure neighbor for route map + input_dict_4 = { + "r2": { + "bgp": { + "address_family": { + addr_type: { + "unicast": { + "neighbor": { + "r1": { + "dest_link": { + "r2": { + "route_maps": [ + { + "name": "rmap_match_pf_1_" + + addr_type, + "direction": "in", + "delete": True, + } + ] + } + } + } + } + } + } + } + } + } + } + result = create_router_bgp(tgen, topo, input_dict_4) + assert result is True, "Testcase {} : Failed \n Error: {}".format( + tc_name, result + ) + + step( + "After removing no advertise community from BGP neighbor " + "static and connected router got advertised to R3 and " + "removing route verify using show ip bgp and " + " show ip route" + ) + + result = verify_bgp_rib(tgen, addr_type, dut, input_dict) + assert result is True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} : Failed \n " + " Routes still present in R3 router. Error: {}".format(tc_name, result) + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 26181f097..b8dd7cda5 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -27,14 +27,17 @@ from lib import topotest from lib.topolog import logger # Import common_config to use commomnly used APIs -from lib.common_config import (create_common_configuration, - InvalidCLIError, - load_config_to_router, - check_address_types, - generate_ips, - validate_ip_address, - find_interface_with_greater_ip, - run_frr_cmd, retry) +from lib.common_config import ( + create_common_configuration, + InvalidCLIError, + load_config_to_router, + check_address_types, + generate_ips, + validate_ip_address, + find_interface_with_greater_ip, + run_frr_cmd, + retry, +) BGP_CONVERGENCE_TIMEOUT = 10 @@ -487,13 +490,11 @@ def __create_bgp_unicast_address_family( if "allowas_in" in peer: allow_as_in = peer["allowas_in"] - config_data.append("{} allowas-in {}".format(neigh_cxt, - allow_as_in)) + config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in)) if "no_allowas_in" in peer: allow_as_in = peer["no_allowas_in"] - config_data.append("no {} allowas-in {}".format(neigh_cxt, - allow_as_in)) + config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in)) if prefix_lists: for prefix_list in prefix_lists: name = prefix_list.setdefault("name", {}) @@ -529,12 +530,10 @@ def __create_bgp_unicast_address_family( config_data.append(cmd) if allowas_in: - number_occurences = allowas_in.\ - setdefault("number_occurences", {}) + number_occurences = allowas_in.setdefault("number_occurences", {}) del_action = allowas_in.setdefault("delete", False) - cmd = "{} allowas-in {}".format(neigh_cxt, - number_occurences) + cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences) if del_action: cmd = "no {}".format(cmd) @@ -640,6 +639,9 @@ def verify_bgp_convergence(tgen, topo): logger.debug("Entering lib API: verify_bgp_convergence()") for router, rnode in tgen.routers().iteritems(): + if "bgp" not in topo["routers"][router]: + continue + logger.info("Verifying BGP Convergence on router %s", router) show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) # Verifying output dictionary show_bgp_json is empty or not @@ -1031,7 +1033,7 @@ def clear_bgp_and_verify(tgen, topo, router): " router {}".format(router) ) return errormsg - logger.info(peer_uptime_after_clear_bgp) + # Comparing peerUptimeEstablishedEpoch dictionaries if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp: logger.info("BGP neighborship is reset after clear BGP on router %s", router) @@ -1750,9 +1752,9 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) # Static routes sleep(2) - logger.info('Checking router {} BGP RIB:'.format(dut)) + logger.info("Checking router {} BGP RIB:".format(dut)) - if 'static_routes' in input_dict[routerInput]: + if "static_routes" in input_dict[routerInput]: static_routes = input_dict[routerInput]["static_routes"] for static_route in static_routes: @@ -1762,12 +1764,10 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) nh_found = False vrf = static_route.setdefault("vrf", None) if vrf: - cmd = "{} vrf {} {}".\ - format(command, vrf, addr_type) + cmd = "{} vrf {} {}".format(command, vrf, addr_type) else: - cmd = "{} {}".\ - format(command, addr_type) + cmd = "{} {}".format(command, addr_type) cmd = "{} json".format(cmd) @@ -1775,8 +1775,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) # Verifying output dictionary rib_routes_json is not empty if bool(rib_routes_json) == False: - errormsg = "No route found in rib of router {}..". \ - format(router) + errormsg = "No route found in rib of router {}..".format(router) return errormsg network = static_route["network"] @@ -1804,64 +1803,84 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) if not isinstance(next_hop, list): next_hop = [next_hop] list1 = next_hop - found_hops = [rib_r["ip"] for rib_r in - rib_routes_json["routes"][ - st_rt][0]["nexthops"]] + found_hops = [ + rib_r["ip"] + for rib_r in rib_routes_json["routes"][st_rt][0][ + "nexthops" + ] + ] list2 = found_hops - missing_list_of_nexthops = \ - set(list2).difference(list1) - additional_nexthops_in_required_nhs = \ - set(list1).difference(list2) + missing_list_of_nexthops = set(list2).difference(list1) + additional_nexthops_in_required_nhs = set( + list1 + ).difference(list2) if list2: if additional_nexthops_in_required_nhs: - logger.info("Missing nexthop %s for route"\ - " %s in RIB of router %s\n", \ - additional_nexthops_in_required_nhs, \ - st_rt, dut) - errormsg=("Nexthop {} is Missing for "\ - "route {} in RIB of router {}\n".format( + logger.info( + "Missing nexthop %s for route" + " %s in RIB of router %s\n", additional_nexthops_in_required_nhs, - st_rt, dut)) + st_rt, + dut, + ) + errormsg = ( + "Nexthop {} is Missing for " + "route {} in RIB of router {}\n".format( + additional_nexthops_in_required_nhs, + st_rt, + dut, + ) + ) return errormsg else: nh_found = True if aspath: - found_paths = rib_routes_json["routes"][ - st_rt][0]["path"] + found_paths = rib_routes_json["routes"][st_rt][0][ + "path" + ] if aspath == found_paths: aspath_found = True - logger.info("Found AS path {} for route" \ - " {} in RIB of router "\ - "{}\n".format(aspath, st_rt, dut)) + logger.info( + "Found AS path {} for route" + " {} in RIB of router " + "{}\n".format(aspath, st_rt, dut) + ) else: - errormsg=("AS Path {} is missing for route"\ - "for route {} in RIB of router {}\n"\ - .format(aspath, st_rt, dut)) + errormsg = ( + "AS Path {} is missing for route" + "for route {} in RIB of router {}\n".format( + aspath, st_rt, dut + ) + ) return errormsg else: missing_routes.append(st_rt) if nh_found: - logger.info("Found next_hop {} for all bgp" - " routes in RIB of" - " router {}\n".format(next_hop, \ - router)) + logger.info( + "Found next_hop {} for all bgp" + " routes in RIB of" + " router {}\n".format(next_hop, router) + ) if len(missing_routes) > 0: - errormsg = ("Missing route in RIB of router {}, " - "routes: {}\n".format(dut, missing_routes)) + errormsg = ( + "Missing route in RIB of router {}, " + "routes: {}\n".format(dut, missing_routes) + ) return errormsg if found_routes: - logger.info("Verified routes in router {} BGP RIB, " - "found routes are: {} \n".\ - format(dut, found_routes)) + logger.info( + "Verified routes in router {} BGP RIB, " + "found routes are: {} \n".format(dut, found_routes) + ) continue if "bgp" not in input_dict[routerInput]: - continue + continue # Advertise networks bgp_data_list = input_dict[routerInput]["bgp"] @@ -1872,36 +1891,33 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) for bgp_data in bgp_data_list: vrf_id = bgp_data.setdefault("vrf", None) if vrf_id: - cmd = "{} vrf {} {}".\ - format(command, vrf_id, addr_type) + cmd = "{} vrf {} {}".format(command, vrf_id, addr_type) else: - cmd = "{} {}".\ - format(command, addr_type) + cmd = "{} {}".format(command, addr_type) cmd = "{} json".format(cmd) - rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) + rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) # Verifying output dictionary rib_routes_json is not empty if bool(rib_routes_json) == False: - errormsg = "No route found in rib of router {}..". \ - format(router) + errormsg = "No route found in rib of router {}..".format(router) return errormsg bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"] - advertise_network = \ - bgp_net_advertise.setdefault("advertise_networks", []) + advertise_network = bgp_net_advertise.setdefault( + "advertise_networks", [] + ) for advertise_network_dict in advertise_network: found_routes = [] missing_routes = [] found = False - network = advertise_network_dict['network'] + network = advertise_network_dict["network"] - if 'no_of_network' in advertise_network_dict: - no_of_network = advertise_network_dict[ - 'no_of_network'] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] else: no_of_network = 1 @@ -1923,13 +1939,17 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None) missing_routes.append(st_rt) if len(missing_routes) > 0: - errormsg = ("Missing route in BGP RIB of router {}," - " are: {}\n".format(dut, missing_routes)) + errormsg = ( + "Missing route in BGP RIB of router {}," + " are: {}\n".format(dut, missing_routes) + ) return errormsg if found_routes: - logger.info("Verified routes in router {} BGP RIB, found " - "routes are: {}\n".format(dut, found_routes)) + logger.info( + "Verified routes in router {} BGP RIB, found " + "routes are: {}\n".format(dut, found_routes) + ) logger.debug("Exiting lib API: verify_bgp_rib()") return True diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 5ee59070c..2483930d1 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -1198,7 +1198,7 @@ def create_route_maps(tgen, input_dict, build=False): ipv6_data = set_data.setdefault("ipv6", {}) local_preference = set_data.setdefault("locPrf", None) metric = set_data.setdefault("metric", None) - as_path = set_data.setdefault("path", {}) + as_path = set_data.setdefault("aspath", {}) weight = set_data.setdefault("weight", None) community = set_data.setdefault("community", {}) large_community = set_data.setdefault("large_community", {}) |