diff options
author | Daniel Salzman <daniel.salzman@nic.cz> | 2021-07-08 10:19:01 +0200 |
---|---|---|
committer | Daniel Salzman <daniel.salzman@nic.cz> | 2021-07-10 22:46:37 +0200 |
commit | 509d9890d157e61abb58471f57872c959dadc02a (patch) | |
tree | b789b3be24882a12485c219e49bd9ec805ba3c20 | |
parent | XDP-TCP: xdp handler improvements (diff) | |
download | knot-509d9890d157e61abb58471f57872c959dadc02a.tar.xz knot-509d9890d157e61abb58471f57872c959dadc02a.zip |
XDP-TCP: simplify UDP handler API
-rw-r--r-- | src/knot/server/udp-handler.c | 92 | ||||
-rw-r--r-- | src/knot/server/xdp-handler.c | 40 | ||||
-rw-r--r-- | src/knot/server/xdp-handler.h | 12 | ||||
-rw-r--r-- | tests-fuzz/knotd_wrap/udp-handler.c | 13 |
4 files changed, 68 insertions, 89 deletions
diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c index 1808e346f..a0f223aa7 100644 --- a/src/knot/server/udp-handler.c +++ b/src/knot/server/udp-handler.c @@ -108,12 +108,12 @@ static void udp_handle(udp_context_t *udp, int fd, struct sockaddr_storage *ss, } typedef struct { - void* (*udp_init)(void); + void* (*udp_init)(void *); void (*udp_deinit)(void *); - int (*udp_recv)(int, void *, void *); - int (*udp_handle)(udp_context_t *, void *, void *); - int (*udp_send)(void *, void *); - int (*udp_tick)(void *, void *); // Optional + int (*udp_recv)(int, void *); + void (*udp_handle)(udp_context_t *, void *); + void (*udp_send)(void *); + void (*udp_sweep)(void *); // Optional } udp_api_t; /*! \brief Control message to fit IP_PKTINFO or IPv6_RECVPKTINFO. */ @@ -160,7 +160,7 @@ struct udp_recvfrom { cmsg_pktinfo_t pktinfo; }; -static void *udp_recvfrom_init(void) +static void *udp_recvfrom_init(_unused_ void *xdp_sock) { struct udp_recvfrom *rq = malloc(sizeof(struct udp_recvfrom)); if (rq == NULL) { @@ -187,9 +187,8 @@ static void udp_recvfrom_deinit(void *d) free(rq); } -static int udp_recvfrom_recv(int fd, void *d, void *unused) +static int udp_recvfrom_recv(int fd, void *d) { - UNUSED(unused); /* Reset max lengths. */ struct udp_recvfrom *rq = (struct udp_recvfrom *)d; rq->iov[RX].iov_len = KNOT_WIRE_MAX_PKTSIZE; @@ -206,9 +205,8 @@ static int udp_recvfrom_recv(int fd, void *d, void *unused) return 0; } -static int udp_recvfrom_handle(udp_context_t *ctx, void *d, void *unused) +static void udp_recvfrom_handle(udp_context_t *ctx, void *d) { - UNUSED(unused); struct udp_recvfrom *rq = d; /* Prepare TX address. */ @@ -219,25 +217,14 @@ static int udp_recvfrom_handle(udp_context_t *ctx, void *d, void *unused) /* Process received pkt. */ udp_handle(ctx, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX], NULL); - - return KNOT_EOK; } -static int udp_recvfrom_send(void *d, void *unused) +static void udp_recvfrom_send(void *d) { - UNUSED(unused); struct udp_recvfrom *rq = d; - int rc = 0; if (rq->iov[TX].iov_len > 0) { - rc = sendmsg(rq->fd, &rq->msg[TX], 0); - } - - /* Return number of packets sent. */ - if (rc > 1) { - return 1; + (void)sendmsg(rq->fd, &rq->msg[TX], 0); } - - return 0; } __attribute__ ((unused)) @@ -262,7 +249,7 @@ struct udp_recvmmsg { cmsg_pktinfo_t pktinfo[RECVMMSG_BATCHLEN]; }; -static void *udp_recvmmsg_init(void) +static void *udp_recvmmsg_init(_unused_ void *xdp_sock) { knot_mm_t mm; mm_ctx_mempool(&mm, sizeof(struct udp_recvmmsg)); @@ -300,9 +287,8 @@ static void udp_recvmmsg_deinit(void *d) } } -static int udp_recvmmsg_recv(int fd, void *d, void *unused) +static int udp_recvmmsg_recv(int fd, void *d) { - UNUSED(unused); struct udp_recvmmsg *rq = d; int n = recvmmsg(fd, rq->msgs[RX], RECVMMSG_BATCHLEN, MSG_DONTWAIT, NULL); @@ -313,9 +299,8 @@ static int udp_recvmmsg_recv(int fd, void *d, void *unused) return n; } -static int udp_recvmmsg_handle(udp_context_t *ctx, void *d, void *unused) +static void udp_recvmmsg_handle(udp_context_t *ctx, void *d) { - UNUSED(unused); struct udp_recvmmsg *rq = d; /* Handle each received msg. */ @@ -334,15 +319,12 @@ static int udp_recvmmsg_handle(udp_context_t *ctx, void *d, void *unused) rq->msgs[TX][i].msg_hdr.msg_namelen = rq->msgs[RX][i].msg_hdr.msg_namelen; } } - - return KNOT_EOK; } -static int udp_recvmmsg_send(void *d, void *unused) +static void udp_recvmmsg_send(void *d) { - UNUSED(unused); struct udp_recvmmsg *rq = d; - int rc = sendmmsg(rq->fd, rq->msgs[TX], rq->rcvd, 0); + (void)sendmmsg(rq->fd, rq->msgs[TX], rq->rcvd, 0); for (unsigned i = 0; i < rq->rcvd; ++i) { /* Reset buffer size and address len. */ struct iovec *rx = rq->msgs[RX][i].msg_hdr.msg_iov; @@ -355,7 +337,6 @@ static int udp_recvmmsg_send(void *d, void *unused) rq->msgs[TX][i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); rq->msgs[RX][i].msg_hdr.msg_controllen = sizeof(cmsg_pktinfo_t); } - return rc; } static udp_api_t udp_recvmmsg_api = { @@ -369,9 +350,9 @@ static udp_api_t udp_recvmmsg_api = { #ifdef ENABLE_XDP -static void *xdp_recvmmsg_init(void) +static void *xdp_recvmmsg_init(void *xdp_sock) { - return xdp_handle_init(); + return xdp_handle_init(xdp_sock); } static void xdp_recvmmsg_deinit(void *d) @@ -379,26 +360,25 @@ static void xdp_recvmmsg_deinit(void *d) xdp_handle_free(d); } -static int xdp_recvmmsg_recv(int fd, void *d, void *xdp_sock) +static int xdp_recvmmsg_recv(_unused_ int fd, void *d) { - UNUSED(fd); - return xdp_handle_recv(d, xdp_sock); + return xdp_handle_recv(d); } -static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d, void *xdp_sock) +static void xdp_recvmmsg_handle(udp_context_t *ctx, void *d) { - return xdp_handle_msgs(d, xdp_sock, &ctx->layer, ctx->server, ctx->thread_id); + xdp_handle_msgs(d, &ctx->layer, ctx->server, ctx->thread_id); } -static int xdp_recvmmsg_send(void *d, void *xdp_sock) +static void xdp_recvmmsg_send(void *d) { - return xdp_handle_send(d, xdp_sock); + xdp_handle_send(d); } -static int xdp_recvmmsg_tick(void *d, void *xdp_sock) +static void xdp_recvmmsg_sweep(void *d) { xdp_handle_reconfigure(d); - return xdp_handle_sweep(d, xdp_sock); + xdp_handle_sweep(d); } static udp_api_t xdp_recvmmsg_api = { @@ -407,7 +387,7 @@ static udp_api_t xdp_recvmmsg_api = { xdp_recvmmsg_recv, xdp_recvmmsg_handle, xdp_recvmmsg_send, - xdp_recvmmsg_tick + xdp_recvmmsg_sweep, }; #endif /* ENABLE_XDP */ @@ -508,7 +488,7 @@ int udp_master(dthread_t *thread) api = &udp_recvfrom_api; #endif } - void *rq = api->udp_init(); + void *api_ctx = NULL; /* Create big enough memory cushion. */ knot_mm_t mm; @@ -534,6 +514,12 @@ int udp_master(dthread_t *thread) goto finish; } + /* Initialize the networking API. */ + api_ctx = api->udp_init(xdp_socket); + if (api_ctx == NULL) { + goto finish; + } + /* Loop until all data is read. */ for (;;) { /* Cancellation point. */ @@ -550,20 +536,20 @@ int udp_master(dthread_t *thread) if (!fdset_it_is_pollin(&it)) { continue; } - if (api->udp_recv(fdset_it_get_fd(&it), rq, xdp_socket) > 0) { - api->udp_handle(&udp, rq, xdp_socket); - api->udp_send(rq, xdp_socket); + if (api->udp_recv(fdset_it_get_fd(&it), api_ctx) > 0) { + api->udp_handle(&udp, api_ctx); + api->udp_send(api_ctx); } } /* Regular maintenance (XDP-TCP only). */ - if (api->udp_tick != NULL) { - api->udp_tick(rq, xdp_socket); + if (api->udp_sweep != NULL) { + api->udp_sweep(api_ctx); } } finish: - api->udp_deinit(rq); + api->udp_deinit(api_ctx); mp_delete(mm.ctx); fdset_clear(&fds); diff --git a/src/knot/server/xdp-handler.c b/src/knot/server/xdp-handler.c index f285c72d5..e2e632277 100644 --- a/src/knot/server/xdp-handler.c +++ b/src/knot/server/xdp-handler.c @@ -23,10 +23,12 @@ #include "knot/server/xdp-handler.h" #include "knot/common/log.h" #include "knot/server/server.h" +#include "contrib/ucw/mempool.h" #include "libknot/error.h" #include "libknot/xdp/tcp.h" typedef struct xdp_handle_ctx { + knot_xdp_socket_t *sock; knot_xdp_msg_t msg_recv[XDP_BATCHLEN]; knot_xdp_msg_t msg_send_udp[XDP_BATCHLEN]; knot_tcp_relay_dynarray_t tcp_relays; @@ -74,12 +76,13 @@ void xdp_handle_free(xdp_handle_ctx_t *ctx) free(ctx); } -xdp_handle_ctx_t *xdp_handle_init(void) +xdp_handle_ctx_t *xdp_handle_init(knot_xdp_socket_t *xdp_sock) { xdp_handle_ctx_t *ctx = calloc(1, sizeof(*ctx)); if (ctx == NULL) { return NULL; } + ctx->sock = xdp_sock; xdp_handle_reconfigure(ctx); @@ -95,9 +98,9 @@ xdp_handle_ctx_t *xdp_handle_init(void) return ctx; } -int xdp_handle_recv(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *xdp_sock) +int xdp_handle_recv(xdp_handle_ctx_t *ctx) { - int ret = knot_xdp_recv(xdp_sock, ctx->msg_recv, XDP_BATCHLEN, + int ret = knot_xdp_recv(ctx->sock, ctx->msg_recv, XDP_BATCHLEN, &ctx->msg_recv_count, NULL); return ret == KNOT_EOK ? ctx->msg_recv_count : ret; } @@ -124,18 +127,18 @@ static void handle_finish(knot_layer_t *layer) mp_flush(layer->mm->ctx); } -int xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *sock, - knot_layer_t *layer, server_t *server, unsigned thread_id) +void xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_layer_t *layer, + server_t *server, unsigned thread_id) { assert(ctx->msg_recv_count > 0); knotd_qdata_params_t params = { - .socket = knot_xdp_socket_fd(sock), + .socket = knot_xdp_socket_fd(ctx->sock), .server = server, .thread_id = thread_id, }; - knot_xdp_send_prepare(sock); + knot_xdp_send_prepare(ctx->sock); ctx->msg_udp_count = 0; @@ -149,7 +152,7 @@ int xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *sock, continue; } - if (knot_xdp_reply_alloc(sock, msg_recv, msg_send) != KNOT_EOK) { + if (knot_xdp_reply_alloc(ctx->sock, msg_recv, msg_send) != KNOT_EOK) { continue; // no point in returning error, where handled? } ctx->msg_udp_count++; @@ -170,7 +173,7 @@ int xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *sock, } // handle TCP messages - int ret = knot_tcp_relay(sock, ctx->msg_recv, ctx->msg_recv_count, ctx->tcp_table, NULL, &ctx->tcp_relays); + int ret = knot_tcp_relay(ctx->sock, ctx->msg_recv, ctx->msg_recv_count, ctx->tcp_table, NULL, &ctx->tcp_relays); if (ret == KNOT_EOK && ctx->tcp_relays.size > 0) { uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE]; @@ -195,30 +198,26 @@ int xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *sock, } } } - knot_xdp_recv_finish(sock, ctx->msg_recv, ctx->msg_recv_count); - - return KNOT_EOK; + knot_xdp_recv_finish(ctx->sock, ctx->msg_recv, ctx->msg_recv_count); } -int xdp_handle_send(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *xdp_sock) +void xdp_handle_send(xdp_handle_ctx_t *ctx) { uint32_t unused = 0; - int ret = knot_xdp_send(xdp_sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused); + int ret = knot_xdp_send(ctx->sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused); if (ret == KNOT_EOK) { if (ctx->tcp) { - ret = knot_tcp_send(xdp_sock, knot_tcp_relay_dynarray_arr(&ctx->tcp_relays), + ret = knot_tcp_send(ctx->sock, knot_tcp_relay_dynarray_arr(&ctx->tcp_relays), ctx->tcp_relays.size); } else { - ret = knot_xdp_send_finish(xdp_sock); + ret = knot_xdp_send_finish(ctx->sock); } } if (ctx->tcp) { knot_tcp_relay_free(&ctx->tcp_relays); } - - return ret; } static size_t overweight(size_t weight, size_t max_weight) @@ -229,12 +228,12 @@ static size_t overweight(size_t weight, size_t max_weight) return w; } -int xdp_handle_sweep(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *xdp_sock) +void xdp_handle_sweep(xdp_handle_ctx_t *ctx) { uint32_t last_reset = 0, last_close = 0; int ret = KNOT_EOK; do { - ret = knot_tcp_sweep(ctx->tcp_table, xdp_sock, 20, ctx->tcp_idle_close, ctx->tcp_idle_reset, + ret = knot_tcp_sweep(ctx->tcp_table, ctx->sock, 20, ctx->tcp_idle_close, ctx->tcp_idle_reset, overweight(ctx->tcp_table->usage, ctx->tcp_max_conns), overweight(ctx->tcp_table->inbufs_total, ctx->tcp_max_inbufs), &last_close, &last_reset); @@ -243,7 +242,6 @@ int xdp_handle_sweep(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *xdp_sock) if (last_close > 0 || last_reset > 0) { log_debug("timeouted XDP-TCP connections: %u closed, %u reset", last_close, last_reset); } - return ret; } #endif // ENABLE_XDP diff --git a/src/knot/server/xdp-handler.h b/src/knot/server/xdp-handler.h index 3a21a0f56..eaf4f8a5b 100644 --- a/src/knot/server/xdp-handler.h +++ b/src/knot/server/xdp-handler.h @@ -29,7 +29,7 @@ struct server; /*! * \brief Initialize XDP packet handling context. */ -struct xdp_handle_ctx *xdp_handle_init(void); +struct xdp_handle_ctx *xdp_handle_init(knot_xdp_socket_t *sock); /*! * \brief Deinitialize XDP packet handling context. @@ -39,25 +39,25 @@ void xdp_handle_free(struct xdp_handle_ctx *ctx); /*! * \brief Receive packets thru XDP socket. */ -int xdp_handle_recv(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *xdp_sock); +int xdp_handle_recv(struct xdp_handle_ctx *ctx); /*! * \brief Answer packets including DNS layers. * * \warning In case of TCP, this also sends some packets, e.g. ACK. */ -int xdp_handle_msgs(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *sock, - knot_layer_t *layer, struct server *server, unsigned thread_id); +void xdp_handle_msgs(struct xdp_handle_ctx *ctx, knot_layer_t *layer, + struct server *server, unsigned thread_id); /*! * \brief Send packets thru XDP socket. */ -int xdp_handle_send(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *xdp_sock); +void xdp_handle_send(struct xdp_handle_ctx *ctx); /*! * \brief Check for old TCP connections and close/reset them. */ -int xdp_handle_sweep(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *xdp_sock); +void xdp_handle_sweep(struct xdp_handle_ctx *ctx); /*! * \brief Update configuration parameters of running ctx. diff --git a/tests-fuzz/knotd_wrap/udp-handler.c b/tests-fuzz/knotd_wrap/udp-handler.c index c6e28be6e..908c31fb5 100644 --- a/tests-fuzz/knotd_wrap/udp-handler.c +++ b/tests-fuzz/knotd_wrap/udp-handler.c @@ -35,7 +35,7 @@ static inline void next(udp_stdin_t *rq) } } -static void *udp_stdin_init(void) +static void *udp_stdin_init(_unused_ void *xdp_sock) { udp_stdin_t *rq = calloc(1, sizeof(udp_stdin_t)); if (rq == NULL) { @@ -62,9 +62,8 @@ static void udp_stdin_deinit(void *d) free(d); } -static int udp_stdin_recv(int fd, void *d, void *unused) +static int udp_stdin_recv(_unused_ int fd, void *d) { - UNUSED(unused); udp_stdin_t *rq = (udp_stdin_t *)d; rq->iov[RX].iov_len = fread(rq->iov[RX].iov_base, 1, KNOT_WIRE_MAX_PKTSIZE, stdin); @@ -75,20 +74,16 @@ static int udp_stdin_recv(int fd, void *d, void *unused) return rq->iov[RX].iov_len; } -static int udp_stdin_handle(udp_context_t *ctx, void *d, void *unused) +static void udp_stdin_handle(udp_context_t *ctx, void *d) { - UNUSED(unused); udp_stdin_t *rq = (udp_stdin_t *)d; udp_handle(ctx, STDIN_FILENO, &rq->addr, &rq->iov[RX], &rq->iov[TX], false); - return 0; } -static int udp_stdin_send(void *d, void *unused) +static void udp_stdin_send(void *d) { - UNUSED(unused); udp_stdin_t *rq = (udp_stdin_t *)d; next(rq); - return 0; } static udp_api_t stdin_api = { |