diff options
Diffstat (limited to 'tests/topotests/lib/pim.py')
-rw-r--r-- | tests/topotests/lib/pim.py | 264 |
1 files changed, 247 insertions, 17 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py index f7440efd..369a794e 100644 --- a/tests/topotests/lib/pim.py +++ b/tests/topotests/lib/pim.py @@ -149,7 +149,7 @@ def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict): # At least one interface must be enabled for PIM on the router pim_if_enabled = False pim6_if_enabled = False - for destLink, data in topo[dut]["links"].items(): + for _, data in topo[dut]["links"].items(): if "pim" in data: pim_if_enabled = True if "pim6" in data: @@ -332,6 +332,13 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False): cmd = "no {}".format(cmd) config_data.append(cmd) + if attribute == "static-group": + for group in data: + cmd = "ip {} {} {}".format(protocol, attribute, group) + if del_attr: + cmd = "no {}".format(cmd) + config_data.append(cmd) + if attribute == "query": for query, value in data.items(): if query != "delete": @@ -603,7 +610,7 @@ def find_rp_details(tgen, topo): # ip address of RP rp_addr = rp_dict["rp_addr"] - for link, data in topo["routers"][router]["links"].items(): + for _, data in topo["routers"][router]["links"].items(): if data["ipv4"].split("/")[0] == rp_addr: rp_details[router] = rp_addr @@ -1600,7 +1607,7 @@ def verify_pim_rp_info( if type(group_addresses) is not list: group_addresses = [group_addresses] - if type(oif) is not list: + if oif is not None and type(oif) is not list: oif = [oif] for grp in group_addresses: @@ -1739,6 +1746,49 @@ def verify_pim_rp_info( @retry(retry_timeout=60, diag_pct=0) +def verify_pim_rp_info_is_empty(tgen, dut, af="ipv4"): + """ + Verify pim rp info by running "show ip pim rp-info" cli + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + + Usage + ----- + dut = "r1" + result = verify_pim_rp_info_is_empty(tgen, dut) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + if dut not in tgen.routers(): + return False + + rnode = tgen.routers()[dut] + + ip_cmd = "ip" + if af == "ipv6": + ip_cmd = "ipv6" + + logger.info("[DUT: %s]: Verifying %s rp info", dut, ip_cmd) + cmd = "show {} pim rp-info json".format(ip_cmd) + show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True) + + if show_ip_rp_info_json: + errormsg = "[DUT %s]: Verifying empty rp-info [FAILED]!!" % (dut) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=60, diag_pct=0) def verify_pim_state( tgen, dut, @@ -2089,7 +2139,7 @@ def verify_pim_interface( ) return True else: - for destLink, data in topo["routers"][dut]["links"].items(): + for _, data in topo["routers"][dut]["links"].items(): if "type" in data and data["type"] == "loopback": continue @@ -2292,7 +2342,7 @@ def clear_pim_interfaces(tgen, dut): # Waiting for maximum 60 sec fail_intf = [] - for retry in range(1, 13): + for _ in range(1, 13): sleep(5) logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut) run_json_after = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True) @@ -2368,7 +2418,7 @@ def clear_igmp_interfaces(tgen, dut): total_groups_before_clear = igmp_json["totalGroups"] - for key, value in igmp_json.items(): + for _, value in igmp_json.items(): if type(value) is not dict: continue @@ -2381,7 +2431,7 @@ def clear_igmp_interfaces(tgen, dut): result = run_frr_cmd(rnode, "clear ip igmp interfaces") # Waiting for maximum 60 sec - for retry in range(1, 13): + for _ in range(1, 13): logger.info( "[DUT: %s]: Waiting for 5 sec for igmp interfaces" " to come up", dut ) @@ -2404,10 +2454,11 @@ def clear_igmp_interfaces(tgen, dut): # Verify uptime for groups for group in group_before_clear.keys(): - d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S") - d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S") - if d2 >= d1: - errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut) + if group in group_after_clear: + d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S") + d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S") + if d2 >= d1: + errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut) logger.info("[DUT: %s]: IGMP group is cleared [PASSED!!]") @@ -2460,7 +2511,7 @@ def clear_mroute_verify(tgen, dut, expected=True): # RFC 3376: 8.2. Query Interval - Default: 125 seconds # So waiting for maximum 130 sec to get the igmp report - for retry in range(1, 26): + for _ in range(1, 26): logger.info("[DUT: %s]: Waiting for 2 sec for mroutes" " to come up", dut) sleep(5) keys_json1 = mroute_json_1.keys() @@ -2671,7 +2722,7 @@ def add_rp_interfaces_and_pim_config(tgen, topo, interface, rp, rp_mapping): try: config_data = [] - for group, rp_list in rp_mapping.items(): + for _, rp_list in rp_mapping.items(): for _rp in rp_list: config_data.append("interface {}".format(interface)) config_data.append("ip address {}".format(_rp)) @@ -2720,7 +2771,7 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N script_path = os.path.join(CWD, "send_bsr_packet.py") node = tgen.net[senderRouter] - for destLink, data in topo["routers"][senderRouter]["links"].items(): + for _, data in topo["routers"][senderRouter]["links"].items(): if "type" in data and data["type"] == "loopback": continue @@ -2744,6 +2795,48 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N return True +def scapy_send_autorp_raw_packet(tgen, senderRouter, senderInterface, packet=None): + """ + Using scapy Raw() method to send AutoRP raw packet from one FRR + to other + + Parameters: + ----------- + * `tgen` : Topogen object + * `senderRouter` : Sender router + * `senderInterface` : SenderInterface + * `packet` : AutoRP packet in raw format + + returns: + -------- + errormsg or True + """ + + global CWD + result = "" + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + python3_path = tgen.net.get_exec_path(["python3", "python"]) + # send_bsr_packet.py has no direct ties to bsr, just sends a raw packet out + # a given interface, so just reuse it + script_path = os.path.join(CWD, "send_bsr_packet.py") + node = tgen.net[senderRouter] + + cmd = [ + python3_path, + script_path, + packet, + senderInterface, + "--interval=1", + "--count=1", + ] + logger.info("Scapy cmd: \n %s", cmd) + node.cmd_raises(cmd) + + logger.debug("Exiting lib API: scapy_send_autorp_raw_packet") + return True + + def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None): """ Find which RP is having lowest prioriy and returns rp IP @@ -2795,12 +2888,12 @@ def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None): # RP with lowest priority if len(priority_dict) != 1: - rp_p, lowest_priority = sorted(rp_priority.items(), key=lambda x: x[1])[0] + rp_p, _ = sorted(rp_priority.items(), key=lambda x: x[1])[0] rp_details[group] = rp_p # RP with highest hash value if len(priority_dict) == 1: - rp_h, highest_hash = sorted(rp_hash.items(), key=lambda x: x[1])[-1] + rp_h, _ = sorted(rp_hash.items(), key=lambda x: x[1])[-1] rp_details[group] = rp_h # RP with highest IP address @@ -3239,7 +3332,7 @@ def verify_pim_join( interface_json = show_pim_join_json[interface] grp_addr = grp_addr.split("/")[0] - for source, data in interface_json[grp_addr].items(): + for _, data in interface_json[grp_addr].items(): # Verify pim join if pim_join: if data["group"] == grp_addr and data["channelJoinName"] == "JOIN": @@ -4254,6 +4347,143 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses): return True +@retry(retry_timeout=62) +def verify_static_groups(tgen, dut, interface, group_addresses): + """ + Verify static groups are received from an intended interface + by running "show ip igmp static-group json" command + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + * `interface`: interface, from which IGMP groups are configured + * `group_addresses`: IGMP group address + + Usage + ----- + dut = "r1" + interface = "r1-r0-eth0" + group_address = "225.1.1.1" + result = verify_static_groups(tgen, dut, interface, group_address) + + Returns + ------- + errormsg(str) or True + """ + + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + + if dut not in tgen.routers(): + return False + + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying static groups received:", dut) + show_static_group_json = run_frr_cmd( + rnode, "show ip igmp static-group json", isjson=True + ) + + if type(group_addresses) is not list: + group_addresses = [group_addresses] + + if interface not in show_static_group_json: + errormsg = ( + "[DUT %s]: Verifying static group received" + " from interface %s [FAILED]!! " % (dut, interface) + ) + return errormsg + + for grp_addr in group_addresses: + found = False + for index in show_static_group_json[interface]["groups"]: + if index["group"] == grp_addr: + found = True + break + if not found: + errormsg = ( + "[DUT %s]: Verifying static group received" + " from interface %s [FAILED]!! " + " Expected: %s " % (dut, interface, grp_addr) + ) + return errormsg + + logger.info( + "[DUT %s]: Verifying static group %s received " + "from interface %s [PASSED]!! ", + dut, + grp_addr, + interface, + ) + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True + + +@retry(retry_timeout=62) +def verify_local_igmp_proxy_groups( + tgen, dut, group_addresses_present, group_addresses_not_present +): + """ + Verify igmp proxy groups are as expected by running + "show ip igmp static-group json" command + + Parameters + ---------- + * `tgen`: topogen object + * `dut`: device under test + * `group_addresses_present`: IGMP group addresses which should + currently be proxied + * `group_addresses_not_present`: IGMP group addresses which should + not currently be proxied + + Usage + ----- + dut = "r1" + group_addresses_present = "225.1.1.1" + group_addresses_not_present = "225.2.2.2" + result = verify_igmp_proxy_groups(tgen, dut, group_p, group_np) + + Returns + ------- + errormsg(str) or True + """ + + if dut not in tgen.routers(): + errormsg = "[DUT %s]: Device not found!" + return errormsg + + rnode = tgen.routers()[dut] + + logger.info("[DUT: %s]: Verifying local IGMP proxy groups:", dut) + + out = rnode.vtysh_cmd("show ip igmp proxy json", isjson=True) + groups = [g["group"] if "group" in g else None for g in out["r1-eth1"]["groups"]] + + if type(group_addresses_present) is not list: + group_addresses_present = [group_addresses_present] + if type(group_addresses_not_present) is not list: + group_addresses_not_present = [group_addresses_not_present] + + for test_addr in group_addresses_present: + if not test_addr in groups: + errormsg = ( + "[DUT %s]: Verifying local IGMP proxy joins FAILED!! " + " Expected but not found: %s " % (dut, test_addr) + ) + return errormsg + + for test_addr in group_addresses_not_present: + if test_addr in groups: + errormsg = ( + "[DUT %s]: Verifying local IGMP proxy join removed FAILED!! " + " Unexpected but found: %s " % (dut, test_addr) + ) + return errormsg + + return True + + def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"): """ Verify ip pim interface traffic by running |