diff options
Diffstat (limited to 'tests/topotests/isis_topo1/test_isis_topo1.py')
-rw-r--r-- | tests/topotests/isis_topo1/test_isis_topo1.py | 896 |
1 files changed, 896 insertions, 0 deletions
diff --git a/tests/topotests/isis_topo1/test_isis_topo1.py b/tests/topotests/isis_topo1/test_isis_topo1.py new file mode 100644 index 00000000..b629757d --- /dev/null +++ b/tests/topotests/isis_topo1/test_isis_topo1.py @@ -0,0 +1,896 @@ +#!/usr/bin/env python +# SPDX-License-Identifier: ISC + +# +# test_isis_topo1.py +# Part of NetDEF Topology Tests +# +# Copyright (c) 2017 by +# Network Device Education Foundation, Inc. ("NetDEF") +# + +""" +test_isis_topo1.py: Test ISIS topology. +""" +import datetime +import functools +import json +import os +import re +import sys +import pytest + +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +from lib import topotest +from lib.common_config import ( + retry, + stop_router, + start_router, +) +from lib.topogen import Topogen, TopoRouter, get_topogen +from lib.topolog import logger + + +pytestmark = [pytest.mark.isisd] + +VERTEX_TYPE_LIST = [ + "pseudo_IS", + "pseudo_TE-IS", + "IS", + "TE-IS", + "ES", + "IP internal", + "IP external", + "IP TE", + "IP6 internal", + "IP6 external", + "UNKNOWN", +] + + +def build_topo(tgen): + "Build function" + + # Add ISIS routers: + # r1 r2 + # | sw1 | sw2 + # r3 r4 + # | | + # sw3 sw4 + # \ / + # r5 + for routern in range(1, 6): + tgen.add_router("r{}".format(routern)) + + # r1 <- sw1 -> r3 + sw = tgen.add_switch("sw1") + sw.add_link(tgen.gears["r1"]) + sw.add_link(tgen.gears["r3"]) + + # r2 <- sw2 -> r4 + sw = tgen.add_switch("sw2") + sw.add_link(tgen.gears["r2"]) + sw.add_link(tgen.gears["r4"]) + + # r3 <- sw3 -> r5 + sw = tgen.add_switch("sw3") + sw.add_link(tgen.gears["r3"]) + sw.add_link(tgen.gears["r5"]) + + # r4 <- sw4 -> r5 + sw = tgen.add_switch("sw4") + sw.add_link(tgen.gears["r4"]) + sw.add_link(tgen.gears["r5"]) + + +def setup_module(mod): + "Sets up the pytest environment" + tgen = Topogen(build_topo, mod.__name__) + tgen.start_topology() + + # For all registered routers, load the zebra configuration file + for rname, router in tgen.routers().items(): + router.load_config( + TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)) + ) + router.load_config( + TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)) + ) + + # After loading the configurations, this function loads configured daemons. + tgen.start_router() + + +def teardown_module(mod): + "Teardown the pytest environment" + tgen = get_topogen() + + # This function tears down the whole topology. + tgen.stop_topology() + + +def test_isis_convergence(): + "Wait for the protocol to converge before starting to test" + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("waiting for ISIS protocol to converge") + # Code to generate the json files. + # for rname, router in tgen.routers().items(): + # open('/tmp/{}_topology.json'.format(rname), 'w').write( + # json.dumps(show_isis_topology(router), indent=2, sort_keys=True) + # ) + + for rname, router in tgen.routers().items(): + filename = "{0}/{1}/{1}_topology.json".format(CWD, rname) + expected = json.loads(open(filename).read()) + + def compare_isis_topology(router, expected): + "Helper function to test ISIS topology convergence." + actual = show_isis_topology(router) + return topotest.json_cmp(actual, expected) + + test_func = functools.partial(compare_isis_topology, router, expected) + (result, diff) = topotest.run_and_expect(test_func, None, wait=0.5, count=120) + assert result, "ISIS did not converge on {}:\n{}".format(rname, diff) + + +def test_isis_route_installation(): + "Check whether all expected routes are present" + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Checking routers for installed ISIS routes") + + # Check for routes in 'show ip route json' + for rname, router in tgen.routers().items(): + filename = "{0}/{1}/{1}_route.json".format(CWD, rname) + expected = json.loads(open(filename, "r").read()) + + def compare_isis_installed_routes(router, expected): + "Helper function to test ISIS routes installed in rib." + actual = router.vtysh_cmd("show ip route json", isjson=True) + return topotest.json_cmp(actual, expected) + + test_func = functools.partial(compare_isis_installed_routes, router, expected) + (result, diff) = topotest.run_and_expect(test_func, None, wait=1, count=10) + assertmsg = "Router '{}' routes mismatch".format(rname) + assert result, assertmsg + + +def test_isis_linux_route_installation(): + "Check whether all expected routes are present and installed in the OS" + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Checking routers for installed ISIS routes in OS") + + # Check for routes in `ip route` + for rname, router in tgen.routers().items(): + filename = "{0}/{1}/{1}_route_linux.json".format(CWD, rname) + expected = json.loads(open(filename, "r").read()) + actual = topotest.ip4_route(router) + assertmsg = "Router '{}' OS routes mismatch".format(rname) + assert topotest.json_cmp(actual, expected) is None, assertmsg + + +def test_isis_route6_installation(): + "Check whether all expected routes are present" + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Checking routers for installed ISIS IPv6 routes") + + # Check for routes in 'show ip route json' + for rname, router in tgen.routers().items(): + filename = "{0}/{1}/{1}_route6.json".format(CWD, rname) + expected = json.loads(open(filename, "r").read()) + + def compare_isis_v6_installed_routes(router, expected): + "Helper function to test ISIS v6 routes installed in rib." + actual = router.vtysh_cmd("show ipv6 route json", isjson=True) + return topotest.json_cmp(actual, expected) + + test_func = functools.partial( + compare_isis_v6_installed_routes, router, expected + ) + (result, diff) = topotest.run_and_expect(test_func, None, wait=1, count=10) + assertmsg = "Router '{}' routes mismatch".format(rname) + assert result, assertmsg + + +def test_isis_linux_route6_installation(): + "Check whether all expected routes are present and installed in the OS" + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Checking routers for installed ISIS IPv6 routes in OS") + + # Check for routes in `ip route` + for rname, router in tgen.routers().items(): + filename = "{0}/{1}/{1}_route6_linux.json".format(CWD, rname) + expected = json.loads(open(filename, "r").read()) + actual = topotest.ip6_route(router) + assertmsg = "Router '{}' OS routes mismatch".format(rname) + assert topotest.json_cmp(actual, expected) is None, assertmsg + + +def test_isis_summary_json(): + "Check json struct in show isis summary json" + + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Checking 'show isis summary json'") + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd("show isis summary json", isjson=True) + assertmsg = "Test isis summary json failed in '{}' data '{}'".format( + rname, json_output + ) + assert json_output["vrf"] == "default", assertmsg + assert json_output["areas"][0]["area"] == "1", assertmsg + assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg + + +def test_isis_interface_json(): + "Check json struct in show isis interface json" + + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Checking 'show isis interface json'") + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd( + "show isis interface json", isjson=True + ) + assertmsg = "Test isis interface json failed in '{}' data '{}'".format( + rname, json_output + ) + assert ( + json_output["areas"][0]["circuits"][0]["interface"]["name"] + == rname + "-eth0" + ), assertmsg + + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd( + "show isis interface detail json", isjson=True + ) + assertmsg = "Test isis interface json failed in '{}' data '{}'".format( + rname, json_output + ) + assert ( + json_output["areas"][0]["circuits"][0]["interface"]["name"] + == rname + "-eth0" + ), assertmsg + + +def test_isis_neighbor_json(): + "Check json struct in show isis neighbor json" + + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # tgen.mininet_cli() + logger.info("Checking 'show isis neighbor json'") + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd( + "show isis neighbor json", isjson=True + ) + assertmsg = "Test isis neighbor json failed in '{}' data '{}'".format( + rname, json_output + ) + assert ( + json_output["areas"][0]["circuits"][0]["interface"] == rname + "-eth0" + ), assertmsg + + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd( + "show isis neighbor detail json", isjson=True + ) + assertmsg = "Test isis neighbor json failed in '{}' data '{}'".format( + rname, json_output + ) + assert ( + json_output["areas"][0]["circuits"][0]["interface"]["name"] + == rname + "-eth0" + ), assertmsg + + +def test_isis_database_json(): + "Check json struct in show isis database json" + + tgen = get_topogen() + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + # tgen.mininet_cli() + logger.info("Checking 'show isis database json'") + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd( + "show isis database json", isjson=True + ) + assertmsg = "Test isis database json failed in '{}' data '{}'".format( + rname, json_output + ) + assert json_output["areas"][0]["area"]["name"] == "1", assertmsg + assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg + + for rname, router in tgen.routers().items(): + logger.info("Checking router %s", rname) + json_output = tgen.gears[rname].vtysh_cmd( + "show isis database detail json", isjson=True + ) + assertmsg = "Test isis database json failed in '{}' data '{}'".format( + rname, json_output + ) + assert json_output["areas"][0]["area"]["name"] == "1", assertmsg + assert json_output["areas"][0]["levels"][0]["id"] != "3", assertmsg + + +def test_isis_overload_on_startup(): + "Check that overload on startup behaves as expected" + + tgen = get_topogen() + net = get_topogen().net + overload_time = 120 + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Testing overload on startup behavior") + + # Configure set-overload-bit on-startup on r3 + r3 = tgen.gears["r3"] + r3.vtysh_cmd( + f""" + configure + router isis 1 + set-overload-bit on-startup {overload_time} + """ + ) + # Restart r3 + logger.info("Stop router") + stop_router(tgen, "r3") + logger.info("Start router") + + tstamp_before_start_router = datetime.datetime.now() + start_router(tgen, "r3") + tstamp_after_start_router = datetime.datetime.now() + startup_router_time = ( + tstamp_after_start_router - tstamp_before_start_router + ).total_seconds() + + # Check that the overload bit is set in r3's LSP + check_lsp_overload_bit("r3", "r3.00-00", "0/0/1") + check_lsp_overload_bit("r1", "r3.00-00", "0/0/1") + + # Attempt to unset overload bit while timer is still running + r3.vtysh_cmd( + """ + configure + router isis 1 + no set-overload-bit on-startup + no set-overload-bit + """ + ) + + # Check overload bit is still set + check_lsp_overload_bit("r1", "r3.00-00", "0/0/1") + + # Check that overload bit is unset after timer completes + check_lsp_overload_bit("r3", "r3.00-00", "0/0/0") + tstamp_after_bit_unset = datetime.datetime.now() + check_lsp_overload_bit("r1", "r3.00-00", "0/0/0") + + # Collect time overloaded + time_overloaded = ( + tstamp_after_bit_unset - tstamp_after_start_router + ).total_seconds() + logger.info(f"Time Overloaded: {time_overloaded}") + + # Use time it took to startup router as lower bound + logger.info( + f"Assert that overload time falls in range: {overload_time - startup_router_time} < {time_overloaded} <= {overload_time}" + ) + result = overload_time - startup_router_time < time_overloaded <= overload_time + assert result + + +def test_isis_overload_on_startup_cancel_timer(): + "Check that overload on startup timer is cancelled when overload bit is set/unset" + + tgen = get_topogen() + net = get_topogen().net + overload_time = 90 + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info( + "Testing overload on startup behavior with set overload bit: cancel timer" + ) + + # Configure set-overload-bit on-startup on r3 + r3 = tgen.gears["r3"] + r3.vtysh_cmd( + f""" + configure + router isis 1 + set-overload-bit on-startup {overload_time} + set-overload-bit + """ + ) + # Restart r3 + logger.info("Stop router") + stop_router(tgen, "r3") + logger.info("Start router") + start_router(tgen, "r3") + + # Check that the overload bit is set in r3's LSP + check_lsp_overload_bit("r3", "r3.00-00", "0/0/1") + + # Check that overload timer is running + check_overload_timer("r3", True) + + # Unset overload bit while timer is running + r3.vtysh_cmd( + """ + configure + router isis 1 + no set-overload-bit + """ + ) + + # Check that overload timer is cancelled + check_overload_timer("r3", False) + + # Check overload bit is unset + check_lsp_overload_bit("r3", "r3.00-00", "0/0/0") + + +def test_isis_overload_on_startup_override_timer(): + "Check that overload bit remains set after overload timer expires if overload bit is configured" + + tgen = get_topogen() + net = get_topogen().net + overload_time = 60 + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info( + "Testing overload on startup behavior with set overload bit: override timer" + ) + + # Configure set-overload-bit on-startup on r3 + r3 = tgen.gears["r3"] + r3.vtysh_cmd( + f""" + configure + router isis 1 + set-overload-bit on-startup {overload_time} + set-overload-bit + """ + ) + # Restart r3 + logger.info("Stop router") + stop_router(tgen, "r3") + logger.info("Start router") + start_router(tgen, "r3") + + # Check that the overload bit is set in r3's LSP + check_lsp_overload_bit("r3", "r3.00-00", "0/0/1") + + # Check that overload timer is running + check_overload_timer("r3", True) + + # Check that overload timer expired + check_overload_timer("r3", False) + + # Check overload bit is still set + check_lsp_overload_bit("r3", "r3.00-00", "0/0/1") + + +def test_isis_advertise_passive_only(): + """Check that we only advertise prefixes of passive interfaces when advertise-passive-only is enabled.""" + tgen = get_topogen() + net = get_topogen().net + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Testing isis advertise-passive-only behavior") + expected_prefixes_no_advertise_passive_only = set( + ["10.0.20.0/24", "10.254.0.1/32", "2001:db8:f::1/128", "2001:db8:1:1::/64"] + ) + expected_prefixes_advertise_passive_only = set( + ["10.254.0.1/32", "2001:db8:f::1/128"] + ) + lsp_id = "r1.00-00" + + r1 = tgen.gears["r1"] + r1.vtysh_cmd( + """ + configure + router isis 1 + no redistribute ipv4 connected level-2 + no redistribute ipv6 connected level-2 + interface lo + ip router isis 1 + ipv6 router isis 1 + isis passive + end + """ + ) + + result = check_advertised_prefixes( + r1, lsp_id, expected_prefixes_no_advertise_passive_only + ) + assert result is True, result + + r1.vtysh_cmd( + """ + configure + router isis 1 + advertise-passive-only + end + """ + ) + + result = check_advertised_prefixes( + r1, lsp_id, expected_prefixes_advertise_passive_only + ) + assert result is True, result + + +def test_isis_hello_padding_during_adjacency_formation(): + """Check that IIH packets is only padded when adjacency is still being formed + when isis hello padding during-adjacency-formation is configured + """ + tgen = get_topogen() + net = get_topogen().net + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + logger.info("Testing isis hello padding during-adjacency-formation behavior") + r3 = tgen.gears["r3"] + + # Reduce hello-multiplier to make the adjacency go down faster. + r3.vtysh_cmd( + """ + configure + interface r3-eth0 + isis hello-multiplier 2 + """ + ) + + r1 = tgen.gears["r1"] + cmd_output = r1.vtysh_cmd( + """ + configure + interface r1-eth0 + isis hello padding during-adjacency-formation + end + debug isis adj-packets + """ + ) + result = check_last_iih_packet_for_padding(r1, expect_padding=False) + assert result is True, result + + r3.vtysh_cmd( + """ + configure + interface r3-eth0 + shutdown + """ + ) + result = check_last_iih_packet_for_padding(r1, expect_padding=True) + assert result is True, result + + r3 = tgen.gears["r3"] + r3.vtysh_cmd( + """ + configure + interface r3-eth0 + no shutdown + """ + ) + result = check_last_iih_packet_for_padding(r1, expect_padding=False) + assert result is True, result + + +@retry(retry_timeout=10) +def check_last_iih_packet_for_padding(router, expect_padding): + logfilename = "{}/{}".format(router.gearlogdir, "isisd.log") + last_hello_packet_line = None + with open(logfilename, "r") as f: + lines = f.readlines() + for line in lines: + if re.search("Sending .+? IIH", line): + last_hello_packet_line = line + + if last_hello_packet_line is None: + return "Expected IIH packet in {}, but no packet found".format(logfilename) + + interface_name, packet_length = re.search( + r"Sending .+ IIH on (.+), length (\d+)", last_hello_packet_line + ).group(1, 2) + packet_length = int(packet_length) + interface_output = router.vtysh_cmd("show interface {} json".format(interface_name)) + interface_json = json.loads(interface_output) + padded_packet_length = interface_json[interface_name]["mtu"] - 3 + if expect_padding: + if packet_length == padded_packet_length: + return True + return ( + "Expected padded packet with length {}, got packet with length {}".format( + padded_packet_length, packet_length + ) + ) + if packet_length < padded_packet_length: + return True + return "Expected unpadded packet with length less than {}, got packet with length {}".format( + padded_packet_length, packet_length + ) + + +@retry(retry_timeout=5) +def check_advertised_prefixes(router, lsp_id, expected_prefixes): + output = router.vtysh_cmd("show isis database detail {}".format(lsp_id)) + prefixes = set(re.findall(r"IP(?:v6)? Reachability: (.*) \(Metric: 10\)", output)) + if prefixes == expected_prefixes: + return True + return str({"expected_prefixes:": expected_prefixes, "prefixes": prefixes}) + + +@retry(retry_timeout=200) +def _check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected): + "Verfiy overload bit in router's LSP" + + tgen = get_topogen() + router = tgen.gears[router] + logger.info(f"check_overload_bit {router}") + isis_database_output = router.vtysh_cmd( + "show isis database {} json".format(overloaded_router_lsp) + ) + + database_json = json.loads(isis_database_output) + att_p_ol = database_json["areas"][0]["levels"][1]["lsps"][0]["attPOl"] + if att_p_ol == att_p_ol_expected: + return True + return "{} peer with expected att_p_ol {} got {} ".format( + router.name, att_p_ol_expected, att_p_ol + ) + + +def check_lsp_overload_bit(router, overloaded_router_lsp, att_p_ol_expected): + "Verfiy overload bit in router's LSP" + + assertmsg = _check_lsp_overload_bit( + router, overloaded_router_lsp, att_p_ol_expected + ) + assert assertmsg is True, assertmsg + + +@retry(retry_timeout=200) +def _check_overload_timer(router, timer_expected): + "Verfiy overload bit in router's LSP" + + tgen = get_topogen() + router = tgen.gears[router] + output = router.vtysh_cmd("show event timers") + + timer_running = "set_overload_on_start_timer" in output + if timer_running == timer_expected: + return True + return "Expected timer running status: {}".format(timer_expected) + + +def check_overload_timer(router, timer_expected): + "Verfiy overload bit in router's LSP" + + assertmsg = _check_overload_timer(router, timer_expected) + assert assertmsg is True, assertmsg + + +def test_memory_leak(): + "Run the memory leak test and report results." + tgen = get_topogen() + if not tgen.is_memleak_enabled(): + pytest.skip("Memory leak test/report is disabled") + + tgen.report_memory_leaks() + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args)) + + +# +# Auxiliary functions +# + + +def dict_merge(dct, merge_dct): + """ + Recursive dict merge. Inspired by :meth:``dict.update()``, instead of + updating only top-level keys, dict_merge recurses down into dicts nested + to an arbitrary depth, updating keys. The ``merge_dct`` is merged into + ``dct``. + :param dct: dict onto which the merge is executed + :param merge_dct: dct merged into dct + :return: None + + Source: + https://gist.github.com/angstwad/bf22d1822c38a92ec0a9 + """ + for k, v in merge_dct.items(): + if k in dct and isinstance(dct[k], dict) and topotest.is_mapping(merge_dct[k]): + dict_merge(dct[k], merge_dct[k]) + else: + dct[k] = merge_dct[k] + + +def parse_topology(lines, level): + """ + Parse the output of 'show isis topology level-X' into a Python dict. + """ + areas = {} + area = None + ipv = None + vertex_type_regex = "|".join(VERTEX_TYPE_LIST) + + for line in lines: + area_match = re.match(r"Area (.+):", line) + if area_match: + area = area_match.group(1) + if area not in areas: + areas[area] = {level: {"ipv4": [], "ipv6": []}} + ipv = None + continue + elif area is None: + continue + + if re.match(r"IS\-IS paths to level-. routers that speak IPv6", line): + ipv = "ipv6" + continue + if re.match(r"IS\-IS paths to level-. routers that speak IP", line): + ipv = "ipv4" + continue + + item_match = re.match( + r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line + ) + if ( + item_match is not None + and item_match.group(1) == "Vertex" + and item_match.group(2) == "Type" + and item_match.group(3) == "Metric" + and item_match.group(4) == "Next-Hop" + and item_match.group(5) == "Interface" + and item_match.group(6) == "Parent" + ): + # Skip header + continue + + item_match = re.match( + r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format( + vertex_type_regex + ), + line, + ) + if item_match is not None: + areas[area][level][ipv].append( + { + "vertex": item_match.group(1), + "type": item_match.group(2), + "metric": item_match.group(3), + "next-hop": item_match.group(5), + "interface": item_match.group(6), + "parent": item_match.group(7), + } + ) + continue + + item_match = re.match( + r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex), + line, + ) + + if item_match is not None: + areas[area][level][ipv].append( + { + "vertex": item_match.group(1), + "type": item_match.group(2), + "metric": item_match.group(3), + "parent": item_match.group(5), + } + ) + continue + + item_match = re.match(r"([^\s]+)", line) + if item_match is not None: + areas[area][level][ipv].append({"vertex": item_match.group(1)}) + continue + + return areas + + +def show_isis_topology(router): + """ + Get the ISIS topology in a dictionary format. + + Sample: + { + 'area-name': { + 'level-1': [ + { + 'vertex': 'r1' + } + ], + 'level-2': [ + { + 'vertex': '10.0.0.1/24', + 'type': 'IP', + 'parent': '0', + 'metric': 'internal' + } + ] + }, + 'area-name-2': { + 'level-2': [ + { + "interface": "rX-ethY", + "metric": "Z", + "next-hop": "rA", + "parent": "rC(B)", + "type": "TE-IS", + "vertex": "rD" + } + ] + } + } + """ + l1out = topotest.normalize_text( + router.vtysh_cmd("show isis topology level-1") + ).splitlines() + l2out = topotest.normalize_text( + router.vtysh_cmd("show isis topology level-2") + ).splitlines() + + l1 = parse_topology(l1out, "level-1") + l2 = parse_topology(l2out, "level-2") + + dict_merge(l1, l2) + return l1 |