summaryrefslogtreecommitdiffstats
path: root/lib/mgmt_msg.c
diff options
context:
space:
mode:
authorChristian Hopps <chopps@labn.net>2023-04-29 12:22:37 +0200
committerChristian Hopps <chopps@labn.net>2023-05-28 11:13:22 +0200
commit070c5e7a91c413e08c1b8f78cc885f082a636b01 (patch)
tree5c55d456cf6459f89f723c12fc8fd5d645bec0f3 /lib/mgmt_msg.c
parentlib: mgmt msg: add version to messages (diff)
downloadfrr-070c5e7a91c413e08c1b8f78cc885f082a636b01.tar.xz
frr-070c5e7a91c413e08c1b8f78cc885f082a636b01.zip
lib: msg: refactor common connection code from mgmtd
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'lib/mgmt_msg.c')
-rw-r--r--lib/mgmt_msg.c257
1 files changed, 252 insertions, 5 deletions
diff --git a/lib/mgmt_msg.c b/lib/mgmt_msg.c
index e682face9..967606d20 100644
--- a/lib/mgmt_msg.c
+++ b/lib/mgmt_msg.c
@@ -22,7 +22,7 @@
} while (0)
#define MGMT_MSG_ERR(ms, fmt, ...) \
- zlog_err("%s: %s: " fmt, ms->idtag, __func__, ##__VA_ARGS__)
+ zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__)
/**
* Read data from a socket into streams containing 1 or more full msgs headed by
@@ -127,8 +127,8 @@ enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
* true if more to process (so reschedule) else false
*/
bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
- void (*handle_msg)(uint8_t version, void *user,
- uint8_t *msg, size_t msglen),
+ void (*handle_msg)(uint8_t version, uint8_t *msg,
+ size_t msglen, void *user),
void *user, bool debug)
{
const char *dbgtag = debug ? ms->idtag : NULL;
@@ -156,9 +156,10 @@ bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
assert(MGMT_MSG_IS_MARKER(mhdr->marker));
assert(left >= mhdr->len);
- handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker), user,
+ handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker),
(uint8_t *)(mhdr + 1),
- mhdr->len - sizeof(struct mgmt_msg_hdr));
+ mhdr->len - sizeof(struct mgmt_msg_hdr),
+ user);
ms->nrxm++;
nproc++;
}
@@ -402,6 +403,7 @@ size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)
return nproc;
}
+
void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
size_t max_write_buf, size_t max_msg_sz, const char *idtag)
{
@@ -422,3 +424,248 @@ void mgmt_msg_destroy(struct mgmt_msg_state *ms)
stream_free(ms->ins);
free(ms->idtag);
}
+
+/*
+ * Connections
+ */
+
+#define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250
+#define MSG_CONN_SEND_BUF_SIZE (1u << 16)
+#define MSG_CONN_RECV_BUF_SIZE (1u << 16)
+
+static void msg_client_sched_connect(struct msg_client *client,
+ unsigned long msec);
+
+static void msg_conn_sched_proc_msgs(struct msg_conn *conn);
+static void msg_conn_sched_read(struct msg_conn *conn);
+static void msg_conn_sched_write(struct msg_conn *conn);
+
+static void msg_conn_write(struct event *thread)
+{
+ struct msg_conn *conn = EVENT_ARG(thread);
+ enum mgmt_msg_wsched rv;
+
+ rv = mgmt_msg_write(&conn->mstate, conn->fd, conn->debug);
+ if (rv == MSW_SCHED_STREAM)
+ msg_conn_sched_write(conn);
+ else if (rv == MSW_DISCONNECT)
+ msg_conn_disconnect(conn, conn->is_client);
+ else
+ assert(rv == MSW_SCHED_NONE);
+}
+
+static void msg_conn_read(struct event *thread)
+{
+ struct msg_conn *conn = EVENT_ARG(thread);
+ enum mgmt_msg_rsched rv;
+
+ rv = mgmt_msg_read(&conn->mstate, conn->fd, conn->debug);
+ if (rv == MSR_DISCONNECT) {
+ msg_conn_disconnect(conn, conn->is_client);
+ return;
+ }
+ if (rv == MSR_SCHED_BOTH)
+ msg_conn_sched_proc_msgs(conn);
+ msg_conn_sched_read(conn);
+}
+
+/* collapse this into mgmt_msg_procbufs */
+static void msg_conn_proc_msgs(struct event *thread)
+{
+ struct msg_conn *conn = EVENT_ARG(thread);
+
+ if (mgmt_msg_procbufs(&conn->mstate,
+ (void (*)(uint8_t, uint8_t *, size_t,
+ void *))conn->handle_msg,
+ conn, conn->debug))
+ /* there's more, schedule handling more */
+ msg_conn_sched_proc_msgs(conn);
+}
+
+static void msg_conn_sched_read(struct msg_conn *conn)
+{
+ event_add_read(conn->loop, msg_conn_read, conn, conn->fd,
+ &conn->read_ev);
+}
+
+static void msg_conn_sched_write(struct msg_conn *conn)
+{
+ event_add_write(conn->loop, msg_conn_write, conn, conn->fd,
+ &conn->write_ev);
+}
+
+static void msg_conn_sched_proc_msgs(struct msg_conn *conn)
+{
+ event_add_event(conn->loop, msg_conn_proc_msgs, conn, 0,
+ &conn->proc_msg_ev);
+}
+
+
+void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)
+{
+
+ if (conn->fd != -1) {
+ close(conn->fd);
+ conn->fd = -1;
+
+ /* Notify client through registered callback (if any) */
+ if (conn->notify_disconnect)
+ (void)(*conn->notify_disconnect)(conn);
+ }
+
+ if (reconnect) {
+ assert(conn->is_client);
+ msg_client_sched_connect(
+ container_of(conn, struct msg_client, conn),
+ MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
+ }
+}
+
+int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg,
+ size_t mlen, size_t (*packf)(void *, void *))
+{
+ if (conn->fd == -1) {
+ MGMT_MSG_ERR(&conn->mstate,
+ "can't send message on closed connection");
+ return -1;
+ }
+
+ int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf,
+ conn->debug);
+
+ msg_conn_sched_write(conn);
+
+ return rv;
+}
+
+void msg_conn_cleanup(struct msg_conn *conn)
+{
+ struct mgmt_msg_state *ms = &conn->mstate;
+
+ if (conn->fd != -1) {
+ close(conn->fd);
+ conn->fd = -1;
+ }
+
+ EVENT_OFF(conn->read_ev);
+ EVENT_OFF(conn->write_ev);
+ EVENT_OFF(conn->proc_msg_ev);
+
+ mgmt_msg_destroy(ms);
+}
+
+/*
+ * Client Connections
+ */
+
+static void msg_client_connect(struct msg_client *conn);
+
+static void msg_client_connect_timer(struct event *thread)
+{
+ msg_client_connect(EVENT_ARG(thread));
+}
+
+static void msg_client_sched_connect(struct msg_client *client,
+ unsigned long msec)
+{
+ struct msg_conn *conn = &client->conn;
+ const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
+
+ MGMT_MSG_DBG(dbgtag, "connection retry in %lu msec", msec);
+ if (msec)
+ event_add_timer_msec(conn->loop, msg_client_connect_timer,
+ client, msec, &client->conn_retry_tmr);
+ else
+ event_add_event(conn->loop, msg_client_connect_timer, client, 0,
+ &client->conn_retry_tmr);
+}
+
+
+/* Connect and start reading from the socket */
+static void msg_client_connect(struct msg_client *client)
+{
+ struct msg_conn *conn = &client->conn;
+ const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
+
+ conn->fd = mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
+ MSG_CONN_RECV_BUF_SIZE, dbgtag);
+
+ if (conn->fd == -1)
+ /* retry the connection */
+ msg_client_sched_connect(client,
+ MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
+ else if (client->notify_connect && client->notify_connect(client))
+ /* client connect notify failed */
+ msg_conn_disconnect(conn, true);
+ else
+ /* start reading */
+ msg_conn_sched_read(conn);
+}
+
+void msg_client_init(struct msg_client *client, struct event_loop *tm,
+ const char *sopath,
+ int (*notify_connect)(struct msg_client *client),
+ int (*notify_disconnect)(struct msg_conn *client),
+ void (*handle_msg)(uint8_t version, uint8_t *data,
+ size_t len, struct msg_conn *client),
+ size_t max_read_buf, size_t max_write_buf,
+ size_t max_msg_sz, const char *idtag, bool debug)
+{
+ struct msg_conn *conn = &client->conn;
+ memset(client, 0, sizeof(*client));
+
+ conn->loop = tm;
+ conn->fd = -1;
+ conn->handle_msg = handle_msg;
+ conn->notify_disconnect = notify_disconnect;
+ conn->is_client = true;
+ conn->debug = debug;
+ client->sopath = strdup(sopath);
+ client->notify_connect = notify_connect;
+
+ mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz,
+ idtag);
+
+ /* XXX maybe just have client kick this off */
+ /* Start trying to connect to server */
+ msg_client_sched_connect(client, 0);
+}
+
+void msg_client_cleanup(struct msg_client *client)
+{
+ assert(client->conn.is_client);
+
+ EVENT_OFF(client->conn_retry_tmr);
+ free(client->sopath);
+
+ msg_conn_cleanup(&client->conn);
+}
+
+/*
+ * Initialize and start reading from the accepted socket
+ *
+ * notify_connect - only called for disconnect i.e., connected == false
+ */
+void mgmt_msg_server_accept_init(
+ struct msg_conn *conn, struct event_loop *tm, int fd,
+ int (*notify_disconnect)(struct msg_conn *conn),
+ void (*handle_msg)(uint8_t version, uint8_t *data, size_t len,
+ struct msg_conn *conn),
+ size_t max_read, size_t max_write, size_t max_size, const char *idtag)
+{
+ conn->loop = tm;
+ conn->fd = fd;
+ conn->notify_disconnect = notify_disconnect;
+ conn->handle_msg = handle_msg;
+ conn->is_client = false;
+
+ mgmt_msg_init(&conn->mstate, max_read, max_write, max_size, idtag);
+
+ /* start reading */
+ msg_conn_sched_read(conn);
+
+ /* Make socket non-blocking. */
+ set_nonblocking(conn->fd);
+ setsockopt_so_sendbuf(conn->fd, MSG_CONN_SEND_BUF_SIZE);
+ setsockopt_so_recvbuf(conn->fd, MSG_CONN_RECV_BUF_SIZE);
+}