summaryrefslogtreecommitdiffstats
path: root/tests/topotests/lib/pim.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/pim.py')
-rw-r--r--tests/topotests/lib/pim.py264
1 files changed, 247 insertions, 17 deletions
diff --git a/tests/topotests/lib/pim.py b/tests/topotests/lib/pim.py
index f7440efd..369a794e 100644
--- a/tests/topotests/lib/pim.py
+++ b/tests/topotests/lib/pim.py
@@ -149,7 +149,7 @@ def _add_pim_rp_config(tgen, topo, input_dict, router, build, config_data_dict):
# At least one interface must be enabled for PIM on the router
pim_if_enabled = False
pim6_if_enabled = False
- for destLink, data in topo[dut]["links"].items():
+ for _, data in topo[dut]["links"].items():
if "pim" in data:
pim_if_enabled = True
if "pim6" in data:
@@ -332,6 +332,13 @@ def create_igmp_config(tgen, topo, input_dict=None, build=False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ if attribute == "static-group":
+ for group in data:
+ cmd = "ip {} {} {}".format(protocol, attribute, group)
+ if del_attr:
+ cmd = "no {}".format(cmd)
+ config_data.append(cmd)
+
if attribute == "query":
for query, value in data.items():
if query != "delete":
@@ -603,7 +610,7 @@ def find_rp_details(tgen, topo):
# ip address of RP
rp_addr = rp_dict["rp_addr"]
- for link, data in topo["routers"][router]["links"].items():
+ for _, data in topo["routers"][router]["links"].items():
if data["ipv4"].split("/")[0] == rp_addr:
rp_details[router] = rp_addr
@@ -1600,7 +1607,7 @@ def verify_pim_rp_info(
if type(group_addresses) is not list:
group_addresses = [group_addresses]
- if type(oif) is not list:
+ if oif is not None and type(oif) is not list:
oif = [oif]
for grp in group_addresses:
@@ -1739,6 +1746,49 @@ def verify_pim_rp_info(
@retry(retry_timeout=60, diag_pct=0)
+def verify_pim_rp_info_is_empty(tgen, dut, af="ipv4"):
+ """
+ Verify pim rp info by running "show ip pim rp-info" cli
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+
+ Usage
+ -----
+ dut = "r1"
+ result = verify_pim_rp_info_is_empty(tgen, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ ip_cmd = "ip"
+ if af == "ipv6":
+ ip_cmd = "ipv6"
+
+ logger.info("[DUT: %s]: Verifying %s rp info", dut, ip_cmd)
+ cmd = "show {} pim rp-info json".format(ip_cmd)
+ show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ if show_ip_rp_info_json:
+ errormsg = "[DUT %s]: Verifying empty rp-info [FAILED]!!" % (dut)
+ return errormsg
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=60, diag_pct=0)
def verify_pim_state(
tgen,
dut,
@@ -2089,7 +2139,7 @@ def verify_pim_interface(
)
return True
else:
- for destLink, data in topo["routers"][dut]["links"].items():
+ for _, data in topo["routers"][dut]["links"].items():
if "type" in data and data["type"] == "loopback":
continue
@@ -2292,7 +2342,7 @@ def clear_pim_interfaces(tgen, dut):
# Waiting for maximum 60 sec
fail_intf = []
- for retry in range(1, 13):
+ for _ in range(1, 13):
sleep(5)
logger.info("[DUT: %s]: Waiting for 5 sec for PIM neighbors" " to come up", dut)
run_json_after = run_frr_cmd(rnode, "show ip pim neighbor json", isjson=True)
@@ -2368,7 +2418,7 @@ def clear_igmp_interfaces(tgen, dut):
total_groups_before_clear = igmp_json["totalGroups"]
- for key, value in igmp_json.items():
+ for _, value in igmp_json.items():
if type(value) is not dict:
continue
@@ -2381,7 +2431,7 @@ def clear_igmp_interfaces(tgen, dut):
result = run_frr_cmd(rnode, "clear ip igmp interfaces")
# Waiting for maximum 60 sec
- for retry in range(1, 13):
+ for _ in range(1, 13):
logger.info(
"[DUT: %s]: Waiting for 5 sec for igmp interfaces" " to come up", dut
)
@@ -2404,10 +2454,11 @@ def clear_igmp_interfaces(tgen, dut):
# Verify uptime for groups
for group in group_before_clear.keys():
- d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S")
- d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S")
- if d2 >= d1:
- errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut)
+ if group in group_after_clear:
+ d1 = datetime.datetime.strptime(group_before_clear[group], "%H:%M:%S")
+ d2 = datetime.datetime.strptime(group_after_clear[group], "%H:%M:%S")
+ if d2 >= d1:
+ errormsg = ("[DUT: %s]: IGMP group is not cleared", " [FAILED!!]", dut)
logger.info("[DUT: %s]: IGMP group is cleared [PASSED!!]")
@@ -2460,7 +2511,7 @@ def clear_mroute_verify(tgen, dut, expected=True):
# RFC 3376: 8.2. Query Interval - Default: 125 seconds
# So waiting for maximum 130 sec to get the igmp report
- for retry in range(1, 26):
+ for _ in range(1, 26):
logger.info("[DUT: %s]: Waiting for 2 sec for mroutes" " to come up", dut)
sleep(5)
keys_json1 = mroute_json_1.keys()
@@ -2671,7 +2722,7 @@ def add_rp_interfaces_and_pim_config(tgen, topo, interface, rp, rp_mapping):
try:
config_data = []
- for group, rp_list in rp_mapping.items():
+ for _, rp_list in rp_mapping.items():
for _rp in rp_list:
config_data.append("interface {}".format(interface))
config_data.append("ip address {}".format(_rp))
@@ -2720,7 +2771,7 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N
script_path = os.path.join(CWD, "send_bsr_packet.py")
node = tgen.net[senderRouter]
- for destLink, data in topo["routers"][senderRouter]["links"].items():
+ for _, data in topo["routers"][senderRouter]["links"].items():
if "type" in data and data["type"] == "loopback":
continue
@@ -2744,6 +2795,48 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N
return True
+def scapy_send_autorp_raw_packet(tgen, senderRouter, senderInterface, packet=None):
+ """
+ Using scapy Raw() method to send AutoRP raw packet from one FRR
+ to other
+
+ Parameters:
+ -----------
+ * `tgen` : Topogen object
+ * `senderRouter` : Sender router
+ * `senderInterface` : SenderInterface
+ * `packet` : AutoRP packet in raw format
+
+ returns:
+ --------
+ errormsg or True
+ """
+
+ global CWD
+ result = ""
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ python3_path = tgen.net.get_exec_path(["python3", "python"])
+ # send_bsr_packet.py has no direct ties to bsr, just sends a raw packet out
+ # a given interface, so just reuse it
+ script_path = os.path.join(CWD, "send_bsr_packet.py")
+ node = tgen.net[senderRouter]
+
+ cmd = [
+ python3_path,
+ script_path,
+ packet,
+ senderInterface,
+ "--interval=1",
+ "--count=1",
+ ]
+ logger.info("Scapy cmd: \n %s", cmd)
+ node.cmd_raises(cmd)
+
+ logger.debug("Exiting lib API: scapy_send_autorp_raw_packet")
+ return True
+
+
def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None):
"""
Find which RP is having lowest prioriy and returns rp IP
@@ -2795,12 +2888,12 @@ def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None):
# RP with lowest priority
if len(priority_dict) != 1:
- rp_p, lowest_priority = sorted(rp_priority.items(), key=lambda x: x[1])[0]
+ rp_p, _ = sorted(rp_priority.items(), key=lambda x: x[1])[0]
rp_details[group] = rp_p
# RP with highest hash value
if len(priority_dict) == 1:
- rp_h, highest_hash = sorted(rp_hash.items(), key=lambda x: x[1])[-1]
+ rp_h, _ = sorted(rp_hash.items(), key=lambda x: x[1])[-1]
rp_details[group] = rp_h
# RP with highest IP address
@@ -3239,7 +3332,7 @@ def verify_pim_join(
interface_json = show_pim_join_json[interface]
grp_addr = grp_addr.split("/")[0]
- for source, data in interface_json[grp_addr].items():
+ for _, data in interface_json[grp_addr].items():
# Verify pim join
if pim_join:
if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
@@ -4254,6 +4347,143 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
return True
+@retry(retry_timeout=62)
+def verify_static_groups(tgen, dut, interface, group_addresses):
+ """
+ Verify static groups are received from an intended interface
+ by running "show ip igmp static-group json" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `interface`: interface, from which IGMP groups are configured
+ * `group_addresses`: IGMP group address
+
+ Usage
+ -----
+ dut = "r1"
+ interface = "r1-r0-eth0"
+ group_address = "225.1.1.1"
+ result = verify_static_groups(tgen, dut, interface, group_address)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+
+ if dut not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying static groups received:", dut)
+ show_static_group_json = run_frr_cmd(
+ rnode, "show ip igmp static-group json", isjson=True
+ )
+
+ if type(group_addresses) is not list:
+ group_addresses = [group_addresses]
+
+ if interface not in show_static_group_json:
+ errormsg = (
+ "[DUT %s]: Verifying static group received"
+ " from interface %s [FAILED]!! " % (dut, interface)
+ )
+ return errormsg
+
+ for grp_addr in group_addresses:
+ found = False
+ for index in show_static_group_json[interface]["groups"]:
+ if index["group"] == grp_addr:
+ found = True
+ break
+ if not found:
+ errormsg = (
+ "[DUT %s]: Verifying static group received"
+ " from interface %s [FAILED]!! "
+ " Expected: %s " % (dut, interface, grp_addr)
+ )
+ return errormsg
+
+ logger.info(
+ "[DUT %s]: Verifying static group %s received "
+ "from interface %s [PASSED]!! ",
+ dut,
+ grp_addr,
+ interface,
+ )
+
+ logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
+ return True
+
+
+@retry(retry_timeout=62)
+def verify_local_igmp_proxy_groups(
+ tgen, dut, group_addresses_present, group_addresses_not_present
+):
+ """
+ Verify igmp proxy groups are as expected by running
+ "show ip igmp static-group json" command
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: device under test
+ * `group_addresses_present`: IGMP group addresses which should
+ currently be proxied
+ * `group_addresses_not_present`: IGMP group addresses which should
+ not currently be proxied
+
+ Usage
+ -----
+ dut = "r1"
+ group_addresses_present = "225.1.1.1"
+ group_addresses_not_present = "225.2.2.2"
+ result = verify_igmp_proxy_groups(tgen, dut, group_p, group_np)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ if dut not in tgen.routers():
+ errormsg = "[DUT %s]: Device not found!"
+ return errormsg
+
+ rnode = tgen.routers()[dut]
+
+ logger.info("[DUT: %s]: Verifying local IGMP proxy groups:", dut)
+
+ out = rnode.vtysh_cmd("show ip igmp proxy json", isjson=True)
+ groups = [g["group"] if "group" in g else None for g in out["r1-eth1"]["groups"]]
+
+ if type(group_addresses_present) is not list:
+ group_addresses_present = [group_addresses_present]
+ if type(group_addresses_not_present) is not list:
+ group_addresses_not_present = [group_addresses_not_present]
+
+ for test_addr in group_addresses_present:
+ if not test_addr in groups:
+ errormsg = (
+ "[DUT %s]: Verifying local IGMP proxy joins FAILED!! "
+ " Expected but not found: %s " % (dut, test_addr)
+ )
+ return errormsg
+
+ for test_addr in group_addresses_not_present:
+ if test_addr in groups:
+ errormsg = (
+ "[DUT %s]: Verifying local IGMP proxy join removed FAILED!! "
+ " Unexpected but found: %s " % (dut, test_addr)
+ )
+ return errormsg
+
+ return True
+
+
def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""
Verify ip pim interface traffic by running