author     Libor Peltan <libor.peltan@nic.cz>        2019-12-18 09:44:16 +0100
committer  Daniel Salzman <daniel.salzman@nic.cz>    2020-04-21 18:43:10 +0200
commit     2e8a5aa9463a4456a4cd6449e6980f34c79a393d (patch)
tree       78e256dcebe81d36831fe7d49e1d3ba59fce1a5d
parent     server: ifaces are array not list (diff)
xdp: use more sockets: one per xdp_worker * iface
-rw-r--r--  src/knot/server/server.c              30
-rw-r--r--  src/knot/server/server.h               5
-rw-r--r--  src/knot/server/udp-handler.c         77
-rw-r--r--  src/libknot/xdp/af_xdp.c             159
-rw-r--r--  src/libknot/xdp/af_xdp.h              21
-rw-r--r--  src/libknot/xdp/bpf-user.h            18
-rw-r--r--  tests-fuzz/knotd_wrap/udp-handler.c    9
7 files changed, 170 insertions, 149 deletions
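In short, this commit drops the process-wide AF_XDP singleton: each listening interface now owns one XDP socket per XDP worker thread, stored in the new iface_t arrays fd_xdp[] and sock_xdp[]. A minimal sketch of the resulting setup loop, assuming hypothetical ifname and prog_path variables (the diff itself still hard-codes "enp1s0f1" behind a FIXME):

	/* Sketch: one AF_XDP socket per XDP worker, per interface. */
	for (int i = 0; i < xdp_socket_count; i++) {
		struct knot_xsk_socket **sock = &new_if->sock_xdp[i];
		int ret = knot_xsk_init(sock, ifname, i /* NIC queue */, prog_path);
		if (ret == KNOT_EOK) {
			new_if->fd_xdp[i] = knot_xsk_get_poll_fd(*sock);
		}
	}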
diff --git a/src/knot/server/server.c b/src/knot/server/server.c
index 9beb8d496..77e0b08aa 100644
--- a/src/knot/server/server.c
+++ b/src/knot/server/server.c
@@ -66,11 +66,15 @@ static void server_deinit_iface(iface_t *iface)
free(iface->fd_udp);
}
- if (iface->fd_xdp > -1) {
+ for (int i = 0; i < iface->fd_xdp_count; i++) {
#ifdef ENABLE_XDP
- knot_xsk_deinit();
+ knot_xsk_deinit(iface->sock_xdp[i]);
+#else
+ assert(0);
#endif
}
+ free(iface->fd_xdp);
+ free(iface->sock_xdp);
/* Free TCP handler. */
if (iface->fd_tcp != NULL) {
@@ -210,7 +214,7 @@ static int enable_fastopen(int sock, int backlog)
*/
static iface_t *server_init_iface(struct sockaddr_storage *addr,
int udp_thread_count, int tcp_thread_count,
- bool tcp_reuseport, bool use_xdp)
+ int xdp_thread_count, bool tcp_reuseport)
{
iface_t *new_if = calloc(1, sizeof(*new_if));
if (new_if == NULL) {
@@ -227,6 +231,7 @@ static iface_t *server_init_iface(struct sockaddr_storage *addr,
int udp_bind_flags = 0;
int tcp_socket_count = 1;
int tcp_bind_flags = 0;
+ int xdp_socket_count = 0;
#ifdef ENABLE_REUSEPORT
udp_socket_count = udp_thread_count;
@@ -238,9 +243,16 @@ static iface_t *server_init_iface(struct sockaddr_storage *addr,
}
#endif
+#ifdef ENABLE_XDP
+ xdp_socket_count = xdp_thread_count;
+#endif
+
new_if->fd_udp = malloc(udp_socket_count * sizeof(int));
new_if->fd_tcp = malloc(tcp_socket_count * sizeof(int));
- if (new_if->fd_udp == NULL || new_if->fd_tcp == NULL) {
+ new_if->fd_xdp = malloc(xdp_socket_count * sizeof(int));
+ new_if->sock_xdp = calloc(xdp_socket_count, sizeof(*new_if->sock_xdp));
+ if (new_if->fd_udp == NULL || new_if->fd_tcp == NULL ||
+ new_if->fd_xdp == NULL || new_if->sock_xdp == NULL) {
log_error("failed to initialize interface");
server_deinit_iface(new_if);
return NULL;
@@ -293,16 +305,15 @@ static iface_t *server_init_iface(struct sockaddr_storage *addr,
new_if->fd_udp_count += 1;
}
- new_if->fd_xdp = -1;
- if (use_xdp) {
+ for (int i = 0; i < xdp_socket_count; i++) {
#ifndef ENABLE_XDP
assert(0);
#else
- int ret = knot_xsk_init("enp1s0f1", "/bpf-kernel.o", NULL); // FIXME
+ int ret = knot_xsk_init(new_if->sock_xdp + i, "enp1s0f1", i, "/bpf-kernel.o"); // FIXME
if (ret != KNOT_EOK) {
log_warning("failed to init XDP (%s)", knot_strerror(ret));
} else {
- new_if->fd_xdp = knot_xsk_get_poll_fd();
+ new_if->fd_xdp[i] = knot_xsk_get_poll_fd(new_if->sock_xdp[i]);
}
#endif
}
@@ -402,8 +413,7 @@ static int configure_sockets(conf_t *conf, server_t *s)
unsigned size_udp = s->handlers[IO_UDP].handler.unit->size;
unsigned size_tcp = s->handlers[IO_TCP].handler.unit->size;
bool tcp_reuseport = conf->cache.srv_tcp_reuseport;
- iface_t *new_if = server_init_iface(&addr, size_udp, size_tcp, tcp_reuseport,
- conf->cache.srv_xdp_threads > 0);
+ iface_t *new_if = server_init_iface(&addr, size_udp, size_tcp, conf->cache.srv_xdp_threads, tcp_reuseport);
if (new_if != NULL) {
memcpy(&newlist[real_n++], new_if, sizeof(*newlist));
free(new_if);
diff --git a/src/knot/server/server.h b/src/knot/server/server.h
index d53fe9e77..c834261c5 100644
--- a/src/knot/server/server.h
+++ b/src/knot/server/server.h
@@ -29,6 +29,7 @@
/* Forward declarations. */
struct server;
+struct knot_xsk_socket;
/*! \brief I/O handler structure.
*/
@@ -56,7 +57,9 @@ typedef struct iface {
int fd_udp_count;
int *fd_tcp;
int fd_tcp_count;
- int fd_xdp;
+ int *fd_xdp;
+ int fd_xdp_count;
+ struct knot_xsk_socket **sock_xdp;
struct sockaddr_storage addr;
} iface_t;
diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c
index b8fed8f62..b8ac182d1 100644
--- a/src/knot/server/udp-handler.c
+++ b/src/knot/server/udp-handler.c
@@ -108,9 +108,9 @@ static void udp_handle(udp_context_t *udp, int fd, struct sockaddr_storage *ss,
typedef struct {
void* (*udp_init)(void);
void (*udp_deinit)(void *);
- int (*udp_recv)(int, void *);
- int (*udp_handle)(udp_context_t *, void *);
- int (*udp_send)(void *);
+ int (*udp_recv)(int, void *, void *);
+ int (*udp_handle)(udp_context_t *, void *, void *);
+ int (*udp_send)(void *, void *);
} udp_api_t;
/*! \brief Control message to fit IP_PKTINFO or IPv6_RECVPKTINFO. */
@@ -184,8 +184,9 @@ static void udp_recvfrom_deinit(void *d)
free(rq);
}
-static int udp_recvfrom_recv(int fd, void *d)
+static int udp_recvfrom_recv(int fd, void *d, void *unused)
{
+ UNUSED(unused);
/* Reset max lengths. */
struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
rq->iov[RX].iov_len = KNOT_WIRE_MAX_PKTSIZE;
@@ -202,8 +203,9 @@ static int udp_recvfrom_recv(int fd, void *d)
return 0;
}
-static int udp_recvfrom_handle(udp_context_t *ctx, void *d)
+static int udp_recvfrom_handle(udp_context_t *ctx, void *d, void *unused)
{
+ UNUSED(unused);
struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
/* Prepare TX address. */
@@ -218,8 +220,9 @@ static int udp_recvfrom_handle(udp_context_t *ctx, void *d)
return KNOT_EOK;
}
-static int udp_recvfrom_send(void *d)
+static int udp_recvfrom_send(void *d, void *unused)
{
+ UNUSED(unused);
struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
int rc = 0;
if (rq->iov[TX].iov_len > 0) {
@@ -295,8 +298,9 @@ static void udp_recvmmsg_deinit(void *d)
}
}
-static int udp_recvmmsg_recv(int fd, void *d)
+static int udp_recvmmsg_recv(int fd, void *d, void *unused)
{
+ UNUSED(unused);
struct udp_recvmmsg *rq = (struct udp_recvmmsg *)d;
int n = recvmmsg(fd, rq->msgs[RX], RECVMMSG_BATCHLEN, MSG_DONTWAIT, NULL);
@@ -307,8 +311,9 @@ static int udp_recvmmsg_recv(int fd, void *d)
return n;
}
-static int udp_recvmmsg_handle(udp_context_t *ctx, void *d)
+static int udp_recvmmsg_handle(udp_context_t *ctx, void *d, void *unused)
{
+ UNUSED(unused);
struct udp_recvmmsg *rq = (struct udp_recvmmsg *)d;
/* Handle each received msg. */
@@ -331,8 +336,9 @@ static int udp_recvmmsg_handle(udp_context_t *ctx, void *d)
return KNOT_EOK;
}
-static int udp_recvmmsg_send(void *d)
+static int udp_recvmmsg_send(void *d, void *unused)
{
+ UNUSED(unused);
struct udp_recvmmsg *rq = (struct udp_recvmmsg *)d;
int rc = sendmmsg(rq->fd, rq->msgs[TX], rq->rcvd, 0);
for (unsigned i = 0; i < rq->rcvd; ++i) {
@@ -381,18 +387,18 @@ static void xdp_recvmmsg_deinit(void *d)
free(d);
}
-static int xdp_recvmmsg_recv(int fd, void *d)
+static int xdp_recvmmsg_recv(int fd, void *d, void *xdp_sock)
{
UNUSED(fd);
struct xdp_recvmmsg *rq = (struct xdp_recvmmsg *)d;
- int ret = knot_xsk_recvmmsg(rq->msgs_rx, XDP_BATCHLEN, &rq->rcvd);
+ int ret = knot_xsk_recvmmsg(xdp_sock, rq->msgs_rx, XDP_BATCHLEN, &rq->rcvd);
return ret == KNOT_EOK ? rq->rcvd : ret;
}
-static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d)
+static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d, void *xdp_sock)
{
struct xdp_recvmmsg *rq = (struct xdp_recvmmsg *)d;
@@ -400,7 +406,7 @@ static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d)
struct iovec *rx = &rq->msgs_rx[i].payload;
struct iovec *tx = &rq->msgs_tx[i].payload;
- *tx = knot_xsk_alloc_frame();
+ *tx = knot_xsk_alloc_frame(xdp_sock);
if (tx->iov_base == NULL) {
return KNOT_ERROR;
}
@@ -412,7 +418,7 @@ static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d)
memcpy(&rq->msgs_tx[i].ip_from, &rq->msgs_rx[i].ip_to, sizeof(rq->msgs_tx[i].ip_from));
memcpy(&rq->msgs_tx[i].ip_to, &rq->msgs_rx[i].ip_from, sizeof(rq->msgs_tx[i].ip_to));
- knot_xsk_free_recvd(&rq->msgs_rx[i]);
+ knot_xsk_free_recvd(xdp_sock, &rq->msgs_rx[i]);
// FIXME!! :
/*
@@ -426,16 +432,16 @@ static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d)
return KNOT_EOK;
}
-static int xdp_recvmmsg_send(void *d)
+static int xdp_recvmmsg_send(void *d, void *xdp_sock)
{
struct xdp_recvmmsg *rq = (struct xdp_recvmmsg *)d;
uint32_t sent = rq->rcvd;
- int ret = knot_xsk_sendmmsg(rq->msgs_tx, sent);
+ int ret = knot_xsk_sendmmsg(xdp_sock, rq->msgs_tx, sent);
memset(rq, 0, sizeof(*rq));
- knot_xsk_check();
+ knot_xsk_check(xdp_sock);
return ret == KNOT_EOK ? sent : ret;
}
@@ -450,11 +456,24 @@ static udp_api_t xdp_recvmmsg_api = {
#endif /* ENABLE_XDP */
/*! \brief Get interface UDP descriptor for a given thread. */
-static int iface_udp_fd(const iface_t *iface, int thread_id, bool use_xdp)
+static int iface_udp_fd(const iface_t *iface, int thread_id, bool use_xdp, void **socket_ctx)
{
if (use_xdp) {
- assert(iface->fd_xdp > -1);
- return iface->fd_xdp;
+#ifdef ENABLE_XDP
+ size_t udp_wrk = conf()->cache.srv_udp_threads;
+ size_t tcp_wrk = conf()->cache.srv_tcp_threads;
+ size_t xdp_wrk = conf()->cache.srv_xdp_threads;
+ // XDP worker threads follow after the UDP and TCP worker threads
+ assert(thread_id >= udp_wrk + tcp_wrk);
+ assert(thread_id < udp_wrk + tcp_wrk + xdp_wrk);
+
+ size_t xdp_wrk_id = thread_id - udp_wrk - tcp_wrk;
+
+ *socket_ctx = iface->sock_xdp[xdp_wrk_id];
+ return iface->fd_xdp[xdp_wrk_id];
+#else
+ assert(0);
+#endif
}
#ifdef ENABLE_REUSEPORT
@@ -476,8 +495,10 @@ static int iface_udp_fd(const iface_t *iface, int thread_id, bool use_xdp)
* \return Number of watched descriptors, zero on error.
*/
static unsigned udp_set_ifaces(const iface_t *ifaces, size_t n_ifaces, struct pollfd **fds_ptr,
- int thread_id, bool use_xdp)
+ int thread_id, bool use_xdp, void **socket_ctxs)
{
+ memset(socket_ctxs, 0, n_ifaces * sizeof(*socket_ctxs));
+
if (ifaces == NULL) {
return 0;
}
@@ -488,7 +509,7 @@ static unsigned udp_set_ifaces(const iface_t *ifaces, size_t n_ifaces, struct po
}
for (size_t i = 0; i < n_ifaces; i++) {
- fds[i].fd = iface_udp_fd(&ifaces[i], thread_id, use_xdp);
+ fds[i].fd = iface_udp_fd(&ifaces[i], thread_id, use_xdp, &socket_ctxs[i]);
fds[i].events = POLLIN;
fds[i].revents = 0;
}
@@ -544,11 +565,14 @@ int udp_master(dthread_t *thread)
struct pollfd *fds = NULL;
/* Allocate descriptors for the configured interfaces. */
+ size_t nifs = handler->server->n_ifaces;
+ size_t nifs = handler->server->n_ifaces;
+ void *socket_ctxs[nifs]; // only for XDP: pointers to knot_xsk_socket
unsigned nfds = udp_set_ifaces(handler->server->ifaces, handler->server->n_ifaces, &fds,
- udp.thread_id, handler->use_xdp);
+ udp.thread_id, handler->use_xdp, socket_ctxs);
if (nfds == 0) {
goto finish;
}
+ assert(nfds == nifs);
/* Loop until all data is read. */
for (;;) {
@@ -572,9 +596,10 @@ int udp_master(dthread_t *thread)
continue;
}
events -= 1;
- if (api->udp_recv(fds[i].fd, rq) > 0) {
- api->udp_handle(&udp, rq);
- api->udp_send(rq);
+ void *sock_ctx = socket_ctxs[i];
+ if (api->udp_recv(fds[i].fd, rq, sock_ctx) > 0) {
+ api->udp_handle(&udp, rq, sock_ctx);
+ api->udp_send(rq, sock_ctx);
}
}
}
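The reworked iface_udp_fd() relies on the global worker layout: dthread IDs are assigned to UDP workers first, then TCP, then XDP, so the XDP worker index is recovered by subtraction. A worked example with hypothetical thread counts:

	/* Hypothetical config: 4 UDP, 2 TCP, 2 XDP worker threads.
	 * thread_id:  0 1 2 3 | 4 5 | 6 7
	 *              UDP      TCP   XDP
	 * For thread_id == 7:
	 *   xdp_wrk_id = 7 - 4 - 2 = 1  ->  iface->sock_xdp[1] / fd_xdp[1]
	 */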
diff --git a/src/libknot/xdp/af_xdp.c b/src/libknot/xdp/af_xdp.c
index 84ffaed24..46366fafe 100644
--- a/src/libknot/xdp/af_xdp.c
+++ b/src/libknot/xdp/af_xdp.c
@@ -34,7 +34,6 @@
#include <string.h>
#include <unistd.h>
-
#ifdef KR_XDP_ETH_CRC
#include <zlib.h>
#endif
@@ -75,9 +74,13 @@ struct umem_frame {
static const size_t FRAME_PAYLOAD_OFFSET = offsetof(struct udpv4, data) + offsetof(struct umem_frame, udpv4);
-// FIXME later: get rid of those singletons!
-struct xsk_socket_info *the_socket = NULL;
-struct kxsk_config *the_config = NULL;
+static const struct xsk_umem_config global_umem_config = {
+ .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
+ .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
+ .frame_size = FRAME_SIZE, // we need to know this value explicitly
+ .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
+};
+#define UMEM_FRAME_COUNT 8192
/** Swap two bytes as a *constant* expression. ATM we assume we're LE, i.e. we do need to swap. */
#define BS16(n) (((n) >> 8) + (((n) & 0xff) << 8))
@@ -125,11 +128,11 @@ static struct umem_frame *xsk_alloc_umem_frame(struct xsk_umem_info *umem)
}
_public_
-struct iovec knot_xsk_alloc_frame()
+struct iovec knot_xsk_alloc_frame(struct knot_xsk_socket *socket)
{
struct iovec res = { 0 };
- struct umem_frame *uframe = xsk_alloc_umem_frame(the_socket->umem);
+ struct umem_frame *uframe = xsk_alloc_umem_frame(socket->umem);
if (uframe != NULL) {
res.iov_len = MIN(UINT16_MAX, FRAME_SIZE - FRAME_PAYLOAD_OFFSET - 4/*eth CRC*/);
res.iov_base = uframe->udpv4.data;
@@ -148,31 +151,33 @@ static void xsk_dealloc_umem_frame(struct xsk_umem_info *umem, uint8_t *uframe_p
}
_public_
-void knot_xsk_deinit()
+void knot_xsk_deinit(struct knot_xsk_socket *socket)
{
- if (!the_socket)
+ if (socket == NULL) {
return;
- kxsk_socket_stop(the_socket->iface, the_config->xsk_if_queue);
- xsk_socket__delete(the_socket->xsk);
- xsk_umem__delete(the_socket->umem->umem);
+ }
- kxsk_iface_free((struct kxsk_iface *)/*const-cast*/the_socket->iface, false);
- //TODO: more memory
+ kxsk_socket_stop(socket->iface, socket->if_queue);
+ xsk_socket__delete(socket->xsk);
+ xsk_umem__delete(socket->umem->umem);
+
+ kxsk_iface_free((struct kxsk_iface *)/*const-cast*/socket->iface, false);
+ free(socket);
}
/** Add some free frames into the RX fill queue (possibly zero, etc.) */
-static int kxsk_umem_refill(const struct kxsk_config *cfg, struct xsk_umem_info *umem)
+static int kxsk_umem_refill(struct xsk_umem_info *umem)
{
/* First find to_reserve: how many frames to move to the RX fill queue.
* Let's keep about as many frames ready for TX (free_count) as for RX (fq_ready),
* and don't fill the queue to more than a half. */
- const int fq_target = cfg->umem.fill_size / 2;
+ const int fq_target = global_umem_config.fill_size / 2;
uint32_t fq_free = xsk_prod_nb_free(&umem->fq, 65536*256);
/* TODO: not nice - ^^ the caching logic inside is the other way,
* so we disable it clumsily by passing a high value. */
if (fq_free <= fq_target)
return 0;
- const int fq_ready = cfg->umem.fill_size - fq_free;
+ const int fq_ready = global_umem_config.fill_size - fq_free;
const int balance = (fq_ready + umem->free_count) / 2;
const int fq_want = MIN(balance, fq_target); // don't overshoot the target
const int to_reserve = fq_want - fq_ready;
@@ -201,28 +206,35 @@ static int kxsk_umem_refill(const struct kxsk_config *cfg, struct xsk_umem_info
return 0;
}
-static struct xsk_socket_info *xsk_configure_socket(struct kxsk_config *cfg,
- struct xsk_umem_info *umem,
- const struct kxsk_iface *iface)
+static struct knot_xsk_socket *xsk_configure_socket(struct xsk_umem_info *umem,
+ const struct kxsk_iface *iface,
+ int if_queue)
{
/* Put a couple RX buffers into the fill queue.
* Even if we don't need them, it silences a dmesg line,
* and it avoids 100% CPU usage of ksoftirqd/i for each queue i!
*/
- errno = kxsk_umem_refill(cfg, umem);
+ errno = kxsk_umem_refill(umem);
if (errno)
return NULL;
- struct xsk_socket_info *xsk_info = calloc(1, sizeof(*xsk_info));
+ struct knot_xsk_socket *xsk_info = calloc(1, sizeof(*xsk_info));
if (!xsk_info)
return NULL;
xsk_info->iface = iface;
+ xsk_info->if_queue = if_queue;
xsk_info->umem = umem;
- assert(cfg->xsk.libbpf_flags & XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD);
+ const struct xsk_socket_config sock_conf = {
+ .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
+ .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
+ .libbpf_flags = XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD,
+ .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
+ };
+
errno = xsk_socket__create(&xsk_info->xsk, iface->ifname,
- cfg->xsk_if_queue, umem->umem,
- &xsk_info->rx, &xsk_info->tx, &cfg->xsk);
+ xsk_info->if_queue, umem->umem,
+ &xsk_info->rx, &xsk_info->tx, &sock_conf);
return xsk_info;
}
@@ -255,7 +267,7 @@ static __be16 pkt_ipv4_checksum_2(const struct iphdr *h)
return ~BS16(from32to16(sum32));
}
-static int pkt_send(struct xsk_socket_info *xsk, uint64_t addr, uint32_t len)
+static int pkt_send(struct knot_xsk_socket *xsk, uint64_t addr, uint32_t len)
{
uint32_t tx_idx;
int ret = xsk_ring_prod__reserve(&xsk->tx, 1, &tx_idx);
@@ -272,15 +284,15 @@ static int pkt_send(struct xsk_socket_info *xsk, uint64_t addr, uint32_t len)
return KNOT_EOK;
}
-static uint8_t *msg_uframe_p(const knot_xsk_msg_t *msg)
+static uint8_t *msg_uframe_p(struct knot_xsk_socket *socket, const knot_xsk_msg_t *msg)
{
// FIXME: for some reason the message alignment isn't what we expect
//uint8_t *uframe_p = msg->payload.iov_base - FRAME_PAYLOAD_OFFSET;
uint8_t *uNULL = NULL;
uint8_t *uframe_p = uNULL + ((msg->payload.iov_base - NULL) & ~(FRAME_SIZE - 1));
- const uint8_t *umem_mem_start = the_socket->umem->frames->bytes;
+ const uint8_t *umem_mem_start = socket->umem->frames->bytes;
if (//((uframe_p - uNULL) % FRAME_SIZE != 0) ||
- ((uframe_p - umem_mem_start) / FRAME_SIZE >= the_socket->umem->frame_count)) {
+ ((uframe_p - umem_mem_start) / FRAME_SIZE >= socket->umem->frame_count)) {
// not allocated msg->payload correctly
return NULL;
}
@@ -289,9 +301,9 @@ static uint8_t *msg_uframe_p(const knot_xsk_msg_t *msg)
}
_public_
-int knot_xsk_sendmsg(const knot_xsk_msg_t *msg)
+int knot_xsk_sendmsg(struct knot_xsk_socket *socket, const knot_xsk_msg_t *msg)
{
- uint8_t *uframe_p = msg_uframe_p(msg);
+ uint8_t *uframe_p = msg_uframe_p(socket, msg);
if (uframe_p == NULL) {
return KNOT_EINVAL;
}
@@ -333,16 +345,16 @@ int knot_xsk_sendmsg(const knot_xsk_msg_t *msg)
uint32_t eth_len = FRAME_PAYLOAD_OFFSET + msg->payload.iov_len + 4/*CRC*/;
- return pkt_send(the_socket, h->bytes - the_socket->umem->frames->bytes, eth_len);
+ return pkt_send(socket, h->bytes - socket->umem->frames->bytes, eth_len);
}
_public_
-int knot_xsk_sendmmsg(const knot_xsk_msg_t msgs[], uint32_t count)
+int knot_xsk_sendmmsg(struct knot_xsk_socket *socket, const knot_xsk_msg_t msgs[], uint32_t count)
{
int ret = KNOT_EOK;
for (int i = 0; i < count && ret == KNOT_EOK; i++) {
if (msgs[i].payload.iov_len > 0) {
- ret = knot_xsk_sendmsg(&msgs[i]);
+ ret = knot_xsk_sendmsg(socket, &msgs[i]);
}
}
return ret;
@@ -350,18 +362,18 @@ int knot_xsk_sendmmsg(const knot_xsk_msg_t msgs[], uint32_t count)
/** Periodic callback: trigger TX and reclaim completed frames of one socket. */
_public_
-int knot_xsk_check()
+int knot_xsk_check(struct knot_xsk_socket *socket)
{
/* Trigger sending queued packets.
* LATER(opt.): the periodical epoll due to the uv_poll* stuff
* is probably enough to wake the kernel even for sending
* (though AFAIK it might be specific to driver and/or kernel version). */
- if (the_socket->kernel_needs_wakeup) {
- int sendret = sendto(xsk_socket__fd(the_socket->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
+ if (socket->kernel_needs_wakeup) {
+ int sendret = sendto(xsk_socket__fd(socket->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
bool is_ok = (sendret != -1);
const bool is_again = !is_ok && (errno == EWOULDBLOCK || errno == EAGAIN);
if (is_ok || is_again) {
- the_socket->kernel_needs_wakeup = false;
+ socket->kernel_needs_wakeup = false;
// EAGAIN is unclear; we'll retry the syscall later, to be sure
}
if (!is_ok && !is_again) {
@@ -370,24 +382,24 @@ int knot_xsk_check()
}
/* Collect completed packets. */
- struct xsk_ring_cons *cq = &the_socket->umem->cq;
+ struct xsk_ring_cons *cq = &socket->umem->cq;
uint32_t idx_cq;
const uint32_t completed = xsk_ring_cons__peek(cq, UINT32_MAX, &idx_cq);
if (!completed) return KNOT_EOK; // ?
/* Free shared memory. */
for (int i = 0; i < completed; ++i, ++idx_cq) {
- uint8_t *uframe_p = (uint8_t *)the_socket->umem->frames + *xsk_ring_cons__comp_addr(cq, idx_cq) - offsetof(struct umem_frame, udpv4);
- xsk_dealloc_umem_frame(the_socket->umem, uframe_p);
+ uint8_t *uframe_p = (uint8_t *)socket->umem->frames + *xsk_ring_cons__comp_addr(cq, idx_cq) - offsetof(struct umem_frame, udpv4);
+ xsk_dealloc_umem_frame(socket->umem, uframe_p);
}
xsk_ring_cons__release(cq, completed);
//TODO: one uncompleted packet/batch is left until the next I/O :-/
/* And feed frames into RX fill queue. */
- return kxsk_umem_refill(the_config, the_socket->umem);
+ return kxsk_umem_refill(socket->umem);
}
-static int rx_desc(struct xsk_socket_info *xsi, const struct xdp_desc *desc,
+static int rx_desc(struct knot_xsk_socket *xsi, const struct xdp_desc *desc,
knot_xsk_msg_t *msg)
{
uint8_t *uframe_p = xsi->umem->frames->bytes + desc->addr;
@@ -450,56 +462,40 @@ free_frame:
}
_public_
-int knot_xsk_recvmmsg(knot_xsk_msg_t msgs[], uint32_t max_count, uint32_t *count)
+int knot_xsk_recvmmsg(struct knot_xsk_socket *socket, knot_xsk_msg_t msgs[], uint32_t max_count, uint32_t *count)
{
uint32_t idx_rx = 0;
int ret = KNOT_EOK;
- *count = xsk_ring_cons__peek(&the_socket->rx, max_count, &idx_rx);
+ *count = xsk_ring_cons__peek(&socket->rx, max_count, &idx_rx);
assert(*count <= max_count);
for (size_t i = 0; i < *count && ret == KNOT_EOK; ++i, ++idx_rx) {
- ret = rx_desc(the_socket, xsk_ring_cons__rx_desc(&the_socket->rx, idx_rx), &msgs[i]);
+ ret = rx_desc(socket, xsk_ring_cons__rx_desc(&socket->rx, idx_rx), &msgs[i]);
}
if (ret != KNOT_EOK)
printf("rx_desc() == %d\n", ret);
//FIXME: overall design of errors here ("bad packets")
- xsk_ring_cons__release(&the_socket->rx, *count);
+ xsk_ring_cons__release(&socket->rx, *count);
return ret;
}
_public_
-void knot_xsk_free_recvd(const knot_xsk_msg_t *msg)
+void knot_xsk_free_recvd(struct knot_xsk_socket *socket, const knot_xsk_msg_t *msg)
{
- uint8_t *uframe_p = msg_uframe_p(msg);
+ uint8_t *uframe_p = msg_uframe_p(socket, msg);
assert(uframe_p);
if (uframe_p != NULL) {
- xsk_dealloc_umem_frame(the_socket->umem, uframe_p);
+ xsk_dealloc_umem_frame(socket->umem, uframe_p);
}
}
-static struct kxsk_config the_config_storage = { // static to get zeroed by default
- .xsk_if_queue = 0, // defaults overridable by command-line -x eth3:0
- .umem_frame_count = 8192,
- .umem = {
- .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
- .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
- .frame_size = FRAME_SIZE, // we need to know this value explicitly
- .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
- },
- .xsk = {
- .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
- .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
- .libbpf_flags = XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD,
- .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
- },
-};
-
_public_
-int knot_xsk_init(const char *ifname, const char *prog_fname,
- ssize_t *out_busy_frames)
+int knot_xsk_init(struct knot_xsk_socket **socket, const char *ifname, int if_queue, const char *prog_fname)
{
- the_config = &the_config_storage;
+ if (socket == NULL || *socket != NULL) {
+ return KNOT_EINVAL;
+ }
struct kxsk_iface *iface = kxsk_iface_new(ifname, prog_fname);
if (!iface) {
@@ -508,39 +504,34 @@ int knot_xsk_init(const char *ifname, const char *prog_fname,
/* Initialize shared packet_buffer for umem usage */
struct xsk_umem_info *umem =
- configure_xsk_umem(&the_config->umem, the_config->umem_frame_count);
+ configure_xsk_umem(&global_umem_config, UMEM_FRAME_COUNT);
if (umem == NULL) {
kxsk_iface_free(iface, false);
return KNOT_ENOMEM;
}
- /* Open and configure the AF_XDP (xsk) socket */
- assert(!the_socket);
-
- the_socket = xsk_configure_socket(the_config, umem, iface);
- if (!the_socket) {
+ *socket = xsk_configure_socket(umem, iface, if_queue);
+ if (!*socket) {
xsk_umem__delete(umem->umem);
kxsk_iface_free(iface, false);
return KNOT_NET_ESOCKET;
}
- int ret = kxsk_socket_start(iface, the_config->xsk_if_queue, the_socket->xsk);
+ int ret = kxsk_socket_start(iface, (*socket)->if_queue, (*socket)->xsk);
if (ret != KNOT_EOK) {
- xsk_socket__delete(the_socket->xsk);
- xsk_umem__delete(the_socket->umem->umem);
+ xsk_socket__delete((*socket)->xsk);
+ xsk_umem__delete((*socket)->umem->umem);
kxsk_iface_free(iface, false);
+ free(*socket);
+ *socket = NULL;
return ret;
}
- if (out_busy_frames != NULL) {
- *out_busy_frames = the_socket->umem->frame_count - the_socket->umem->free_count;
- }
-
return ret;
}
_public_
-int knot_xsk_get_poll_fd()
+int knot_xsk_get_poll_fd(struct knot_xsk_socket *socket)
{
- return xsk_socket__fd(the_socket->xsk);
+ return xsk_socket__fd(socket->xsk);
}
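Note that msg_uframe_p() keeps recovering the owning UMEM frame by masking the payload pointer, now validated against the per-socket UMEM instead of the singleton. An illustration of the masking trick, which assumes FRAME_SIZE is a power of two and frames are FRAME_SIZE-aligned (as the FIXME above the function hints, the alignment premise is still being debugged):

	/* Illustration only: recover the frame base from a payload pointer. */
	uintptr_t payload = (uintptr_t)msg->payload.iov_base;
	uint8_t  *uframe  = (uint8_t *)(payload & ~((uintptr_t)FRAME_SIZE - 1));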
diff --git a/src/libknot/xdp/af_xdp.h b/src/libknot/xdp/af_xdp.h
index 5bed4bf4a..6009acae5 100644
--- a/src/libknot/xdp/af_xdp.h
+++ b/src/libknot/xdp/af_xdp.h
@@ -28,21 +28,22 @@ typedef struct {
struct iovec payload;
} knot_xsk_msg_t;
-int knot_xsk_init(const char *ifname, const char *prog_fname,
- ssize_t *out_busy_frames);
+struct knot_xsk_socket;
-void knot_xsk_deinit(void);
+int knot_xsk_init(struct knot_xsk_socket **socket, const char *ifname, int if_queue, const char *prog_fname);
-struct iovec knot_xsk_alloc_frame(void);
+void knot_xsk_deinit(struct knot_xsk_socket *socket);
-int knot_xsk_sendmsg(const knot_xsk_msg_t *msg); // msg->payload MUST have been allocated by knot_xsk_alloc_frame()
+struct iovec knot_xsk_alloc_frame(struct knot_xsk_socket *socket);
-int knot_xsk_sendmmsg(const knot_xsk_msg_t msgs[], uint32_t count); // skip messages with payload length == 0
+int knot_xsk_sendmsg(struct knot_xsk_socket *socket, const knot_xsk_msg_t *msg); // msg->payload MUST have been allocated by knot_xsk_alloc_frame()
-int knot_xsk_recvmmsg(knot_xsk_msg_t msgs[], uint32_t max_count, uint32_t *count);
+int knot_xsk_sendmmsg(struct knot_xsk_socket *socket, const knot_xsk_msg_t msgs[], uint32_t count); // skip messages with payload length == 0
-void knot_xsk_free_recvd(const knot_xsk_msg_t *msg);
+int knot_xsk_recvmmsg(struct knot_xsk_socket *socket, knot_xsk_msg_t msgs[], uint32_t max_count, uint32_t *count);
-int knot_xsk_check(void);
+void knot_xsk_free_recvd(struct knot_xsk_socket *socket, const knot_xsk_msg_t *msg);
-int knot_xsk_get_poll_fd(void);
+int knot_xsk_check(struct knot_xsk_socket *socket);
+
+int knot_xsk_get_poll_fd(struct knot_xsk_socket *socket);
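Taken together, the header now exposes a handle-based lifecycle instead of implicit globals. A hedged usage sketch; the interface name, queue number, BPF program path, and batch size are illustrative, not taken from the diff:

	#include <poll.h>
	#include "libknot/xdp/af_xdp.h"  /* assumed include path */

	struct knot_xsk_socket *sock = NULL;
	int ret = knot_xsk_init(&sock, "eth0", 0, "/path/to/bpf-kernel.o");
	if (ret != KNOT_EOK) {
		return ret;
	}
	struct pollfd pfd = { .fd = knot_xsk_get_poll_fd(sock), .events = POLLIN };
	while (poll(&pfd, 1, -1) > 0) {
		knot_xsk_msg_t msgs[32];
		uint32_t rcvd = 0;
		if (knot_xsk_recvmmsg(sock, msgs, 32, &rcvd) == KNOT_EOK) {
			/* ... build replies into frames from knot_xsk_alloc_frame(sock),
			 * release inputs via knot_xsk_free_recvd(sock, &msgs[i]) ... */
			knot_xsk_sendmmsg(sock, msgs, rcvd);
		}
		knot_xsk_check(sock);  /* wake the kernel for TX, reap the completion queue */
	}
	knot_xsk_deinit(sock);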
diff --git a/src/libknot/xdp/bpf-user.h b/src/libknot/xdp/bpf-user.h
index 8d1556c94..3f80fdb5a 100644
--- a/src/libknot/xdp/bpf-user.h
+++ b/src/libknot/xdp/bpf-user.h
@@ -22,7 +22,6 @@ struct udpv4 {
} __attribute__((packed)); };
};
-
/** Data around one network interface. */
struct kxsk_iface {
const char *ifname;
@@ -33,18 +32,6 @@ struct kxsk_iface {
int xsks_map_fd;
};
-
-struct kxsk_config {
- int xsk_if_queue;
-
- struct xsk_umem_config umem; /**< For xsk_umem__create() from libbpf. */
- uint32_t umem_frame_count;
-
- struct xsk_socket_config xsk; /**< For xsk_socket__create() from libbpf. */
-
- struct udpv4 pkt_template;
-};
-
struct xsk_umem_info {
/** Fill queue: passing memory frames to kernel - ready to receive. */
struct xsk_ring_prod fq;
@@ -58,7 +45,7 @@ struct xsk_umem_info {
uint32_t free_count; /**< The number of free frames. */
uint32_t *free_indices; /**< Stack of indices of the free frames. */
};
-struct xsk_socket_info {
+typedef struct knot_xsk_socket {
/** Receive queue: passing arrived packets from kernel. */
struct xsk_ring_cons rx;
/** Transmit queue: passing packets to kernel for sending. */
@@ -71,7 +58,8 @@ struct xsk_socket_info {
bool kernel_needs_wakeup;
const struct kxsk_iface *iface;
-};
+ int if_queue;
+} knot_xsk_socket_t;
/* eBPF stuff (user-space part), implemented in ./bpf-user.c */
diff --git a/tests-fuzz/knotd_wrap/udp-handler.c b/tests-fuzz/knotd_wrap/udp-handler.c
index 00d70eb57..19f5082ba 100644
--- a/tests-fuzz/knotd_wrap/udp-handler.c
+++ b/tests-fuzz/knotd_wrap/udp-handler.c
@@ -60,8 +60,9 @@ static void udp_stdin_deinit(void *d)
free(d);
}
-static int udp_stdin_recv(int fd, void *d)
+static int udp_stdin_recv(int fd, void *d, void *unused)
{
+ UNUSED(unused);
udp_stdin_t *rq = (udp_stdin_t *)d;
rq->iov[RX].iov_len = fread(rq->iov[RX].iov_base, 1,
KNOT_WIRE_MAX_PKTSIZE, stdin);
@@ -72,15 +73,17 @@ static int udp_stdin_recv(int fd, void *d)
return rq->iov[RX].iov_len;
}
-static int udp_stdin_handle(udp_context_t *ctx, void *d)
+static int udp_stdin_handle(udp_context_t *ctx, void *d, void *unused)
{
+ UNUSED(unused);
udp_stdin_t *rq = (udp_stdin_t *)d;
udp_handle(ctx, STDIN_FILENO, &rq->addr, &rq->iov[RX], &rq->iov[TX]);
return 0;
}
-static int udp_stdin_send(void *d)
+static int udp_stdin_send(void *d, void *unused)
{
+ UNUSED(unused);
udp_stdin_t *rq = (udp_stdin_t *)d;
next(rq);
return 0;