summaryrefslogtreecommitdiffstats
path: root/tests/topotests/isis_topo1/test_isis_topo1.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/isis_topo1/test_isis_topo1.py')
-rw-r--r--tests/topotests/isis_topo1/test_isis_topo1.py896
1 files changed, 896 insertions, 0 deletions
diff --git a/tests/topotests/isis_topo1/test_isis_topo1.py b/tests/topotests/isis_topo1/test_isis_topo1.py
new file mode 100644
index 00000000..b629757d
--- /dev/null
+++ b/tests/topotests/isis_topo1/test_isis_topo1.py
@@ -0,0 +1,896 @@
+#!/usr/bin/env python
+# SPDX-License-Identifier: ISC
+
+#
+# test_isis_topo1.py
+# Part of NetDEF Topology Tests
+#
+# Copyright (c) 2017 by
+# Network Device Education Foundation, Inc. ("NetDEF")
+#
+
+"""
+test_isis_topo1.py: Test ISIS topology.
+"""
+import datetime
+import functools
+import json
+import os
+import re
+import sys
+import pytest
+
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, "../"))
+
+# pylint: disable=C0413
+from lib import topotest
+from lib.common_config import (
+ retry,
+ stop_router,
+ start_router,
+)
+from lib.topogen import Topogen, TopoRouter, get_topogen
+from lib.topolog import logger
+
+
+pytestmark = [pytest.mark.isisd]
+
+VERTEX_TYPE_LIST = [
+ "pseudo_IS",
+ "pseudo_TE-IS",
+ "IS",
+ "TE-IS",
+ "ES",
+ "IP internal",
+ "IP external",
+ "IP TE",
+ "IP6 internal",
+ "IP6 external",
+ "UNKNOWN",
+]
+
+
+def build_topo(tgen):
+ "Build function"
+
+ # Add ISIS routers:
+ # r1 r2
+ # | sw1 | sw2
+ # r3 r4
+ # | |
+ # sw3 sw4
+ # \ /
+ # r5
+ for routern in range(1, 6):
+ tgen.add_router("r{}".format(routern))
+
+ # r1 <- sw1 -> r3
+ sw = tgen.add_switch("sw1")
+ sw.add_link(tgen.gears["r1"])
+ sw.add_link(tgen.gears["r3"])
+
+ # r2 <- sw2 -> r4
+ sw = tgen.add_switch("sw2")
+ sw.add_link(tgen.gears["r2"])
+ sw.add_link(tgen.gears["r4"])
+
+ # r3 <- sw3 -> r5
+ sw = tgen.add_switch("sw3")
+ sw.add_link(tgen.gears["r3"])
+ sw.add_link(tgen.gears["r5"])
+
+ # r4 <- sw4 -> r5
+ sw = tgen.add_switch("sw4")
+ sw.add_link(tgen.gears["r4"])
+ sw.add_link(tgen.gears["r5"])
+
+
+def setup_module(mod):
+ "Sets up the pytest environment"
+ tgen = Topogen(build_topo, mod.__name__)
+ tgen.start_topology()
+
+ # For all registered routers, load the zebra configuration file
+ for rname, router in tgen.routers().items():
+ router.load_config(
+ TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
+ )
+ router.load_config(
+ TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
+ )
+
+ # After loading the configurations, this function loads configured daemons.
+ tgen.start_router()
+
+
+def teardown_module(mod):
+ "Teardown the pytest environment"
+ tgen = get_topogen()
+
+ # This function tears down the whole topology.
+ tgen.stop_topology()
+
+
+def test_isis_convergence():
+ "Wait for the protocol to converge before starting to test"
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("waiting for ISIS protocol to converge")
+ # Code to generate the json files.
+ # for rname, router in tgen.routers().items():
+ # open('/tmp/{}_topology.json'.format(rname), 'w').write(
+ # json.dumps(show_isis_topology(router), indent=2, sort_keys=True)
+ # )
+
+ for rname, router in tgen.routers().items():
+ filename = "{0}/{1}/{1}_topology.json".format(CWD, rname)
+ expected = json.loads(open(filename).read())
+
+ def compare_isis_topology(router, expected):
+ "Helper function to test ISIS topology convergence."
+ actual = show_isis_topology(router)
+ return topotest.json_cmp(actual, expected)
+
+ test_func = functools.partial(compare_isis_topology, router, expected)
+ (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120)
+ assert result, "ISIS did not converge on {}:\n{}".format(rname, diff)
+
+
+def test_isis_route_installation():
+ "Check whether all expected routes are present"
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Checking routers for installed ISIS routes")
+
+ # Check for routes in 'show ip route json'
+ for rname, router in tgen.routers().items():
+ filename = "{0}/{1}/{1}_route.json".format(CWD, rname)
+ expected = json.loads(open(filename, "r").read())
+
+ def compare_isis_installed_routes(router, expected):
+ "Helper function to test ISIS routes installed in rib."
+ actual = router.vtysh_cmd("show ip route json", isjson=True)
+ return topotest.json_cmp(actual, expected)
+
+ test_func = functools.partial(compare_isis_installed_routes, router, expected)
+ (result, diff) = topotest.run_and_expect(test_func, None, wait=1, count=10)
+ assertmsg = "Router '{}' routes mismatch".format(rname)
+ assert result, assertmsg
+
+
+def test_isis_linux_route_installation():
+ "Check whether all expected routes are present and installed in the OS"
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Checking routers for installed ISIS routes in OS")
+
+ # Check for routes in `ip route`
+ for rname, router in tgen.routers().items():
+ filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname)
+ expected = json.loads(open(filename, "r").read())
+ actual = topotest.ip4_route(router)
+ assertmsg = "Router '{}' OS routes mismatch".format(rname)
+ assert topotest.json_cmp(actual, expected) is None, assertmsg
+
+
+def test_isis_route6_installation():
+ "Check whether all expected routes are present"
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Checking routers for installed ISIS IPv6 routes")
+
+ # Check for routes in 'show ip route json'
+ for rname, router in tgen.routers().items():
+ filename = "{0}/{1}/{1}_route6.json".format(CWD, rname)
+ expected = json.loads(open(filename, "r").read())
+
+ def compare_isis_v6_installed_routes(router, expected):
+ "Helper function to test ISIS v6 routes installed in rib."
+ actual = router.vtysh_cmd("show ipv6 route json", isjson=True)
+ return topotest.json_cmp(actual, expected)
+
+ test_func = functools.partial(
+ compare_isis_v6_installed_routes, router, expected
+ )
+ (result, diff) = topotest.run_and_expect(test_func, None, wait=1, count=10)
+ assertmsg = "Router '{}' routes mismatch".format(rname)
+ assert result, assertmsg
+
+
+def test_isis_linux_route6_installation():
+ "Check whether all expected routes are present and installed in the OS"
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Checking routers for installed ISIS IPv6 routes in OS")
+
+ # Check for routes in `ip route`
+ for rname, router in tgen.routers().items():
+ filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname)
+ expected = json.loads(open(filename, "r").read())
+ actual = topotest.ip6_route(router)
+ assertmsg = "Router '{}' OS routes mismatch".format(rname)
+ assert topotest.json_cmp(actual, expected) is None, assertmsg
+
+
+def test_isis_summary_json():
+ "Check json struct in show isis summary json"
+
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Checking 'show isis summary json'")
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd("show isis summary json", isjson=True)
+ assertmsg = "Test isis summary json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert json_output["vrf"] == "default", assertmsg
+ assert json_output["areas"][0]["area"] == "1", assertmsg
+ assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg
+
+
+def test_isis_interface_json():
+ "Check json struct in show isis interface json"
+
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Checking 'show isis interface json'")
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd(
+ "show isis interface json", isjson=True
+ )
+ assertmsg = "Test isis interface json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert (
+ json_output["areas"][0]["circuits"][0]["interface"]["name"]
+ == rname + "-eth0"
+ ), assertmsg
+
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd(
+ "show isis interface detail json", isjson=True
+ )
+ assertmsg = "Test isis interface json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert (
+ json_output["areas"][0]["circuits"][0]["interface"]["name"]
+ == rname + "-eth0"
+ ), assertmsg
+
+
+def test_isis_neighbor_json():
+ "Check json struct in show isis neighbor json"
+
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # tgen.mininet_cli()
+ logger.info("Checking 'show isis neighbor json'")
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd(
+ "show isis neighbor json", isjson=True
+ )
+ assertmsg = "Test isis neighbor json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert (
+ json_output["areas"][0]["circuits"][0]["interface"] == rname + "-eth0"
+ ), assertmsg
+
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd(
+ "show isis neighbor detail json", isjson=True
+ )
+ assertmsg = "Test isis neighbor json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert (
+ json_output["areas"][0]["circuits"][0]["interface"]["name"]
+ == rname + "-eth0"
+ ), assertmsg
+
+
+def test_isis_database_json():
+ "Check json struct in show isis database json"
+
+ tgen = get_topogen()
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # tgen.mininet_cli()
+ logger.info("Checking 'show isis database json'")
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd(
+ "show isis database json", isjson=True
+ )
+ assertmsg = "Test isis database json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert json_output["areas"][0]["area"]["name"] == "1", assertmsg
+ assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg
+
+ for rname, router in tgen.routers().items():
+ logger.info("Checking router %s", rname)
+ json_output = tgen.gears[rname].vtysh_cmd(
+ "show isis database detail json", isjson=True
+ )
+ assertmsg = "Test isis database json failed in '{}' data '{}'".format(
+ rname, json_output
+ )
+ assert json_output["areas"][0]["area"]["name"] == "1", assertmsg
+ assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg
+
+
+def test_isis_overload_on_startup():
+ "Check that overload on startup behaves as expected"
+
+ tgen = get_topogen()
+ net = get_topogen().net
+ overload_time = 120
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Testing overload on startup behavior")
+
+ # Configure set-overload-bit on-startup on r3
+ r3 = tgen.gears["r3"]
+ r3.vtysh_cmd(
+ f"""
+ configure
+ router isis 1
+ set-overload-bit on-startup {overload_time}
+ """
+ )
+ # Restart r3
+ logger.info("Stop router")
+ stop_router(tgen, "r3")
+ logger.info("Start router")
+
+ tstamp_before_start_router = datetime.datetime.now()
+ start_router(tgen, "r3")
+ tstamp_after_start_router = datetime.datetime.now()
+ startup_router_time = (
+ tstamp_after_start_router - tstamp_before_start_router
+ ).total_seconds()
+
+ # Check that the overload bit is set in r3's LSP
+ check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
+ check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
+
+ # Attempt to unset overload bit while timer is still running
+ r3.vtysh_cmd(
+ """
+ configure
+ router isis 1
+ no set-overload-bit on-startup
+ no set-overload-bit
+ """
+ )
+
+ # Check overload bit is still set
+ check_lsp_overload_bit("r1", "r3.00-00", "0/0/1")
+
+ # Check that overload bit is unset after timer completes
+ check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
+ tstamp_after_bit_unset = datetime.datetime.now()
+ check_lsp_overload_bit("r1", "r3.00-00", "0/0/0")
+
+ # Collect time overloaded
+ time_overloaded = (
+ tstamp_after_bit_unset - tstamp_after_start_router
+ ).total_seconds()
+ logger.info(f"Time Overloaded: {time_overloaded}")
+
+ # Use time it took to startup router as lower bound
+ logger.info(
+ f"Assert that overload time falls in range: {overload_time - startup_router_time} < {time_overloaded} <= {overload_time}"
+ )
+ result = overload_time - startup_router_time < time_overloaded <= overload_time
+ assert result
+
+
+def test_isis_overload_on_startup_cancel_timer():
+ "Check that overload on startup timer is cancelled when overload bit is set/unset"
+
+ tgen = get_topogen()
+ net = get_topogen().net
+ overload_time = 90
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info(
+ "Testing overload on startup behavior with set overload bit: cancel timer"
+ )
+
+ # Configure set-overload-bit on-startup on r3
+ r3 = tgen.gears["r3"]
+ r3.vtysh_cmd(
+ f"""
+ configure
+ router isis 1
+ set-overload-bit on-startup {overload_time}
+ set-overload-bit
+ """
+ )
+ # Restart r3
+ logger.info("Stop router")
+ stop_router(tgen, "r3")
+ logger.info("Start router")
+ start_router(tgen, "r3")
+
+ # Check that the overload bit is set in r3's LSP
+ check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
+
+ # Check that overload timer is running
+ check_overload_timer("r3", True)
+
+ # Unset overload bit while timer is running
+ r3.vtysh_cmd(
+ """
+ configure
+ router isis 1
+ no set-overload-bit
+ """
+ )
+
+ # Check that overload timer is cancelled
+ check_overload_timer("r3", False)
+
+ # Check overload bit is unset
+ check_lsp_overload_bit("r3", "r3.00-00", "0/0/0")
+
+
+def test_isis_overload_on_startup_override_timer():
+ "Check that overload bit remains set after overload timer expires if overload bit is configured"
+
+ tgen = get_topogen()
+ net = get_topogen().net
+ overload_time = 60
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info(
+ "Testing overload on startup behavior with set overload bit: override timer"
+ )
+
+ # Configure set-overload-bit on-startup on r3
+ r3 = tgen.gears["r3"]
+ r3.vtysh_cmd(
+ f"""
+ configure
+ router isis 1
+ set-overload-bit on-startup {overload_time}
+ set-overload-bit
+ """
+ )
+ # Restart r3
+ logger.info("Stop router")
+ stop_router(tgen, "r3")
+ logger.info("Start router")
+ start_router(tgen, "r3")
+
+ # Check that the overload bit is set in r3's LSP
+ check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
+
+ # Check that overload timer is running
+ check_overload_timer("r3", True)
+
+ # Check that overload timer expired
+ check_overload_timer("r3", False)
+
+ # Check overload bit is still set
+ check_lsp_overload_bit("r3", "r3.00-00", "0/0/1")
+
+
+def test_isis_advertise_passive_only():
+ """Check that we only advertise prefixes of passive interfaces when advertise-passive-only is enabled."""
+ tgen = get_topogen()
+ net = get_topogen().net
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Testing isis advertise-passive-only behavior")
+ expected_prefixes_no_advertise_passive_only = set(
+ ["10.0.20.0/24", "10.254.0.1/32", "2001:db8:f::1/128", "2001:db8:1:1::/64"]
+ )
+ expected_prefixes_advertise_passive_only = set(
+ ["10.254.0.1/32", "2001:db8:f::1/128"]
+ )
+ lsp_id = "r1.00-00"
+
+ r1 = tgen.gears["r1"]
+ r1.vtysh_cmd(
+ """
+ configure
+ router isis 1
+ no redistribute ipv4 connected level-2
+ no redistribute ipv6 connected level-2
+ interface lo
+ ip router isis 1
+ ipv6 router isis 1
+ isis passive
+ end
+ """
+ )
+
+ result = check_advertised_prefixes(
+ r1, lsp_id, expected_prefixes_no_advertise_passive_only
+ )
+ assert result is True, result
+
+ r1.vtysh_cmd(
+ """
+ configure
+ router isis 1
+ advertise-passive-only
+ end
+ """
+ )
+
+ result = check_advertised_prefixes(
+ r1, lsp_id, expected_prefixes_advertise_passive_only
+ )
+ assert result is True, result
+
+
+def test_isis_hello_padding_during_adjacency_formation():
+ """Check that IIH packets is only padded when adjacency is still being formed
+ when isis hello padding during-adjacency-formation is configured
+ """
+ tgen = get_topogen()
+ net = get_topogen().net
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ logger.info("Testing isis hello padding during-adjacency-formation behavior")
+ r3 = tgen.gears["r3"]
+
+ # Reduce hello-multiplier to make the adjacency go down faster.
+ r3.vtysh_cmd(
+ """
+ configure
+ interface r3-eth0
+ isis hello-multiplier 2
+ """
+ )
+
+ r1 = tgen.gears["r1"]
+ cmd_output = r1.vtysh_cmd(
+ """
+ configure
+ interface r1-eth0
+ isis hello padding during-adjacency-formation
+ end
+ debug isis adj-packets
+ """
+ )
+ result = check_last_iih_packet_for_padding(r1, expect_padding=False)
+ assert result is True, result
+
+ r3.vtysh_cmd(
+ """
+ configure
+ interface r3-eth0
+ shutdown
+ """
+ )
+ result = check_last_iih_packet_for_padding(r1, expect_padding=True)
+ assert result is True, result
+
+ r3 = tgen.gears["r3"]
+ r3.vtysh_cmd(
+ """
+ configure
+ interface r3-eth0
+ no shutdown
+ """
+ )
+ result = check_last_iih_packet_for_padding(r1, expect_padding=False)
+ assert result is True, result
+
+
+@retry(retry_timeout=10)
+def check_last_iih_packet_for_padding(router, expect_padding):
+ logfilename = "{}/{}".format(router.gearlogdir, "isisd.log")
+ last_hello_packet_line = None
+ with open(logfilename, "r") as f:
+ lines = f.readlines()
+ for line in lines:
+ if re.search("Sending .+? IIH", line):
+ last_hello_packet_line = line
+
+ if last_hello_packet_line is None:
+ return "Expected IIH packet in {}, but no packet found".format(logfilename)
+
+ interface_name, packet_length = re.search(
+ r"Sending .+ IIH on (.+), length (\d+)", last_hello_packet_line
+ ).group(1, 2)
+ packet_length = int(packet_length)
+ interface_output = router.vtysh_cmd("show interface {} json".format(interface_name))
+ interface_json = json.loads(interface_output)
+ padded_packet_length = interface_json[interface_name]["mtu"] - 3
+ if expect_padding:
+ if packet_length == padded_packet_length:
+ return True
+ return (
+ "Expected padded packet with length {}, got packet with length {}".format(
+ padded_packet_length, packet_length
+ )
+ )
+ if packet_length < padded_packet_length:
+ return True
+ return "Expected unpadded packet with length less than {}, got packet with length {}".format(
+ padded_packet_length, packet_length
+ )
+
+
+@retry(retry_timeout=5)
+def check_advertised_prefixes(router, lsp_id, expected_prefixes):
+ output = router.vtysh_cmd("show isis database detail {}".format(lsp_id))
+ prefixes = set(re.findall(r"IP(?:v6)? Reachability: (.*) \(Metric: 10\)", output))
+ if prefixes == expected_prefixes:
+ return True
+ return str({"expected_prefixes:": expected_prefixes, "prefixes": prefixes})
+
+
+@retry(retry_timeout=200)
+def _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
+ "Verfiy overload bit in router's LSP"
+
+ tgen = get_topogen()
+ router = tgen.gears[router]
+ logger.info(f"check_overload_bit {router}")
+ isis_database_output = router.vtysh_cmd(
+ "show isis database {} json".format(overloaded_router_lsp)
+ )
+
+ database_json = json.loads(isis_database_output)
+ att_p_ol = database_json["areas"][0]["levels"][1]["lsps"][0]["attPOl"]
+ if att_p_ol == att_p_ol_expected:
+ return True
+ return "{} peer with expected att_p_ol {} got {} ".format(
+ router.name, att_p_ol_expected, att_p_ol
+ )
+
+
+def check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected):
+ "Verfiy overload bit in router's LSP"
+
+ assertmsg = _check_lsp_overload_bit(
+ router, overloaded_router_lsp, att_p_ol_expected
+ )
+ assert assertmsg is True, assertmsg
+
+
+@retry(retry_timeout=200)
+def _check_overload_timer(router, timer_expected):
+ "Verfiy overload bit in router's LSP"
+
+ tgen = get_topogen()
+ router = tgen.gears[router]
+ output = router.vtysh_cmd("show event timers")
+
+ timer_running = "set_overload_on_start_timer" in output
+ if timer_running == timer_expected:
+ return True
+ return "Expected timer running status: {}".format(timer_expected)
+
+
+def check_overload_timer(router, timer_expected):
+ "Verfiy overload bit in router's LSP"
+
+ assertmsg = _check_overload_timer(router, timer_expected)
+ assert assertmsg is True, assertmsg
+
+
+def test_memory_leak():
+ "Run the memory leak test and report results."
+ tgen = get_topogen()
+ if not tgen.is_memleak_enabled():
+ pytest.skip("Memory leak test/report is disabled")
+
+ tgen.report_memory_leaks()
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
+
+
+#
+# Auxiliary functions
+#
+
+
+def dict_merge(dct, merge_dct):
+ """
+ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
+ updating only top-level keys, dict_merge recurses down into dicts nested
+ to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
+ ``dct``.
+ :param dct: dict onto which the merge is executed
+ :param merge_dct: dct merged into dct
+ :return: None
+
+ Source:
+ https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
+ """
+ for k, v in merge_dct.items():
+ if k in dct and isinstance(dct[k], dict) and topotest.is_mapping(merge_dct[k]):
+ dict_merge(dct[k], merge_dct[k])
+ else:
+ dct[k] = merge_dct[k]
+
+
+def parse_topology(lines, level):
+ """
+ Parse the output of 'show isis topology level-X' into a Python dict.
+ """
+ areas = {}
+ area = None
+ ipv = None
+ vertex_type_regex = "|".join(VERTEX_TYPE_LIST)
+
+ for line in lines:
+ area_match = re.match(r"Area (.+):", line)
+ if area_match:
+ area = area_match.group(1)
+ if area not in areas:
+ areas[area] = {level: {"ipv4": [], "ipv6": []}}
+ ipv = None
+ continue
+ elif area is None:
+ continue
+
+ if re.match(r"IS\-IS paths to level-. routers that speak IPv6", line):
+ ipv = "ipv6"
+ continue
+ if re.match(r"IS\-IS paths to level-. routers that speak IP", line):
+ ipv = "ipv4"
+ continue
+
+ item_match = re.match(
+ r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line
+ )
+ if (
+ item_match is not None
+ and item_match.group(1) == "Vertex"
+ and item_match.group(2) == "Type"
+ and item_match.group(3) == "Metric"
+ and item_match.group(4) == "Next-Hop"
+ and item_match.group(5) == "Interface"
+ and item_match.group(6) == "Parent"
+ ):
+ # Skip header
+ continue
+
+ item_match = re.match(
+ r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
+ vertex_type_regex
+ ),
+ line,
+ )
+ if item_match is not None:
+ areas[area][level][ipv].append(
+ {
+ "vertex": item_match.group(1),
+ "type": item_match.group(2),
+ "metric": item_match.group(3),
+ "next-hop": item_match.group(5),
+ "interface": item_match.group(6),
+ "parent": item_match.group(7),
+ }
+ )
+ continue
+
+ item_match = re.match(
+ r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex),
+ line,
+ )
+
+ if item_match is not None:
+ areas[area][level][ipv].append(
+ {
+ "vertex": item_match.group(1),
+ "type": item_match.group(2),
+ "metric": item_match.group(3),
+ "parent": item_match.group(5),
+ }
+ )
+ continue
+
+ item_match = re.match(r"([^\s]+)", line)
+ if item_match is not None:
+ areas[area][level][ipv].append({"vertex": item_match.group(1)})
+ continue
+
+ return areas
+
+
+def show_isis_topology(router):
+ """
+ Get the ISIS topology in a dictionary format.
+
+ Sample:
+ {
+ 'area-name': {
+ 'level-1': [
+ {
+ 'vertex': 'r1'
+ }
+ ],
+ 'level-2': [
+ {
+ 'vertex': '10.0.0.1/24',
+ 'type': 'IP',
+ 'parent': '0',
+ 'metric': 'internal'
+ }
+ ]
+ },
+ 'area-name-2': {
+ 'level-2': [
+ {
+ "interface": "rX-ethY",
+ "metric": "Z",
+ "next-hop": "rA",
+ "parent": "rC(B)",
+ "type": "TE-IS",
+ "vertex": "rD"
+ }
+ ]
+ }
+ }
+ """
+ l1out = topotest.normalize_text(
+ router.vtysh_cmd("show isis topology level-1")
+ ).splitlines()
+ l2out = topotest.normalize_text(
+ router.vtysh_cmd("show isis topology level-2")
+ ).splitlines()
+
+ l1 = parse_topology(l1out, "level-1")
+ l2 = parse_topology(l2out, "level-2")
+
+ dict_merge(l1, l2)
+ return l1