diff options
author | Mark Stapp <mjs@labn.net> | 2023-05-31 22:25:15 +0200 |
---|---|---|
committer | Mark Stapp <mjs@labn.net> | 2023-06-23 14:57:37 +0200 |
commit | de1a9ce0a7f869ae80e6d1339c5fa7f3fc957cd4 (patch) | |
tree | e0bedb06087d9319576b8b4f6250d95eb621c780 /zebra/zebra_opaque.c | |
parent | lib: add notifications for opaque zapi messages (diff) | |
download | frr-de1a9ce0a7f869ae80e6d1339c5fa7f3fc957cd4.tar.xz frr-de1a9ce0a7f869ae80e6d1339c5fa7f3fc957cd4.zip |
zebra: support notifications for opaque ZAPI messages
Allow zapi clients to register to be notified when a server
for an opaque message type is present. Zebra maintains these
notification registrations in the same data structures that it
uses for opaque message handling.
Signed-off-by: Mark Stapp <mjs@labn.net>
Diffstat (limited to 'zebra/zebra_opaque.c')
-rw-r--r-- | zebra/zebra_opaque.c | 378 |
1 files changed, 326 insertions, 52 deletions
diff --git a/zebra/zebra_opaque.c b/zebra/zebra_opaque.c index 2bd6ac084..9503c7469 100644 --- a/zebra/zebra_opaque.c +++ b/zebra/zebra_opaque.c @@ -26,10 +26,16 @@ struct opq_client_reg { int instance; uint32_t session_id; + int flags; + struct opq_client_reg *next; struct opq_client_reg *prev; }; +/* Registration is for receiving or for notifications */ +#define OPQ_CLIENT_FLAG_RECV 0x01 +#define OPQ_CLIENT_FLAG_NOTIFY 0x02 + /* Opaque message registration info */ struct opq_msg_reg { struct opq_regh_item item; @@ -99,14 +105,18 @@ static int handle_opq_registration(const struct zmsghdr *hdr, struct stream *msg); static int handle_opq_unregistration(const struct zmsghdr *hdr, struct stream *msg); +static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg); +static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info); static int dispatch_opq_messages(struct stream_fifo *msg_fifo); static struct opq_msg_reg *opq_reg_lookup(uint32_t type); static bool opq_client_match(const struct opq_client_reg *client, const struct zapi_opaque_reg_info *info); +static bool opq_client_notif_match(const struct opq_client_reg *client, + const struct zapi_opaque_notif_info *info); static struct opq_msg_reg *opq_reg_alloc(uint32_t type); static void opq_reg_free(struct opq_msg_reg **reg); -static struct opq_client_reg *opq_client_alloc( - const struct zapi_opaque_reg_info *info); +static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance, + uint32_t session_id); static void opq_client_free(struct opq_client_reg **client); static const char *opq_client2str(char *buf, size_t buflen, const struct opq_client_reg *client); @@ -213,6 +223,7 @@ bool zebra_opaque_handles_msgid(uint16_t id) case ZEBRA_OPAQUE_MESSAGE: case ZEBRA_OPAQUE_REGISTER: case ZEBRA_OPAQUE_UNREGISTER: + case ZEBRA_OPAQUE_NOTIFY: ret = true; break; default: @@ -243,7 +254,7 @@ uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch) } } - /* Schedule module pthread to process the batch */ + /* Schedule module's pthread to process the batch */ if (counter > 0) { if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) zlog_debug("%s: received %u messages", @@ -326,6 +337,38 @@ done: } /* + * Helper to acquire/lock a client session and send the message in 's'. + * Note that 's' is enqueued for an io pthread, so don't free it + * or touch it if this returns 'true'. + */ +static bool opq_send_message(uint8_t proto, uint16_t instance, + uint32_t session_id, struct stream *s) +{ + bool ret = false; + struct zserv *zclient; + + /* + * TODO -- this isn't ideal: we're going through an + * acquire/release cycle for each client for each + * message. Replace this with a batching version. + */ + zclient = zserv_acquire_client(proto, instance, session_id); + if (zclient) { + /* + * Sending a message actually means enqueuing + * it for a zapi io pthread to send - so we + * don't touch the message after this call. + */ + zserv_send_message(zclient, s); + + zserv_release_client(zclient); + ret = true; + } + + return ret; +} + +/* * Process (dispatch) or drop opaque messages. */ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) @@ -336,7 +379,6 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) struct opq_msg_reg *reg; int ret; struct opq_client_reg *client; - struct zserv *zclient; char buf[50]; while ((msg = stream_fifo_pop(msg_fifo)) != NULL) { @@ -350,6 +392,9 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) { handle_opq_unregistration(&hdr, msg); continue; + } else if (hdr.command == ZEBRA_OPAQUE_NOTIFY) { + handle_opq_notif_req(&hdr, msg); + continue; } /* We only process OPAQUE messages - drop anything else */ @@ -400,36 +445,25 @@ static int dispatch_opq_messages(struct stream_fifo *msg_fifo) dup = stream_dup(msg); } + if (IS_ZEBRA_DEBUG_SEND && IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: sending %s to client %s", + __func__, (dup ? "dup" : "msg"), + opq_client2str(buf, sizeof(buf), + client)); + /* * TODO -- this isn't ideal: we're going through an * acquire/release cycle for each client for each * message. Replace this with a batching version. */ - zclient = zserv_acquire_client(client->proto, - client->instance, - client->session_id); - if (zclient) { - if (IS_ZEBRA_DEBUG_SEND && - IS_ZEBRA_DEBUG_DETAIL) - zlog_debug("%s: sending %s to client %s", - __func__, - (dup ? "dup" : "msg"), - opq_client2str(buf, - sizeof(buf), - client)); - - /* - * Sending a message actually means enqueuing - * it for a zapi io pthread to send - so we - * don't touch the message after this call. - */ - zserv_send_message(zclient, dup ? dup : msg); + if (opq_send_message(client->proto, client->instance, + client->session_id, + (dup ? dup : msg))) { + /* Message is gone - don't touch it */ if (dup) dup = NULL; else msg = NULL; - - zserv_release_client(zclient); } else { if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) @@ -457,6 +491,67 @@ drop_it: return 0; } +/* Enqueue registration client object */ +static void opq_enqueue_client(struct opq_msg_reg *reg, + struct opq_client_reg *client) +{ + client->next = reg->clients; + if (reg->clients) + reg->clients->prev = client; + reg->clients = client; +} + +/* Dequeue registration client object */ +static void opq_dequeue_client(struct opq_msg_reg *reg, + struct opq_client_reg *client) +{ + if (client->prev) + client->prev->next = client->next; + if (client->next) + client->next->prev = client->prev; + if (reg->clients == client) + reg->clients = client->next; +} + +/* + * Send notification messages to any interested clients in 'reg', + * about 'server'; the sense is 'registered' (or not). + * The 'server' is not required for un-registrations. + */ +static void opq_send_notifications(const struct opq_msg_reg *reg, + const struct opq_client_reg *server, + bool registered) +{ + const struct opq_client_reg *client; + struct stream *msg = NULL; + + /* If there are any notification clients, send them a message */ + for (client = reg->clients; client; client = client->next) { + if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) { + msg = stream_new(ZEBRA_SMALL_PACKET_SIZE); + + if (registered) { + zclient_opaque_notif_encode(msg, reg->type, + registered, + server->proto, + server->instance, + server->session_id); + } else { + zclient_opaque_notif_encode(msg, reg->type, + registered, 0, 0, 0); + } + + /* Locate zebra client and enqueue message to it */ + if (!opq_send_message(client->proto, client->instance, + client->session_id, msg)) { + /* Error - need to free the message */ + stream_free(msg); + msg = NULL; + } + } + } +} + /* * Process a register/unregister message */ @@ -499,7 +594,9 @@ static int handle_opq_registration(const struct zmsghdr *hdr, goto done; } - client = opq_client_alloc(&info); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV); if (IS_ZEBRA_DEBUG_RECV) zlog_debug("%s: client %s registers for %u", @@ -508,17 +605,20 @@ static int handle_opq_registration(const struct zmsghdr *hdr, info.type); /* Link client into registration */ - client->next = reg->clients; - if (reg->clients) - reg->clients->prev = client; - reg->clients = client; + opq_enqueue_client(reg, client); + + /* Send notifications to any clients who want them */ + opq_send_notifications(reg, client, true); + } else { /* * No existing registrations - create one, add the * client, and add registration to hash. */ reg = opq_reg_alloc(info.type); - client = opq_client_alloc(&info); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV); if (IS_ZEBRA_DEBUG_RECV) zlog_debug("%s: client %s registers for new reg %u", @@ -545,8 +645,9 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, { int ret = 0; struct zapi_opaque_reg_info info; - struct opq_client_reg *client; + struct opq_client_reg *client, *tclient; struct opq_msg_reg key, *reg; + int scount; char buf[50]; memset(&info, 0, sizeof(info)); @@ -571,11 +672,16 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, goto done; } - /* Look for client */ - for (client = reg->clients; client != NULL; - client = client->next) { - if (opq_client_match(client, &info)) - break; + /* Look for client info, count servers and notif clients too */ + client = NULL; + scount = 0; + + for (tclient = reg->clients; tclient != NULL; tclient = tclient->next) { + if (opq_client_match(tclient, &info)) + client = tclient; + + if (CHECK_FLAG(tclient->flags, OPQ_CLIENT_FLAG_RECV)) + scount++; } if (client == NULL) { @@ -592,19 +698,18 @@ static int handle_opq_unregistration(const struct zmsghdr *hdr, __func__, opq_client2str(buf, sizeof(buf), client), info.type); - if (client->prev) - client->prev->next = client->next; - if (client->next) - client->next->prev = client->prev; - if (reg->clients == client) - reg->clients = client->next; - + opq_dequeue_client(reg, client); opq_client_free(&client); + scount--; /* Is registration empty now? */ if (reg->clients == NULL) { + opq_regh_del(&opq_reg_hash, reg); opq_reg_free(®); + } else if (scount == 0) { + /* Send notifications if no more servers for the message. */ + opq_send_notifications(reg, NULL, false); } done: @@ -613,13 +718,182 @@ done: return ret; } +/* + * Handle requests about opaque notifications. + */ +static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg) +{ + int ret; + struct zapi_opaque_notif_info info = {}; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + ret = zclient_opaque_notif_decode(msg, &info); + if (ret < 0) + goto done; + + /* Handle deregistration */ + if (!info.reg) { + ret = handle_opq_notif_unreg(&info); + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.msg_type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg) { + /* Look for dup client */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (opq_client_notif_match(client, &info)) + break; + } + + if (client) { + /* Oops - duplicate ? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: duplicate opq notif reg client %s", + __func__, opq_client2str(buf, + sizeof(buf), + client)); + goto done; + } + + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for notif %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.msg_type); + + /* Link client into registration */ + opq_enqueue_client(reg, client); + + /* Send notification if any registered servers */ + /* Look for a server */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV)) + break; + } + if (client) + opq_send_notifications(reg, client, true); + + } else if (info.reg) { + /* + * No existing registrations - create one, add the + * client, and add registration to hash. + */ + reg = opq_reg_alloc(info.msg_type); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for new notif %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.msg_type); + + reg->clients = client; + + opq_regh_add(&opq_reg_hash, reg); + } + +done: + stream_free(msg); + return ret; +} + +/* + * Unregister notification + */ +static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info) +{ + int ret = 0; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + memset(&key, 0, sizeof(key)); + + key.type = info->msg_type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg == NULL) { + /* Weird: unregister for unknown message? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters notif for unknown type %u", + __func__, zebra_route_string(info->proto), + info->instance, info->session_id, + info->msg_type); + goto done; + } + + /* Look for client */ + for (client = reg->clients; client != NULL; client = client->next) { + if (opq_client_notif_match(client, info)) + break; + } + + if (client == NULL) { + /* Oops - unregister for unknown client? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters notif for %u", + __func__, zebra_route_string(info->proto), + info->instance, info->session_id, + info->msg_type); + goto done; + } + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s unregisters notif for %u", __func__, + opq_client2str(buf, sizeof(buf), client), + info->msg_type); + + /* Dequeue client object */ + opq_dequeue_client(reg, client); + + opq_client_free(&client); + + /* Is registration empty now? */ + if (reg->clients == NULL) { + opq_regh_del(&opq_reg_hash, reg); + opq_reg_free(®); + } + +done: + + return ret; +} + /* Compare utility for registered clients */ static bool opq_client_match(const struct opq_client_reg *client, const struct zapi_opaque_reg_info *info) { - if (client->proto == info->proto && - client->instance == info->instance && - client->session_id == info->session_id) + /* look for matching client, skip notifications */ + if (client->proto == info->proto && client->instance == info->instance && + client->session_id == info->session_id && + CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV)) + return true; + else + return false; +} + +/* Compare helper for clients registered for notifications */ +static bool opq_client_notif_match(const struct opq_client_reg *client, + const struct zapi_opaque_notif_info *info) +{ + /* look for matching client, only for notifications */ + if (client->proto == info->proto && client->instance == info->instance && + client->session_id == info->session_id && + CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) return true; else return false; @@ -655,16 +929,16 @@ static void opq_reg_free(struct opq_msg_reg **reg) XFREE(MTYPE_OPQ, (*reg)); } -static struct opq_client_reg *opq_client_alloc( - const struct zapi_opaque_reg_info *info) +static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance, + uint32_t session_id) { struct opq_client_reg *client; client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg)); - client->proto = info->proto; - client->instance = info->instance; - client->session_id = info->session_id; + client->proto = proto; + client->instance = instance; + client->session_id = session_id; return client; } |