summaryrefslogtreecommitdiffstats
path: root/zebra/zebra_opaque.c
diff options
context:
space:
mode:
authorMark Stapp <mjs@voltanet.io>2020-04-19 19:23:09 +0200
committerMark Stapp <mjs@voltanet.io>2020-06-02 14:20:54 +0200
commit9bb02389d0423941e328c88b38e241aa722229d0 (patch)
treec97767d204dca3585d0de87da26912ee7dfd5dde /zebra/zebra_opaque.c
parentlib: add OPAQUE zapi message (diff)
downloadfrr-9bb02389d0423941e328c88b38e241aa722229d0.tar.xz
frr-9bb02389d0423941e328c88b38e241aa722229d0.zip
zebra: add zebra opaque module
Add the zebra_opaque module, designed to offload some opaque zapi message processing to a new, dedicated pthread. Add to the build; also re-sort the lists of zebra files in subdir.am. Start, stop, and clean-up the opaque module, integrate with zebra start and shutdown. Signed-off-by: Mark Stapp <mjs@voltanet.io>
Diffstat (limited to 'zebra/zebra_opaque.c')
-rw-r--r--zebra/zebra_opaque.c271
1 files changed, 271 insertions, 0 deletions
diff --git a/zebra/zebra_opaque.c b/zebra/zebra_opaque.c
new file mode 100644
index 000000000..570387e78
--- /dev/null
+++ b/zebra/zebra_opaque.c
@@ -0,0 +1,271 @@
+/*
+ * Zebra opaque message handler module
+ * Copyright (c) 2020 Volta Networks, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+
+#include <zebra.h>
+#include "lib/debug.h"
+#include "lib/frr_pthread.h"
+#include "lib/stream.h"
+#include "zebra/debug.h"
+#include "zebra/zserv.h"
+#include "zebra/zebra_opaque.h"
+
+/*
+ * Globals
+ */
+static struct zebra_opaque_globals {
+
+ /* Sentinel for run or start of shutdown */
+ _Atomic uint32_t run;
+
+ /* Limit number of pending, unprocessed updates */
+ _Atomic uint32_t max_queued_updates;
+
+ /* Limit number of new messages dequeued at once, to pace an
+ * incoming burst.
+ */
+ uint32_t msgs_per_cycle;
+
+ /* Stats: counters of incoming messages, errors, and yields (when
+ * the limit has been reached.)
+ */
+ _Atomic uint32_t msgs_in;
+ _Atomic uint32_t msg_errors;
+ _Atomic uint32_t yields;
+
+ /* pthread */
+ struct frr_pthread *pthread;
+
+ /* Event-delivery context 'master' for the module */
+ struct thread_master *master;
+
+ /* Event/'thread' pointer for queued zapi messages */
+ struct thread *t_msgs;
+
+ /* Input fifo queue to the module, and lock to protect it. */
+ pthread_mutex_t mutex;
+ struct stream_fifo in_fifo;
+
+} zo_info;
+
+/* Name string for debugs/logs */
+static const char LOG_NAME[] = "Zebra Opaque";
+
+/* Prototypes */
+
+/* Main event loop, processing incoming message queue */
+static int process_messages(struct thread *event);
+
+/*
+ * Initialize the module at startup
+ */
+void zebra_opaque_init(void)
+{
+ memset(&zo_info, 0, sizeof(zo_info));
+
+ pthread_mutex_init(&zo_info.mutex, NULL);
+ stream_fifo_init(&zo_info.in_fifo);
+
+ zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
+}
+
+/*
+ * Start the module pthread. This step is run later than the
+ * 'init' step, in case zebra has fork-ed.
+ */
+void zebra_opaque_start(void)
+{
+ struct frr_pthread_attr pattr = {
+ .start = frr_pthread_attr_default.start,
+ .stop = frr_pthread_attr_default.stop
+ };
+
+ if (IS_ZEBRA_DEBUG_EVENT)
+ zlog_debug("%s module starting", LOG_NAME);
+
+ /* Start pthread */
+ zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
+ "zebra_opaque");
+
+ /* Associate event 'master' */
+ zo_info.master = zo_info.pthread->master;
+
+ atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
+
+ /* Enqueue an initial event for the pthread */
+ thread_add_event(zo_info.master, process_messages, NULL, 0,
+ &zo_info.t_msgs);
+
+ /* And start the pthread */
+ frr_pthread_run(zo_info.pthread, NULL);
+}
+
+/*
+ * Module stop, halting the dedicated pthread; called from the main pthread.
+ */
+void zebra_opaque_stop(void)
+{
+ if (IS_ZEBRA_DEBUG_EVENT)
+ zlog_debug("%s module stop", LOG_NAME);
+
+ atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
+
+ frr_pthread_stop(zo_info.pthread, NULL);
+
+ frr_pthread_destroy(zo_info.pthread);
+
+ if (IS_ZEBRA_DEBUG_EVENT)
+ zlog_debug("%s module stop complete", LOG_NAME);
+}
+
+/*
+ * Module final cleanup, called from the zebra main pthread.
+ */
+void zebra_opaque_finish(void)
+{
+ if (IS_ZEBRA_DEBUG_EVENT)
+ zlog_debug("%s module shutdown", LOG_NAME);
+
+ pthread_mutex_destroy(&zo_info.mutex);
+ stream_fifo_deinit(&zo_info.in_fifo);
+}
+
+/*
+ * Does this module handle (intercept) the specified zapi message type?
+ */
+bool zebra_opaque_handles_msgid(uint16_t id)
+{
+ bool ret = false;
+
+ switch (id) {
+ case ZEBRA_OPAQUE_MESSAGE:
+ case ZEBRA_OPAQUE_REGISTER:
+ case ZEBRA_OPAQUE_UNREGISTER:
+ ret = true;
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+/*
+ * Enqueue a batch of messages for processing - this is the public api
+ * used from the zapi processing threads.
+ */
+uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
+{
+ uint32_t counter = 0;
+ struct stream *msg;
+
+ /* Dequeue messages from the incoming batch, and save them
+ * on the module fifo.
+ */
+ frr_with_mutex(&zo_info.mutex) {
+ msg = stream_fifo_pop(batch);
+ while (msg) {
+ stream_fifo_push(&zo_info.in_fifo, msg);
+ counter++;
+ msg = stream_fifo_pop(batch);
+ }
+ }
+
+ /* Schedule module pthread to process the batch */
+ if (counter > 0) {
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: received %u messages",
+ __func__, counter);
+ thread_add_event(zo_info.master, process_messages, NULL, 0,
+ &zo_info.t_msgs);
+ }
+
+ return counter;
+}
+
+/*
+ * Pthread event loop, process the incoming message queue.
+ */
+static int process_messages(struct thread *event)
+{
+ struct stream_fifo fifo;
+ struct stream *msg;
+ uint32_t i;
+ bool need_resched = false;
+
+ stream_fifo_init(&fifo);
+
+ /* Check for zebra shutdown */
+ if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
+ goto done;
+
+ /* Dequeue some messages from the incoming queue, temporarily
+ * save them on the local fifo
+ */
+ frr_with_mutex(&zo_info.mutex) {
+
+ for (i = 0; i < zo_info.msgs_per_cycle; i++) {
+ msg = stream_fifo_pop(&zo_info.in_fifo);
+ if (msg == NULL)
+ break;
+
+ stream_fifo_push(&fifo, msg);
+ }
+
+ /* We may need to reschedule, if there are still
+ * queued messages
+ */
+ if (stream_fifo_head(&zo_info.in_fifo) != NULL)
+ need_resched = true;
+ }
+
+ /* Update stats */
+ atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
+
+ /* Check for zebra shutdown */
+ if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
+ need_resched = false;
+ goto done;
+ }
+
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: processing %u messages", __func__, i);
+
+ /* Process the messages on the local fifo */
+ /* TODO -- just discarding the messages for now */
+ msg = stream_fifo_pop(&fifo);
+ while (msg) {
+ stream_free(msg);
+ msg = stream_fifo_pop(&fifo);
+ }
+
+done:
+
+ if (need_resched) {
+ atomic_fetch_add_explicit(&zo_info.yields, 1,
+ memory_order_relaxed);
+ thread_add_event(zo_info.master, process_messages, NULL, 0,
+ &zo_info.t_msgs);
+ }
+
+ /* This will also free any leftover messages, in the shutdown case */
+ stream_fifo_deinit(&fifo);
+
+ return 0;
+}