summaryrefslogtreecommitdiffstats
path: root/zebra/zebra_opaque.c
diff options
context:
space:
mode:
authorMark Stapp <mjs@labn.net>2023-05-31 22:25:15 +0200
committerMark Stapp <mjs@labn.net>2023-06-23 14:57:37 +0200
commitde1a9ce0a7f869ae80e6d1339c5fa7f3fc957cd4 (patch)
treee0bedb06087d9319576b8b4f6250d95eb621c780 /zebra/zebra_opaque.c
parentlib: add notifications for opaque zapi messages (diff)
downloadfrr-de1a9ce0a7f869ae80e6d1339c5fa7f3fc957cd4.tar.xz
frr-de1a9ce0a7f869ae80e6d1339c5fa7f3fc957cd4.zip
zebra: support notifications for opaque ZAPI messages
Allow zapi clients to register to be notified when a server for an opaque message type is present. Zebra maintains these notification registrations in the same data structures that it uses for opaque message handling. Signed-off-by: Mark Stapp <mjs@labn.net>
Diffstat (limited to 'zebra/zebra_opaque.c')
-rw-r--r--zebra/zebra_opaque.c378
1 files changed, 326 insertions, 52 deletions
diff --git a/zebra/zebra_opaque.c b/zebra/zebra_opaque.c
index 2bd6ac084..9503c7469 100644
--- a/zebra/zebra_opaque.c
+++ b/zebra/zebra_opaque.c
@@ -26,10 +26,16 @@ struct opq_client_reg {
int instance;
uint32_t session_id;
+ int flags;
+
struct opq_client_reg *next;
struct opq_client_reg *prev;
};
+/* Registration is for receiving or for notifications */
+#define OPQ_CLIENT_FLAG_RECV 0x01
+#define OPQ_CLIENT_FLAG_NOTIFY 0x02
+
/* Opaque message registration info */
struct opq_msg_reg {
struct opq_regh_item item;
@@ -99,14 +105,18 @@ static int handle_opq_registration(const struct zmsghdr *hdr,
struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
struct stream *msg);
+static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg);
+static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info);
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static bool opq_client_match(const struct opq_client_reg *client,
const struct zapi_opaque_reg_info *info);
+static bool opq_client_notif_match(const struct opq_client_reg *client,
+ const struct zapi_opaque_notif_info *info);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);
-static struct opq_client_reg *opq_client_alloc(
- const struct zapi_opaque_reg_info *info);
+static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance,
+ uint32_t session_id);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
const struct opq_client_reg *client);
@@ -213,6 +223,7 @@ bool zebra_opaque_handles_msgid(uint16_t id)
case ZEBRA_OPAQUE_MESSAGE:
case ZEBRA_OPAQUE_REGISTER:
case ZEBRA_OPAQUE_UNREGISTER:
+ case ZEBRA_OPAQUE_NOTIFY:
ret = true;
break;
default:
@@ -243,7 +254,7 @@ uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
}
}
- /* Schedule module pthread to process the batch */
+ /* Schedule module's pthread to process the batch */
if (counter > 0) {
if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
zlog_debug("%s: received %u messages",
@@ -326,6 +337,38 @@ done:
}
/*
+ * Helper to acquire/lock a client session and send the message in 's'.
+ * Note that 's' is enqueued for an io pthread, so don't free it
+ * or touch it if this returns 'true'.
+ */
+static bool opq_send_message(uint8_t proto, uint16_t instance,
+ uint32_t session_id, struct stream *s)
+{
+ bool ret = false;
+ struct zserv *zclient;
+
+ /*
+ * TODO -- this isn't ideal: we're going through an
+ * acquire/release cycle for each client for each
+ * message. Replace this with a batching version.
+ */
+ zclient = zserv_acquire_client(proto, instance, session_id);
+ if (zclient) {
+ /*
+ * Sending a message actually means enqueuing
+ * it for a zapi io pthread to send - so we
+ * don't touch the message after this call.
+ */
+ zserv_send_message(zclient, s);
+
+ zserv_release_client(zclient);
+ ret = true;
+ }
+
+ return ret;
+}
+
+/*
* Process (dispatch) or drop opaque messages.
*/
static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
@@ -336,7 +379,6 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
struct opq_msg_reg *reg;
int ret;
struct opq_client_reg *client;
- struct zserv *zclient;
char buf[50];
while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
@@ -350,6 +392,9 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
} else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
handle_opq_unregistration(&hdr, msg);
continue;
+ } else if (hdr.command == ZEBRA_OPAQUE_NOTIFY) {
+ handle_opq_notif_req(&hdr, msg);
+ continue;
}
/* We only process OPAQUE messages - drop anything else */
@@ -400,36 +445,25 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
dup = stream_dup(msg);
}
+ if (IS_ZEBRA_DEBUG_SEND && IS_ZEBRA_DEBUG_DETAIL)
+ zlog_debug("%s: sending %s to client %s",
+ __func__, (dup ? "dup" : "msg"),
+ opq_client2str(buf, sizeof(buf),
+ client));
+
/*
* TODO -- this isn't ideal: we're going through an
* acquire/release cycle for each client for each
* message. Replace this with a batching version.
*/
- zclient = zserv_acquire_client(client->proto,
- client->instance,
- client->session_id);
- if (zclient) {
- if (IS_ZEBRA_DEBUG_SEND &&
- IS_ZEBRA_DEBUG_DETAIL)
- zlog_debug("%s: sending %s to client %s",
- __func__,
- (dup ? "dup" : "msg"),
- opq_client2str(buf,
- sizeof(buf),
- client));
-
- /*
- * Sending a message actually means enqueuing
- * it for a zapi io pthread to send - so we
- * don't touch the message after this call.
- */
- zserv_send_message(zclient, dup ? dup : msg);
+ if (opq_send_message(client->proto, client->instance,
+ client->session_id,
+ (dup ? dup : msg))) {
+ /* Message is gone - don't touch it */
if (dup)
dup = NULL;
else
msg = NULL;
-
- zserv_release_client(zclient);
} else {
if (IS_ZEBRA_DEBUG_RECV &&
IS_ZEBRA_DEBUG_DETAIL)
@@ -457,6 +491,67 @@ drop_it:
return 0;
}
+/* Enqueue registration client object */
+static void opq_enqueue_client(struct opq_msg_reg *reg,
+ struct opq_client_reg *client)
+{
+ client->next = reg->clients;
+ if (reg->clients)
+ reg->clients->prev = client;
+ reg->clients = client;
+}
+
+/* Dequeue registration client object */
+static void opq_dequeue_client(struct opq_msg_reg *reg,
+ struct opq_client_reg *client)
+{
+ if (client->prev)
+ client->prev->next = client->next;
+ if (client->next)
+ client->next->prev = client->prev;
+ if (reg->clients == client)
+ reg->clients = client->next;
+}
+
+/*
+ * Send notification messages to any interested clients in 'reg',
+ * about 'server'; the sense is 'registered' (or not).
+ * The 'server' is not required for un-registrations.
+ */
+static void opq_send_notifications(const struct opq_msg_reg *reg,
+ const struct opq_client_reg *server,
+ bool registered)
+{
+ const struct opq_client_reg *client;
+ struct stream *msg = NULL;
+
+ /* If there are any notification clients, send them a message */
+ for (client = reg->clients; client; client = client->next) {
+ if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) {
+ msg = stream_new(ZEBRA_SMALL_PACKET_SIZE);
+
+ if (registered) {
+ zclient_opaque_notif_encode(msg, reg->type,
+ registered,
+ server->proto,
+ server->instance,
+ server->session_id);
+ } else {
+ zclient_opaque_notif_encode(msg, reg->type,
+ registered, 0, 0, 0);
+ }
+
+ /* Locate zebra client and enqueue message to it */
+ if (!opq_send_message(client->proto, client->instance,
+ client->session_id, msg)) {
+ /* Error - need to free the message */
+ stream_free(msg);
+ msg = NULL;
+ }
+ }
+ }
+}
+
/*
* Process a register/unregister message
*/
@@ -499,7 +594,9 @@ static int handle_opq_registration(const struct zmsghdr *hdr,
goto done;
}
- client = opq_client_alloc(&info);
+ client = opq_client_alloc(info.proto, info.instance,
+ info.session_id);
+ SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV);
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s registers for %u",
@@ -508,17 +605,20 @@ static int handle_opq_registration(const struct zmsghdr *hdr,
info.type);
/* Link client into registration */
- client->next = reg->clients;
- if (reg->clients)
- reg->clients->prev = client;
- reg->clients = client;
+ opq_enqueue_client(reg, client);
+
+ /* Send notifications to any clients who want them */
+ opq_send_notifications(reg, client, true);
+
} else {
/*
* No existing registrations - create one, add the
* client, and add registration to hash.
*/
reg = opq_reg_alloc(info.type);
- client = opq_client_alloc(&info);
+ client = opq_client_alloc(info.proto, info.instance,
+ info.session_id);
+ SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV);
if (IS_ZEBRA_DEBUG_RECV)
zlog_debug("%s: client %s registers for new reg %u",
@@ -545,8 +645,9 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
{
int ret = 0;
struct zapi_opaque_reg_info info;
- struct opq_client_reg *client;
+ struct opq_client_reg *client, *tclient;
struct opq_msg_reg key, *reg;
+ int scount;
char buf[50];
memset(&info, 0, sizeof(info));
@@ -571,11 +672,16 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
goto done;
}
- /* Look for client */
- for (client = reg->clients; client != NULL;
- client = client->next) {
- if (opq_client_match(client, &info))
- break;
+ /* Look for client info, count servers and notif clients too */
+ client = NULL;
+ scount = 0;
+
+ for (tclient = reg->clients; tclient != NULL; tclient = tclient->next) {
+ if (opq_client_match(tclient, &info))
+ client = tclient;
+
+ if (CHECK_FLAG(tclient->flags, OPQ_CLIENT_FLAG_RECV))
+ scount++;
}
if (client == NULL) {
@@ -592,19 +698,18 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr,
__func__, opq_client2str(buf, sizeof(buf), client),
info.type);
- if (client->prev)
- client->prev->next = client->next;
- if (client->next)
- client->next->prev = client->prev;
- if (reg->clients == client)
- reg->clients = client->next;
-
+ opq_dequeue_client(reg, client);
opq_client_free(&client);
+ scount--;
/* Is registration empty now? */
if (reg->clients == NULL) {
+
opq_regh_del(&opq_reg_hash, reg);
opq_reg_free(&reg);
+ } else if (scount == 0) {
+ /* Send notifications if no more servers for the message. */
+ opq_send_notifications(reg, NULL, false);
}
done:
@@ -613,13 +718,182 @@ done:
return ret;
}
+/*
+ * Handle requests about opaque notifications.
+ */
+static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg)
+{
+ int ret;
+ struct zapi_opaque_notif_info info = {};
+ struct opq_client_reg *client;
+ struct opq_msg_reg key, *reg;
+ char buf[50];
+
+ ret = zclient_opaque_notif_decode(msg, &info);
+ if (ret < 0)
+ goto done;
+
+ /* Handle deregistration */
+ if (!info.reg) {
+ ret = handle_opq_notif_unreg(&info);
+ goto done;
+ }
+
+ memset(&key, 0, sizeof(key));
+
+ key.type = info.msg_type;
+
+ reg = opq_regh_find(&opq_reg_hash, &key);
+ if (reg) {
+ /* Look for dup client */
+ for (client = reg->clients; client != NULL;
+ client = client->next) {
+ if (opq_client_notif_match(client, &info))
+ break;
+ }
+
+ if (client) {
+ /* Oops - duplicate ? */
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: duplicate opq notif reg client %s",
+ __func__, opq_client2str(buf,
+ sizeof(buf),
+ client));
+ goto done;
+ }
+
+ client = opq_client_alloc(info.proto, info.instance,
+ info.session_id);
+ SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY);
+
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: client %s registers for notif %u",
+ __func__,
+ opq_client2str(buf, sizeof(buf), client),
+ info.msg_type);
+
+ /* Link client into registration */
+ opq_enqueue_client(reg, client);
+
+ /* Send notification if any registered servers */
+ /* Look for a server */
+ for (client = reg->clients; client != NULL;
+ client = client->next) {
+ if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV))
+ break;
+ }
+ if (client)
+ opq_send_notifications(reg, client, true);
+
+ } else if (info.reg) {
+ /*
+ * No existing registrations - create one, add the
+ * client, and add registration to hash.
+ */
+ reg = opq_reg_alloc(info.msg_type);
+ client = opq_client_alloc(info.proto, info.instance,
+ info.session_id);
+ SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY);
+
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: client %s registers for new notif %u",
+ __func__,
+ opq_client2str(buf, sizeof(buf), client),
+ info.msg_type);
+
+ reg->clients = client;
+
+ opq_regh_add(&opq_reg_hash, reg);
+ }
+
+done:
+ stream_free(msg);
+ return ret;
+}
+
+/*
+ * Unregister notification
+ */
+static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info)
+{
+ int ret = 0;
+ struct opq_client_reg *client;
+ struct opq_msg_reg key, *reg;
+ char buf[50];
+
+ memset(&key, 0, sizeof(key));
+
+ key.type = info->msg_type;
+
+ reg = opq_regh_find(&opq_reg_hash, &key);
+ if (reg == NULL) {
+ /* Weird: unregister for unknown message? */
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: unknown client %s/%u/%u unregisters notif for unknown type %u",
+ __func__, zebra_route_string(info->proto),
+ info->instance, info->session_id,
+ info->msg_type);
+ goto done;
+ }
+
+ /* Look for client */
+ for (client = reg->clients; client != NULL; client = client->next) {
+ if (opq_client_notif_match(client, info))
+ break;
+ }
+
+ if (client == NULL) {
+ /* Oops - unregister for unknown client? */
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: unknown client %s/%u/%u unregisters notif for %u",
+ __func__, zebra_route_string(info->proto),
+ info->instance, info->session_id,
+ info->msg_type);
+ goto done;
+ }
+
+ if (IS_ZEBRA_DEBUG_RECV)
+ zlog_debug("%s: client %s unregisters notif for %u", __func__,
+ opq_client2str(buf, sizeof(buf), client),
+ info->msg_type);
+
+ /* Dequeue client object */
+ opq_dequeue_client(reg, client);
+
+ opq_client_free(&client);
+
+ /* Is registration empty now? */
+ if (reg->clients == NULL) {
+ opq_regh_del(&opq_reg_hash, reg);
+ opq_reg_free(&reg);
+ }
+
+done:
+
+ return ret;
+}
+
/* Compare utility for registered clients */
static bool opq_client_match(const struct opq_client_reg *client,
const struct zapi_opaque_reg_info *info)
{
- if (client->proto == info->proto &&
- client->instance == info->instance &&
- client->session_id == info->session_id)
+ /* look for matching client, skip notifications */
+ if (client->proto == info->proto && client->instance == info->instance &&
+ client->session_id == info->session_id &&
+ CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV))
+ return true;
+ else
+ return false;
+}
+
+/* Compare helper for clients registered for notifications */
+static bool opq_client_notif_match(const struct opq_client_reg *client,
+ const struct zapi_opaque_notif_info *info)
+{
+ /* look for matching client, only for notifications */
+ if (client->proto == info->proto && client->instance == info->instance &&
+ client->session_id == info->session_id &&
+ CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY))
return true;
else
return false;
@@ -655,16 +929,16 @@ static void opq_reg_free(struct opq_msg_reg **reg)
XFREE(MTYPE_OPQ, (*reg));
}
-static struct opq_client_reg *opq_client_alloc(
- const struct zapi_opaque_reg_info *info)
+static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance,
+ uint32_t session_id)
{
struct opq_client_reg *client;
client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
- client->proto = info->proto;
- client->instance = info->instance;
- client->session_id = info->session_id;
+ client->proto = proto;
+ client->instance = instance;
+ client->session_id = session_id;
return client;
}