diff options
author | Oto Šťáva <oto.stava@nic.cz> | 2022-10-20 12:10:15 +0200 |
---|---|---|
committer | Oto Šťáva <oto.stava@nic.cz> | 2023-01-26 12:56:08 +0100 |
commit | 0a2e4472d786175badf19897a9d2e92bba8c5ce1 (patch) | |
tree | a9ab6ec35a45b2ea9e661b194627ff9260b66087 /daemon/session2.c | |
parent | daemon/session2: documentation (diff) | |
download | knot-resolver-0a2e4472d786175badf19897a9d2e92bba8c5ce1.tar.xz knot-resolver-0a2e4472d786175badf19897a9d2e92bba8c5ce1.zip |
daemon: XDP with protolayers
Diffstat (limited to 'daemon/session2.c')
-rw-r--r-- | daemon/session2.c | 159 |
1 files changed, 112 insertions, 47 deletions
diff --git a/daemon/session2.c b/daemon/session2.c index 903c992c..d9b5ba15 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -2,9 +2,15 @@ * SPDX-License-Identifier: GPL-3.0-or-later */ +#include "kresconfig.h" + #include <ucw/lib.h> #include <sys/socket.h> +#if ENABLE_XDP + #include <libknot/xdp/xdp.h> +#endif + #include "lib/log.h" #include "lib/utils.h" #include "daemon/io.h" @@ -101,11 +107,11 @@ const char *protolayer_payload_names[PROTOLAYER_PAYLOAD_COUNT] = { /* Forward decls. */ static int session2_transport_pushv(struct session2 *s, struct iovec *iov, int iovcnt, - const void *target, + const struct comm_info *comm, protolayer_finished_cb cb, void *baton); static inline int session2_transport_push(struct session2 *s, char *buf, size_t buf_len, - const void *target, + const struct comm_info *comm, protolayer_finished_cb cb, void *baton); static int session2_transport_event(struct session2 *s, enum protolayer_event_type event, @@ -306,7 +312,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret) ctx->layer_ix, layer_name_ctx(ctx), ctx->status); if (ctx->finished_cb) - ctx->finished_cb(ret, session, ctx->finished_cb_target, + ctx->finished_cb(ret, session, &ctx->comm, ctx->finished_cb_baton); free(ctx); @@ -314,7 +320,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret) return ret; } -static void protolayer_push_finished(int status, struct session2 *s, const void *target, void *baton) +static void protolayer_push_finished(int status, struct session2 *s, const struct comm_info *comm, void *baton) { struct protolayer_iter_ctx *ctx = baton; ctx->status = status; @@ -338,11 +344,11 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx) if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) { session2_transport_push(session, ctx->payload.buffer.buf, ctx->payload.buffer.len, - ctx->target, protolayer_push_finished, ctx); + &ctx->comm, protolayer_push_finished, ctx); } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { session2_transport_pushv(session, ctx->payload.iovec.iov, ctx->payload.iovec.cnt, - ctx->target, protolayer_push_finished, ctx); + &ctx->comm, protolayer_push_finished, ctx); } else { kr_assert(false && "Invalid payload type"); return kr_error(EINVAL); @@ -429,7 +435,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx) static int protolayer_manager_submit( struct protolayer_manager *manager, enum protolayer_direction direction, size_t layer_ix, - struct protolayer_payload payload, const void *target, + struct protolayer_payload payload, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size); @@ -444,13 +450,11 @@ static int protolayer_manager_submit( *ctx = (struct protolayer_iter_ctx) { .payload = payload, - .target = target, - .comm = &manager->session->comm, + .comm = (comm) ? *comm : manager->session->comm, .direction = direction, .layer_ix = layer_ix, .manager = manager, .finished_cb = cb, - .finished_cb_target = target, .finished_cb_baton = baton }; @@ -711,9 +715,8 @@ struct session2 *session2_new(enum session2_transport_type transport_type, return s; } -static void session2_timer_on_close(uv_handle_t *handle) +void session2_free(struct session2 *s) { - struct session2 *s = handle->data; protolayer_manager_free(s->layers); wire_buf_deinit(&s->wire_buf); mm_ctx_delete(&s->pool); @@ -722,11 +725,6 @@ static void session2_timer_on_close(uv_handle_t *handle) free(s); } -void session2_free(struct session2 *s) -{ - uv_close((uv_handle_t *)&s->timer, session2_timer_on_close); -} - int session2_start_read(struct session2 *session) { if (session->transport.type == SESSION2_TRANSPORT_IO) @@ -1010,40 +1008,44 @@ void session2_waitinglist_finalize(struct session2 *session, int status) } int session2_unwrap(struct session2 *s, struct protolayer_payload payload, - const void *target, protolayer_finished_cb cb, void *baton) + const struct comm_info *comm, protolayer_finished_cb cb, + void *baton) { return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0, - payload, target, cb, baton); + payload, comm, cb, baton); } int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol, - struct protolayer_payload payload, const void *target, - protolayer_finished_cb cb, void *baton) + struct protolayer_payload payload, + const struct comm_info *comm, + protolayer_finished_cb cb, void *baton) { ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1; if (layer_ix < 0) return layer_ix; return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix, - payload, target, cb, baton); + payload, comm, cb, baton); } int session2_wrap(struct session2 *s, struct protolayer_payload payload, - const void *target, protolayer_finished_cb cb, void *baton) + const struct comm_info *comm, protolayer_finished_cb cb, + void *baton) { return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, s->layers->num_layers - 1, - payload, target, cb, baton); + payload, comm, cb, baton); } int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol, - struct protolayer_payload payload, const void *target, - protolayer_finished_cb cb, void *baton) + struct protolayer_payload payload, + const struct comm_info *comm, + protolayer_finished_cb cb, void *baton) { ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1; if (layer_ix < 0) return layer_ix; return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix, - payload, target, cb, baton); + payload, comm, cb, baton); } static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton) @@ -1125,7 +1127,7 @@ void session2_init_request(struct session2 *s, struct kr_request *req) struct session2_pushv_ctx { struct session2 *session; protolayer_finished_cb cb; - const void *target; + const struct comm_info *comm; void *baton; char *buf; @@ -1134,12 +1136,12 @@ struct session2_pushv_ctx { static void session2_transport_parent_pushv_finished(int status, struct session2 *session, - const void *target, + const struct comm_info *comm, void *baton) { struct session2_pushv_ctx *ctx = baton; if (ctx->cb) - ctx->cb(status, ctx->session, target, ctx->baton); + ctx->cb(status, ctx->session, comm, ctx->baton); free(ctx->buf); free(ctx); } @@ -1148,7 +1150,7 @@ static void session2_transport_udp_queue_pushv_finished(int status, void *baton) { struct session2_pushv_ctx *ctx = baton; if (ctx->cb) - ctx->cb(status, ctx->session, ctx->target, ctx->baton); + ctx->cb(status, ctx->session, ctx->comm, ctx->baton); free(ctx->buf); free(ctx); } @@ -1157,7 +1159,7 @@ static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status { struct session2_pushv_ctx *ctx = req->data; if (ctx->cb) - ctx->cb(status, ctx->session, ctx->target, ctx->baton); + ctx->cb(status, ctx->session, ctx->comm, ctx->baton); free(ctx->buf); free(ctx); free(req); @@ -1167,15 +1169,39 @@ static void session2_transport_stream_pushv_finished(uv_write_t *req, int status { struct session2_pushv_ctx *ctx = req->data; if (ctx->cb) - ctx->cb(status, ctx->session, ctx->target, ctx->baton); + ctx->cb(status, ctx->session, ctx->comm, ctx->baton); free(ctx->buf); free(ctx); free(req); } +#if ENABLE_XDP +static void xdp_tx_waker(uv_idle_t *handle) +{ + xdp_handle_data_t *xhd = handle->data; + int ret = knot_xdp_send_finish(xhd->socket); + if (ret != KNOT_EAGAIN && ret != KNOT_EOK) + kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret)); + /* Apparently some drivers need many explicit wake-up calls + * even if we push no additional packets (in case they accumulated a lot) */ + if (ret != KNOT_EAGAIN) + uv_idle_stop(handle); + knot_xdp_send_prepare(xhd->socket); + /* LATER(opt.): it _might_ be better for performance to do these two steps + * at different points in time */ + while (queue_len(xhd->tx_waker_queue)) { + struct session2_pushv_ctx *ctx = queue_head(xhd->tx_waker_queue); + if (ctx->cb) + ctx->cb(kr_ok(), ctx->session, ctx->comm, ctx->baton); + free(ctx); + queue_pop(xhd->tx_waker_queue); + } +} +#endif + static int session2_transport_pushv(struct session2 *s, struct iovec *iov, int iovcnt, - const void *target, + const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { if (kr_fails_assert(s)) @@ -1187,7 +1213,7 @@ static int session2_transport_pushv(struct session2 *s, .session = s, .cb = cb, .baton = baton, - .target = target + .comm = comm }; switch (s->transport.type) { @@ -1195,12 +1221,11 @@ static int session2_transport_pushv(struct session2 *s, uv_handle_t *handle = s->transport.io.handle; if (kr_fails_assert(handle)) { if (cb) - cb(kr_error(EINVAL), s, target, baton); + cb(kr_error(EINVAL), s, comm, baton); free(ctx); return kr_error(EINVAL); } - /* TODO: XDP */ if (handle->type == UV_UDP) { if (ENABLE_SENDMMSG && !s->outgoing) { int fd; @@ -1212,7 +1237,7 @@ static int session2_transport_pushv(struct session2 *s, if (kr_fails_assert(iovcnt == 1)) return kr_error(EINVAL); - udp_queue_push(fd, target, iov->iov_base, iov->iov_len, + udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len, session2_transport_udp_queue_pushv_finished, ctx); return kr_ok(); @@ -1220,11 +1245,11 @@ static int session2_transport_pushv(struct session2 *s, uv_udp_send_t *req = malloc(sizeof(*req)); req->data = ctx; int ret = uv_udp_send(req, (uv_udp_t *)handle, - (uv_buf_t *)iov, iovcnt, target, + (uv_buf_t *)iov, iovcnt, comm->comm_addr, session2_transport_udp_pushv_finished); if (ret) { if (cb) - cb(ret, s, target, baton); + cb(ret, s, comm, baton); free(req); free(ctx); } @@ -1237,15 +1262,50 @@ static int session2_transport_pushv(struct session2 *s, session2_transport_stream_pushv_finished); if (ret) { if (cb) - cb(ret, s, target, baton); + cb(ret, s, comm, baton); free(req); free(ctx); } return ret; +#if ENABLE_XDP + } else if (handle->type == UV_POLL) { + xdp_handle_data_t *xhd = handle->data; + if (kr_fails_assert(xhd && xhd->socket)) + return kr_error(EIO); + + /* TODO: support multiple iovecs properly? */ + if (kr_fails_assert(iovcnt == 1)) + return kr_error(EINVAL); + + knot_xdp_msg_t msg; +#if KNOT_VERSION_HEX >= 0x030100 + /* We don't have a nice way of preserving the _msg_t from frame allocation, + * so we manually redo all other parts of knot_xdp_send_alloc() */ + memset(&msg, 0, sizeof(msg)); + bool ipv6 = comm->comm_addr->sa_family == AF_INET6; + msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0; + memcpy(msg.eth_from, comm->eth_from, sizeof(comm->eth_from)); + memcpy(msg.eth_to, comm->eth_to, sizeof(comm->eth_to)); +#endif + const struct sockaddr *ip_from = comm->dst_addr; + const struct sockaddr *ip_to = comm->comm_addr; + memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from)); + memcpy(&msg.ip_to, ip_to, kr_sockaddr_len(ip_to)); + msg.payload = *iov; + + uint32_t sent; + int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent); + + queue_push(xhd->tx_waker_queue, ctx); + uv_idle_start(&xhd->tx_waker, xdp_tx_waker); + kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret); + + return kr_ok(); +#endif } else { kr_assert(false && "Unsupported handle"); if (cb) - cb(kr_error(EINVAL), s, target, baton); + cb(kr_error(EINVAL), s, comm, baton); free(ctx); return kr_error(EINVAL); } @@ -1257,7 +1317,7 @@ static int session2_transport_pushv(struct session2 *s, return kr_error(EINVAL); } int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt), - target, session2_transport_parent_pushv_finished, + comm, session2_transport_parent_pushv_finished, ctx); return (ret < 0) ? ret : kr_ok(); @@ -1276,18 +1336,18 @@ struct push_ctx { static void session2_transport_single_push_finished(int status, struct session2 *s, - const void *target, + const struct comm_info *comm, void *baton) { struct push_ctx *ctx = baton; if (ctx->cb) - ctx->cb(status, s, target, ctx->baton); + ctx->cb(status, s, comm, ctx->baton); free(ctx); } static inline int session2_transport_push(struct session2 *s, char *buf, size_t buf_len, - const void *target, + const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { struct push_ctx *ctx = malloc(sizeof(*ctx)); @@ -1301,7 +1361,7 @@ static inline int session2_transport_push(struct session2 *s, .baton = baton }; - return session2_transport_pushv(s, &ctx->iov, 1, target, + return session2_transport_pushv(s, &ctx->iov, 1, comm, session2_transport_single_push_finished, ctx); } @@ -1315,7 +1375,12 @@ static void on_session2_handle_close(uv_handle_t *handle) static int session2_handle_close(struct session2 *s, uv_handle_t *handle) { + if (kr_fails_assert(s->transport.type == SESSION2_TRANSPORT_IO + && s->transport.io.handle == handle)) + return kr_error(EINVAL); + io_stop_read(handle); + uv_close((uv_handle_t *)&s->timer, NULL); uv_close(handle, on_session2_handle_close); return kr_ok(); } |