diff options
author | Daniel Baumann <daniel@debian.org> | 2024-11-09 14:26:35 +0100 |
---|---|---|
committer | Daniel Baumann <daniel@debian.org> | 2024-11-09 14:26:35 +0100 |
commit | 47e4d7c791a050deb06e6c0fdfcac94a782a7cb9 (patch) | |
tree | 19edcac0f5dbda32bc329fa68773254fb2c488c3 /pimd/pim_zpthread.c | |
parent | Initial commit. (diff) | |
download | frr-47e4d7c791a050deb06e6c0fdfcac94a782a7cb9.tar.xz frr-47e4d7c791a050deb06e6c0fdfcac94a782a7cb9.zip |
Adding upstream version 10.1.1.upstream/10.1.1
Signed-off-by: Daniel Baumann <daniel@debian.org>
Diffstat (limited to 'pimd/pim_zpthread.c')
-rw-r--r-- | pimd/pim_zpthread.c | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/pimd/pim_zpthread.c b/pimd/pim_zpthread.c new file mode 100644 index 00000000..d6b2621f --- /dev/null +++ b/pimd/pim_zpthread.c @@ -0,0 +1,216 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * PIM for Quagga + * Copyright (C) 2008 Everton da Silva Marques + */ + +#include <zebra.h> +#include <lib/log.h> +#include <lib/lib_errors.h> + +#include "pimd.h" +#include "pim_instance.h" +#include "pim_mlag.h" +#include "pim_zebra.h" + +extern struct zclient *zclient; + +#define PIM_MLAG_POST_LIMIT 100 + +int32_t mlag_bulk_cnt; + +static void pim_mlag_zebra_fill_header(enum mlag_msg_type msg_type) +{ + uint32_t fill_msg_type = msg_type; + uint16_t data_len = 0; + uint16_t msg_cnt = 1; + + switch (msg_type) { + case MLAG_REGISTER: + case MLAG_DEREGISTER: + data_len = sizeof(struct mlag_msg); + break; + case MLAG_MROUTE_ADD: + data_len = sizeof(struct mlag_mroute_add); + fill_msg_type = MLAG_MROUTE_ADD_BULK; + break; + case MLAG_MROUTE_DEL: + data_len = sizeof(struct mlag_mroute_del); + fill_msg_type = MLAG_MROUTE_DEL_BULK; + break; + case MLAG_MSG_NONE: + return; + case MLAG_STATUS_UPDATE: + case MLAG_DUMP: + case MLAG_MROUTE_ADD_BULK: + case MLAG_MROUTE_DEL_BULK: + case MLAG_PIM_CFG_DUMP: + case MLAG_VXLAN_UPDATE: + case MLAG_PEER_FRR_STATUS: + data_len = 0; + break; + } + + stream_reset(router->mlag_stream); + /* ADD Hedaer */ + stream_putl(router->mlag_stream, fill_msg_type); + /* + * In case of Bulk actual size & msg_cnt will be updated + * just before writing onto zebra + */ + stream_putw(router->mlag_stream, data_len); + stream_putw(router->mlag_stream, msg_cnt); + + if (PIM_DEBUG_MLAG) + zlog_debug(":%s: msg_type: %d/%d len %d", + __func__, msg_type, fill_msg_type, data_len); +} + +static void pim_mlag_zebra_flush_buffer(void) +{ + uint32_t msg_type; + + /* Stream had bulk messages update the Hedaer */ + if (mlag_bulk_cnt > 1) { + /* + * No need to reset the pointer, below api reads from data[0] + */ + STREAM_GETL(router->mlag_stream, msg_type); + if (msg_type == MLAG_MROUTE_ADD_BULK) { + stream_putw_at( + router->mlag_stream, 4, + (mlag_bulk_cnt * sizeof(struct mlag_mroute_add))); + stream_putw_at(router->mlag_stream, 6, mlag_bulk_cnt); + } else if (msg_type == MLAG_MROUTE_DEL_BULK) { + stream_putw_at( + router->mlag_stream, 4, + (mlag_bulk_cnt * sizeof(struct mlag_mroute_del))); + stream_putw_at(router->mlag_stream, 6, mlag_bulk_cnt); + } else { + flog_err(EC_LIB_ZAPI_ENCODE, + "unknown bulk message type %d bulk_count %d", + msg_type, mlag_bulk_cnt); + stream_reset(router->mlag_stream); + mlag_bulk_cnt = 0; + return; + } + } + + zclient_send_mlag_data(zclient, router->mlag_stream); +stream_failure: + stream_reset(router->mlag_stream); + mlag_bulk_cnt = 0; +} + +/* + * Only ROUTE add & Delete will be bulked. + * Buffer will be flushed, when + * 1) there were no messages in the queue + * 2) Curr_msg_type != prev_msg_type + */ + +static void pim_mlag_zebra_check_for_buffer_flush(uint32_t curr_msg_type, + uint32_t prev_msg_type) +{ + /* First Message, keep bulking */ + if (prev_msg_type == MLAG_MSG_NONE) { + mlag_bulk_cnt = 1; + return; + } + + /*msg type is route add & delete, keep bulking */ + if (curr_msg_type == prev_msg_type + && (curr_msg_type == MLAG_MROUTE_ADD + || curr_msg_type == MLAG_MROUTE_DEL)) { + mlag_bulk_cnt++; + return; + } + + pim_mlag_zebra_flush_buffer(); +} + +/* + * Thsi thread reads the clients data from the Gloabl queue and encodes with + * protobuf and pass on to the MLAG socket. + */ +static void pim_mlag_zthread_handler(struct event *event) +{ + struct stream *read_s; + uint32_t wr_count = 0; + uint32_t prev_msg_type = MLAG_MSG_NONE; + uint32_t curr_msg_type = MLAG_MSG_NONE; + + router->zpthread_mlag_write = NULL; + wr_count = stream_fifo_count_safe(router->mlag_fifo); + + if (PIM_DEBUG_MLAG) + zlog_debug(":%s: Processing MLAG write, %d messages in queue", + __func__, wr_count); + + if (wr_count == 0) + return; + + for (wr_count = 0; wr_count < PIM_MLAG_POST_LIMIT; wr_count++) { + /* FIFO is empty,wait for teh message to be add */ + if (stream_fifo_count_safe(router->mlag_fifo) == 0) + break; + + read_s = stream_fifo_pop_safe(router->mlag_fifo); + if (!read_s) { + zlog_debug(":%s: Got a NULL Messages, some thing wrong", + __func__); + break; + } + STREAM_GETL(read_s, curr_msg_type); + /* + * Check for Buffer Overflow, + * MLAG Can't process more than 'PIM_MLAG_BUF_LIMIT' bytes + */ + if (router->mlag_stream->endp + read_s->endp + ZEBRA_HEADER_SIZE + > MLAG_BUF_LIMIT) + pim_mlag_zebra_flush_buffer(); + + pim_mlag_zebra_check_for_buffer_flush(curr_msg_type, + prev_msg_type); + + /* + * First message to Buffer, fill the Header + */ + if (router->mlag_stream->endp == 0) + pim_mlag_zebra_fill_header(curr_msg_type); + + /* + * add the data now + */ + stream_put(router->mlag_stream, read_s->data + read_s->getp, + read_s->endp - read_s->getp); + + stream_free(read_s); + prev_msg_type = curr_msg_type; + } + +stream_failure: + /* + * we are here , because + * 1. Queue might be empty + * 2. we crossed the max Q Read limit + * In any acse flush the buffer towards zebra + */ + pim_mlag_zebra_flush_buffer(); + + if (wr_count >= PIM_MLAG_POST_LIMIT) + pim_mlag_signal_zpthread(); +} + + +int pim_mlag_signal_zpthread(void) +{ + if (router->master) { + if (PIM_DEBUG_MLAG) + zlog_debug(":%s: Scheduling PIM MLAG write Thread", + __func__); + event_add_event(router->master, pim_mlag_zthread_handler, NULL, + 0, &router->zpthread_mlag_write); + } + return (0); +} |