summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorKuldeep Kashyap <kashyapk@vmware.com>2020-04-01 08:56:54 +0200
committerKuldeep Kashyap <kashyapk@vmware.com>2020-04-08 14:27:23 +0200
commit38c3547ac8400a2995a0ce98a93545a3da38bc2d (patch)
tree97bb0f618383a0af84e52f2c8c85b717bce1979f /tests
parenttests: Added new tests to bgp-basic-functionality-topo1 (diff)
downloadfrr-38c3547ac8400a2995a0ce98a93545a3da38bc2d.tar.xz
frr-38c3547ac8400a2995a0ce98a93545a3da38bc2d.zip
tests: Adding new test suite bgp_communities_topo1
1. Added 1 test case to verify NO-ADVERTISE Community functionality 2. Enhanced bgp.py to exclude routers from verification, if doesn't have bgp config Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/topotests/bgp_communities_topo1/bgp_communities.json175
-rw-r--r--tests/topotests/bgp_communities_topo1/test_bgp_communities.py635
-rw-r--r--tests/topotests/lib/bgp.py168
-rw-r--r--tests/topotests/lib/common_config.py2
4 files changed, 905 insertions, 75 deletions
diff --git a/tests/topotests/bgp_communities_topo1/bgp_communities.json b/tests/topotests/bgp_communities_topo1/bgp_communities.json
new file mode 100644
index 000000000..da6aec239
--- /dev/null
+++ b/tests/topotests/bgp_communities_topo1/bgp_communities.json
@@ -0,0 +1,175 @@
+{
+ "address_types": [
+ "ipv4",
+ "ipv6"
+ ],
+ "ipv4base": "10.0.0.0",
+ "ipv4mask": 30,
+ "ipv6base": "fd00::",
+ "ipv6mask": 64,
+ "link_ip_start": {
+ "ipv4": "10.0.0.0",
+ "v4mask": 30,
+ "ipv6": "fd00::",
+ "v6mask": 64
+ },
+ "lo_prefix": {
+ "ipv4": "1.0.",
+ "v4mask": 32,
+ "ipv6": "2001:db8:f::",
+ "v6mask": 128
+ },
+ "routers": {
+ "r1": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r0": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "100",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r2": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ },
+ "r3": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "200",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {}
+ }
+ },
+ "r3": {
+ "dest_link": {
+ "r2": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r3": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r2": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ },
+ "bgp": {
+ "local_as": "300",
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "r0": {
+ "links": {
+ "lo": {
+ "ipv4": "auto",
+ "ipv6": "auto",
+ "type": "loopback"
+ },
+ "r1": {
+ "ipv4": "auto",
+ "ipv6": "auto"
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/tests/topotests/bgp_communities_topo1/test_bgp_communities.py b/tests/topotests/bgp_communities_topo1/test_bgp_communities.py
new file mode 100644
index 000000000..7d960d691
--- /dev/null
+++ b/tests/topotests/bgp_communities_topo1/test_bgp_communities.py
@@ -0,0 +1,635 @@
+#!/usr/bin/python
+
+#
+# Copyright (c) 2020 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test bgp community functionality:
+- Verify routes are not advertised when NO-ADVERTISE Community is applied
+
+"""
+
+import os
+import sys
+import time
+import json
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from mininet.topo import Topo
+from lib.topogen import Topogen, get_topogen
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology,
+ write_test_header,
+ write_test_footer,
+ reset_config_on_routers,
+ verify_rib,
+ create_static_routes,
+ check_address_types,
+ step,
+ create_route_maps,
+ create_prefix_lists,
+ create_route_maps,
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence,
+ create_router_bgp,
+ clear_bgp_and_verify,
+ verify_bgp_rib,
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+from copy import deepcopy
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_communities.json".format(CWD)
+try:
+ with open(jsonFile, "r") as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+BGP_CONVERGENCE = False
+ADDR_TYPES = check_address_types()
+NETWORK = {"ipv4": "2.2.2.2/32", "ipv6": "22:22::2/128"}
+NEXT_HOP_IP = {}
+
+
+class BGPCOMMUNITIES(Topo):
+ """
+ Test BGPCOMMUNITIES - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ """Build function"""
+ tgen = get_topogen(self)
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("=" * 40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(BGPCOMMUNITIES, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Checking BGP convergence
+ global BGP_CONVERGENCE
+ global ADDR_TYPES
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # Api call verify whether BGP is converged
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
+ BGP_CONVERGENCE
+ )
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
+ logger.info("=" * 40)
+
+
+#####################################################
+#
+# Tests starting
+#
+#####################################################
+
+
+def test_bgp_no_advertise_community_p0(request):
+ """
+ Verify routes are not advertised when NO-ADVERTISE Community is applied
+
+ """
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ reset_config_on_routers(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ NEXT_HOP_IP = {
+ "ipv4": topo["routers"]["r0"]["links"]["r1"]["ipv4"].split("/")[0],
+ "ipv6": topo["routers"]["r0"]["links"]["r1"]["ipv6"].split("/")[0],
+ }
+
+ # configure static routes
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("configure redistribute static and connected in Router BGP " "in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "BGP neighbors are up, static and connected route advertised from"
+ " R1 are present on R2 BGP table and RIB using show ip bgp and "
+ " show ip route"
+ )
+ step(
+ "Static and connected route advertised from R1 are present on R3"
+ " BGP table and RIB using show ip bgp and show ip route"
+ )
+
+ dut = "r3"
+ protocol = "bgp"
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure prefix list P1 on R2 to permit route coming from R1")
+ # Create ip prefix list
+ input_dict_2 = {
+ "r2": {
+ "prefix_lists": {
+ addr_type: {
+ "pf_list_1_{}".format(addr_type): [
+ {"seqid": 10, "network": "any", "action": "permit"}
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ # Create route map
+ input_dict_3 = {
+ "r2": {
+ "route_maps": {
+ "rmap_match_pf_1_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "seq_id": "5",
+ "match": {
+ addr_type: {"prefix_lists": "pf_list_1_" + addr_type}
+ },
+ "set": {"community": {"num": "no-advertise"}},
+ }
+ ]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ step(
+ "Apply route-map RM1 on R2, R2 to R3 BGP neighbor with no"
+ " advertise community"
+ )
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {
+ "route_maps": [
+ {
+ "name": "rmap_match_pf_1_"
+ + addr_type,
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "After advertising no advertise community to BGP neighbor "
+ "static and connected router got removed from R3 verify using "
+ "show ip bgp & show ip route"
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
+ assert result is not True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ result = verify_rib(
+ tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
+ )
+ assert result is not True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ step("Remove and Add no advertise community")
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {
+ "route_maps": [
+ {
+ "name": "rmap_match_pf_1_"
+ + addr_type,
+ "direction": "in",
+ "delete": True,
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "After removing no advertise community from BGP neighbor "
+ "static and connected router got advertised to R3 and "
+ "removing route-map, verify route using show ip bgp"
+ " and show ip route"
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ step("Repeat above steps when IBGP nbr configured between R1, R2 & R2, R3")
+ topo1 = deepcopy(topo)
+
+ topo1["routers"]["r1"]["bgp"]["local_as"] = "100"
+ topo1["routers"]["r2"]["bgp"]["local_as"] = "100"
+ topo1["routers"]["r3"]["bgp"]["local_as"] = "100"
+
+ for rtr in ["r1", "r2", "r3"]:
+ if "bgp" in topo1["routers"][rtr].keys():
+ delete_bgp = {rtr: {"bgp": {"delete": True}}}
+ result = create_router_bgp(tgen, topo1, delete_bgp)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ config_bgp = {
+ rtr: {"bgp": {"local_as": topo1["routers"][rtr]["bgp"]["local_as"]}}
+ }
+ result = create_router_bgp(tgen, topo1, config_bgp)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ build_config_from_json(tgen, topo1, save_bkup=False)
+
+ step("verify bgp convergence before starting test case")
+
+ bgp_convergence = verify_bgp_convergence(tgen, topo1)
+ assert bgp_convergence is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, bgp_convergence
+ )
+
+ # configure static routes
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("configure redistribute static and connected in Router " "BGP in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [
+ {"redist_type": "static"},
+ {"redist_type": "connected"},
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "BGP neighbors are up, static and connected route advertised from"
+ " R1 are present on R2 BGP table and RIB using show ip bgp and "
+ " show ip route"
+ )
+ step(
+ "Static and connected route advertised from R1 are present on R3"
+ " BGP table and RIB using show ip bgp and show ip route"
+ )
+
+ dut = "r2"
+ protocol = "bgp"
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Configure prefix list P1 on R2 to permit route coming from R1")
+ # Create ip prefix list
+ input_dict_2 = {
+ "r2": {
+ "prefix_lists": {
+ addr_type: {
+ "pf_list_1_{}".format(addr_type): [
+ {"seqid": 10, "network": "any", "action": "permit"}
+ ]
+ }
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ # Create route map
+ input_dict_3 = {
+ "r2": {
+ "route_maps": {
+ "rmap_match_pf_1_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "seq_id": "5",
+ "match": {
+ addr_type: {"prefix_lists": "pf_list_1_" + addr_type}
+ },
+ "set": {"community": {"num": "no-advertise"}},
+ }
+ ]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_3)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+ step(
+ "Apply route-map RM1 on R2, R2 to R3 BGP neighbor with no"
+ " advertise community"
+ )
+
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {
+ "route_maps": [
+ {
+ "name": "rmap_match_pf_1_"
+ + addr_type,
+ "direction": "in",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "After advertising no advertise community to BGP neighbor "
+ "static and connected router got removed from R3 verify using "
+ "show ip bgp & show ip route"
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ step("Remove and Add no advertise community")
+ # Configure neighbor for route map
+ input_dict_4 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r1": {
+ "dest_link": {
+ "r2": {
+ "route_maps": [
+ {
+ "name": "rmap_match_pf_1_"
+ + addr_type,
+ "direction": "in",
+ "delete": True,
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step(
+ "After removing no advertise community from BGP neighbor "
+ "static and connected router got advertised to R3 and "
+ "removing route verify using show ip bgp and "
+ " show ip route"
+ )
+
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ result = verify_rib(tgen, addr_type, dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n "
+ " Routes still present in R3 router. Error: {}".format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 26181f097..b8dd7cda5 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -27,14 +27,17 @@ from lib import topotest
from lib.topolog import logger
# Import common_config to use commomnly used APIs
-from lib.common_config import (create_common_configuration,
- InvalidCLIError,
- load_config_to_router,
- check_address_types,
- generate_ips,
- validate_ip_address,
- find_interface_with_greater_ip,
- run_frr_cmd, retry)
+from lib.common_config import (
+ create_common_configuration,
+ InvalidCLIError,
+ load_config_to_router,
+ check_address_types,
+ generate_ips,
+ validate_ip_address,
+ find_interface_with_greater_ip,
+ run_frr_cmd,
+ retry,
+)
BGP_CONVERGENCE_TIMEOUT = 10
@@ -487,13 +490,11 @@ def __create_bgp_unicast_address_family(
if "allowas_in" in peer:
allow_as_in = peer["allowas_in"]
- config_data.append("{} allowas-in {}".format(neigh_cxt,
- allow_as_in))
+ config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
if "no_allowas_in" in peer:
allow_as_in = peer["no_allowas_in"]
- config_data.append("no {} allowas-in {}".format(neigh_cxt,
- allow_as_in))
+ config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
if prefix_lists:
for prefix_list in prefix_lists:
name = prefix_list.setdefault("name", {})
@@ -529,12 +530,10 @@ def __create_bgp_unicast_address_family(
config_data.append(cmd)
if allowas_in:
- number_occurences = allowas_in.\
- setdefault("number_occurences", {})
+ number_occurences = allowas_in.setdefault("number_occurences", {})
del_action = allowas_in.setdefault("delete", False)
- cmd = "{} allowas-in {}".format(neigh_cxt,
- number_occurences)
+ cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
if del_action:
cmd = "no {}".format(cmd)
@@ -640,6 +639,9 @@ def verify_bgp_convergence(tgen, topo):
logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
+ if "bgp" not in topo["routers"][router]:
+ continue
+
logger.info("Verifying BGP Convergence on router %s", router)
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
@@ -1031,7 +1033,7 @@ def clear_bgp_and_verify(tgen, topo, router):
" router {}".format(router)
)
return errormsg
- logger.info(peer_uptime_after_clear_bgp)
+
# Comparing peerUptimeEstablishedEpoch dictionaries
if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
logger.info("BGP neighborship is reset after clear BGP on router %s", router)
@@ -1750,9 +1752,9 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
# Static routes
sleep(2)
- logger.info('Checking router {} BGP RIB:'.format(dut))
+ logger.info("Checking router {} BGP RIB:".format(dut))
- if 'static_routes' in input_dict[routerInput]:
+ if "static_routes" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
@@ -1762,12 +1764,10 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
nh_found = False
vrf = static_route.setdefault("vrf", None)
if vrf:
- cmd = "{} vrf {} {}".\
- format(command, vrf, addr_type)
+ cmd = "{} vrf {} {}".format(command, vrf, addr_type)
else:
- cmd = "{} {}".\
- format(command, addr_type)
+ cmd = "{} {}".format(command, addr_type)
cmd = "{} json".format(cmd)
@@ -1775,8 +1775,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) == False:
- errormsg = "No route found in rib of router {}..". \
- format(router)
+ errormsg = "No route found in rib of router {}..".format(router)
return errormsg
network = static_route["network"]
@@ -1804,64 +1803,84 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
if not isinstance(next_hop, list):
next_hop = [next_hop]
list1 = next_hop
- found_hops = [rib_r["ip"] for rib_r in
- rib_routes_json["routes"][
- st_rt][0]["nexthops"]]
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
list2 = found_hops
- missing_list_of_nexthops = \
- set(list2).difference(list1)
- additional_nexthops_in_required_nhs = \
- set(list1).difference(list2)
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
if list2:
if additional_nexthops_in_required_nhs:
- logger.info("Missing nexthop %s for route"\
- " %s in RIB of router %s\n", \
- additional_nexthops_in_required_nhs, \
- st_rt, dut)
- errormsg=("Nexthop {} is Missing for "\
- "route {} in RIB of router {}\n".format(
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
- st_rt, dut))
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
return errormsg
else:
nh_found = True
if aspath:
- found_paths = rib_routes_json["routes"][
- st_rt][0]["path"]
+ found_paths = rib_routes_json["routes"][st_rt][0][
+ "path"
+ ]
if aspath == found_paths:
aspath_found = True
- logger.info("Found AS path {} for route" \
- " {} in RIB of router "\
- "{}\n".format(aspath, st_rt, dut))
+ logger.info(
+ "Found AS path {} for route"
+ " {} in RIB of router "
+ "{}\n".format(aspath, st_rt, dut)
+ )
else:
- errormsg=("AS Path {} is missing for route"\
- "for route {} in RIB of router {}\n"\
- .format(aspath, st_rt, dut))
+ errormsg = (
+ "AS Path {} is missing for route"
+ "for route {} in RIB of router {}\n".format(
+ aspath, st_rt, dut
+ )
+ )
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
- logger.info("Found next_hop {} for all bgp"
- " routes in RIB of"
- " router {}\n".format(next_hop, \
- router))
+ logger.info(
+ "Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, router)
+ )
if len(missing_routes) > 0:
- errormsg = ("Missing route in RIB of router {}, "
- "routes: {}\n".format(dut, missing_routes))
+ errormsg = (
+ "Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes)
+ )
return errormsg
if found_routes:
- logger.info("Verified routes in router {} BGP RIB, "
- "found routes are: {} \n".\
- format(dut, found_routes))
+ logger.info(
+ "Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".format(dut, found_routes)
+ )
continue
if "bgp" not in input_dict[routerInput]:
- continue
+ continue
# Advertise networks
bgp_data_list = input_dict[routerInput]["bgp"]
@@ -1872,36 +1891,33 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
for bgp_data in bgp_data_list:
vrf_id = bgp_data.setdefault("vrf", None)
if vrf_id:
- cmd = "{} vrf {} {}".\
- format(command, vrf_id, addr_type)
+ cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
else:
- cmd = "{} {}".\
- format(command, addr_type)
+ cmd = "{} {}".format(command, addr_type)
cmd = "{} json".format(cmd)
- rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) == False:
- errormsg = "No route found in rib of router {}..". \
- format(router)
+ errormsg = "No route found in rib of router {}..".format(router)
return errormsg
bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
- advertise_network = \
- bgp_net_advertise.setdefault("advertise_networks", [])
+ advertise_network = bgp_net_advertise.setdefault(
+ "advertise_networks", []
+ )
for advertise_network_dict in advertise_network:
found_routes = []
missing_routes = []
found = False
- network = advertise_network_dict['network']
+ network = advertise_network_dict["network"]
- if 'no_of_network' in advertise_network_dict:
- no_of_network = advertise_network_dict[
- 'no_of_network']
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
else:
no_of_network = 1
@@ -1923,13 +1939,17 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
missing_routes.append(st_rt)
if len(missing_routes) > 0:
- errormsg = ("Missing route in BGP RIB of router {},"
- " are: {}\n".format(dut, missing_routes))
+ errormsg = (
+ "Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes)
+ )
return errormsg
if found_routes:
- logger.info("Verified routes in router {} BGP RIB, found "
- "routes are: {}\n".format(dut, found_routes))
+ logger.info(
+ "Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes)
+ )
logger.debug("Exiting lib API: verify_bgp_rib()")
return True
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index 5ee59070c..2483930d1 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -1198,7 +1198,7 @@ def create_route_maps(tgen, input_dict, build=False):
ipv6_data = set_data.setdefault("ipv6", {})
local_preference = set_data.setdefault("locPrf", None)
metric = set_data.setdefault("metric", None)
- as_path = set_data.setdefault("path", {})
+ as_path = set_data.setdefault("aspath", {})
weight = set_data.setdefault("weight", None)
community = set_data.setdefault("community", {})
large_community = set_data.setdefault("large_community", {})