summaryrefslogtreecommitdiffstats
path: root/daemon/session2.c
diff options
context:
space:
mode:
authorOto Šťáva <oto.stava@nic.cz>2022-10-20 12:10:15 +0200
committerOto Šťáva <oto.stava@nic.cz>2023-01-26 12:56:08 +0100
commit0a2e4472d786175badf19897a9d2e92bba8c5ce1 (patch)
treea9ab6ec35a45b2ea9e661b194627ff9260b66087 /daemon/session2.c
parentdaemon/session2: documentation (diff)
downloadknot-resolver-0a2e4472d786175badf19897a9d2e92bba8c5ce1.tar.xz
knot-resolver-0a2e4472d786175badf19897a9d2e92bba8c5ce1.zip
daemon: XDP with protolayers
Diffstat (limited to 'daemon/session2.c')
-rw-r--r--daemon/session2.c159
1 files changed, 112 insertions, 47 deletions
diff --git a/daemon/session2.c b/daemon/session2.c
index 903c992c..d9b5ba15 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -2,9 +2,15 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*/
+#include "kresconfig.h"
+
#include <ucw/lib.h>
#include <sys/socket.h>
+#if ENABLE_XDP
+ #include <libknot/xdp/xdp.h>
+#endif
+
#include "lib/log.h"
#include "lib/utils.h"
#include "daemon/io.h"
@@ -101,11 +107,11 @@ const char *protolayer_payload_names[PROTOLAYER_PAYLOAD_COUNT] = {
/* Forward decls. */
static int session2_transport_pushv(struct session2 *s,
struct iovec *iov, int iovcnt,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton);
static inline int session2_transport_push(struct session2 *s,
char *buf, size_t buf_len,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton);
static int session2_transport_event(struct session2 *s,
enum protolayer_event_type event,
@@ -306,7 +312,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
ctx->layer_ix, layer_name_ctx(ctx), ctx->status);
if (ctx->finished_cb)
- ctx->finished_cb(ret, session, ctx->finished_cb_target,
+ ctx->finished_cb(ret, session, &ctx->comm,
ctx->finished_cb_baton);
free(ctx);
@@ -314,7 +320,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
return ret;
}
-static void protolayer_push_finished(int status, struct session2 *s, const void *target, void *baton)
+static void protolayer_push_finished(int status, struct session2 *s, const struct comm_info *comm, void *baton)
{
struct protolayer_iter_ctx *ctx = baton;
ctx->status = status;
@@ -338,11 +344,11 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
session2_transport_push(session,
ctx->payload.buffer.buf, ctx->payload.buffer.len,
- ctx->target, protolayer_push_finished, ctx);
+ &ctx->comm, protolayer_push_finished, ctx);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
session2_transport_pushv(session,
ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
- ctx->target, protolayer_push_finished, ctx);
+ &ctx->comm, protolayer_push_finished, ctx);
} else {
kr_assert(false && "Invalid payload type");
return kr_error(EINVAL);
@@ -429,7 +435,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
static int protolayer_manager_submit(
struct protolayer_manager *manager,
enum protolayer_direction direction, size_t layer_ix,
- struct protolayer_payload payload, const void *target,
+ struct protolayer_payload payload, const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size);
@@ -444,13 +450,11 @@ static int protolayer_manager_submit(
*ctx = (struct protolayer_iter_ctx) {
.payload = payload,
- .target = target,
- .comm = &manager->session->comm,
+ .comm = (comm) ? *comm : manager->session->comm,
.direction = direction,
.layer_ix = layer_ix,
.manager = manager,
.finished_cb = cb,
- .finished_cb_target = target,
.finished_cb_baton = baton
};
@@ -711,9 +715,8 @@ struct session2 *session2_new(enum session2_transport_type transport_type,
return s;
}
-static void session2_timer_on_close(uv_handle_t *handle)
+void session2_free(struct session2 *s)
{
- struct session2 *s = handle->data;
protolayer_manager_free(s->layers);
wire_buf_deinit(&s->wire_buf);
mm_ctx_delete(&s->pool);
@@ -722,11 +725,6 @@ static void session2_timer_on_close(uv_handle_t *handle)
free(s);
}
-void session2_free(struct session2 *s)
-{
- uv_close((uv_handle_t *)&s->timer, session2_timer_on_close);
-}
-
int session2_start_read(struct session2 *session)
{
if (session->transport.type == SESSION2_TRANSPORT_IO)
@@ -1010,40 +1008,44 @@ void session2_waitinglist_finalize(struct session2 *session, int status)
}
int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
- const void *target, protolayer_finished_cb cb, void *baton)
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton)
{
return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
- struct protolayer_payload payload, const void *target,
- protolayer_finished_cb cb, void *baton)
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton)
{
ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1;
if (layer_ix < 0)
return layer_ix;
return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
int session2_wrap(struct session2 *s, struct protolayer_payload payload,
- const void *target, protolayer_finished_cb cb, void *baton)
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton)
{
return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP,
s->layers->num_layers - 1,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
- struct protolayer_payload payload, const void *target,
- protolayer_finished_cb cb, void *baton)
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton)
{
ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1;
if (layer_ix < 0)
return layer_ix;
return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton)
@@ -1125,7 +1127,7 @@ void session2_init_request(struct session2 *s, struct kr_request *req)
struct session2_pushv_ctx {
struct session2 *session;
protolayer_finished_cb cb;
- const void *target;
+ const struct comm_info *comm;
void *baton;
char *buf;
@@ -1134,12 +1136,12 @@ struct session2_pushv_ctx {
static void session2_transport_parent_pushv_finished(int status,
struct session2 *session,
- const void *target,
+ const struct comm_info *comm,
void *baton)
{
struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
- ctx->cb(status, ctx->session, target, ctx->baton);
+ ctx->cb(status, ctx->session, comm, ctx->baton);
free(ctx->buf);
free(ctx);
}
@@ -1148,7 +1150,7 @@ static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
{
struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
free(ctx->buf);
free(ctx);
}
@@ -1157,7 +1159,7 @@ static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status
{
struct session2_pushv_ctx *ctx = req->data;
if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
free(ctx->buf);
free(ctx);
free(req);
@@ -1167,15 +1169,39 @@ static void session2_transport_stream_pushv_finished(uv_write_t *req, int status
{
struct session2_pushv_ctx *ctx = req->data;
if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
free(ctx->buf);
free(ctx);
free(req);
}
+#if ENABLE_XDP
+static void xdp_tx_waker(uv_idle_t *handle)
+{
+ xdp_handle_data_t *xhd = handle->data;
+ int ret = knot_xdp_send_finish(xhd->socket);
+ if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
+ kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
+ /* Apparently some drivers need many explicit wake-up calls
+ * even if we push no additional packets (in case they accumulated a lot) */
+ if (ret != KNOT_EAGAIN)
+ uv_idle_stop(handle);
+ knot_xdp_send_prepare(xhd->socket);
+ /* LATER(opt.): it _might_ be better for performance to do these two steps
+ * at different points in time */
+ while (queue_len(xhd->tx_waker_queue)) {
+ struct session2_pushv_ctx *ctx = queue_head(xhd->tx_waker_queue);
+ if (ctx->cb)
+ ctx->cb(kr_ok(), ctx->session, ctx->comm, ctx->baton);
+ free(ctx);
+ queue_pop(xhd->tx_waker_queue);
+ }
+}
+#endif
+
static int session2_transport_pushv(struct session2 *s,
struct iovec *iov, int iovcnt,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
if (kr_fails_assert(s))
@@ -1187,7 +1213,7 @@ static int session2_transport_pushv(struct session2 *s,
.session = s,
.cb = cb,
.baton = baton,
- .target = target
+ .comm = comm
};
switch (s->transport.type) {
@@ -1195,12 +1221,11 @@ static int session2_transport_pushv(struct session2 *s,
uv_handle_t *handle = s->transport.io.handle;
if (kr_fails_assert(handle)) {
if (cb)
- cb(kr_error(EINVAL), s, target, baton);
+ cb(kr_error(EINVAL), s, comm, baton);
free(ctx);
return kr_error(EINVAL);
}
- /* TODO: XDP */
if (handle->type == UV_UDP) {
if (ENABLE_SENDMMSG && !s->outgoing) {
int fd;
@@ -1212,7 +1237,7 @@ static int session2_transport_pushv(struct session2 *s,
if (kr_fails_assert(iovcnt == 1))
return kr_error(EINVAL);
- udp_queue_push(fd, target, iov->iov_base, iov->iov_len,
+ udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len,
session2_transport_udp_queue_pushv_finished,
ctx);
return kr_ok();
@@ -1220,11 +1245,11 @@ static int session2_transport_pushv(struct session2 *s,
uv_udp_send_t *req = malloc(sizeof(*req));
req->data = ctx;
int ret = uv_udp_send(req, (uv_udp_t *)handle,
- (uv_buf_t *)iov, iovcnt, target,
+ (uv_buf_t *)iov, iovcnt, comm->comm_addr,
session2_transport_udp_pushv_finished);
if (ret) {
if (cb)
- cb(ret, s, target, baton);
+ cb(ret, s, comm, baton);
free(req);
free(ctx);
}
@@ -1237,15 +1262,50 @@ static int session2_transport_pushv(struct session2 *s,
session2_transport_stream_pushv_finished);
if (ret) {
if (cb)
- cb(ret, s, target, baton);
+ cb(ret, s, comm, baton);
free(req);
free(ctx);
}
return ret;
+#if ENABLE_XDP
+ } else if (handle->type == UV_POLL) {
+ xdp_handle_data_t *xhd = handle->data;
+ if (kr_fails_assert(xhd && xhd->socket))
+ return kr_error(EIO);
+
+ /* TODO: support multiple iovecs properly? */
+ if (kr_fails_assert(iovcnt == 1))
+ return kr_error(EINVAL);
+
+ knot_xdp_msg_t msg;
+#if KNOT_VERSION_HEX >= 0x030100
+ /* We don't have a nice way of preserving the _msg_t from frame allocation,
+ * so we manually redo all other parts of knot_xdp_send_alloc() */
+ memset(&msg, 0, sizeof(msg));
+ bool ipv6 = comm->comm_addr->sa_family == AF_INET6;
+ msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0;
+ memcpy(msg.eth_from, comm->eth_from, sizeof(comm->eth_from));
+ memcpy(msg.eth_to, comm->eth_to, sizeof(comm->eth_to));
+#endif
+ const struct sockaddr *ip_from = comm->dst_addr;
+ const struct sockaddr *ip_to = comm->comm_addr;
+ memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
+ memcpy(&msg.ip_to, ip_to, kr_sockaddr_len(ip_to));
+ msg.payload = *iov;
+
+ uint32_t sent;
+ int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
+
+ queue_push(xhd->tx_waker_queue, ctx);
+ uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
+ kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
+
+ return kr_ok();
+#endif
} else {
kr_assert(false && "Unsupported handle");
if (cb)
- cb(kr_error(EINVAL), s, target, baton);
+ cb(kr_error(EINVAL), s, comm, baton);
free(ctx);
return kr_error(EINVAL);
}
@@ -1257,7 +1317,7 @@ static int session2_transport_pushv(struct session2 *s,
return kr_error(EINVAL);
}
int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
- target, session2_transport_parent_pushv_finished,
+ comm, session2_transport_parent_pushv_finished,
ctx);
return (ret < 0) ? ret : kr_ok();
@@ -1276,18 +1336,18 @@ struct push_ctx {
static void session2_transport_single_push_finished(int status,
struct session2 *s,
- const void *target,
+ const struct comm_info *comm,
void *baton)
{
struct push_ctx *ctx = baton;
if (ctx->cb)
- ctx->cb(status, s, target, ctx->baton);
+ ctx->cb(status, s, comm, ctx->baton);
free(ctx);
}
static inline int session2_transport_push(struct session2 *s,
char *buf, size_t buf_len,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
struct push_ctx *ctx = malloc(sizeof(*ctx));
@@ -1301,7 +1361,7 @@ static inline int session2_transport_push(struct session2 *s,
.baton = baton
};
- return session2_transport_pushv(s, &ctx->iov, 1, target,
+ return session2_transport_pushv(s, &ctx->iov, 1, comm,
session2_transport_single_push_finished, ctx);
}
@@ -1315,7 +1375,12 @@ static void on_session2_handle_close(uv_handle_t *handle)
static int session2_handle_close(struct session2 *s, uv_handle_t *handle)
{
+ if (kr_fails_assert(s->transport.type == SESSION2_TRANSPORT_IO
+ && s->transport.io.handle == handle))
+ return kr_error(EINVAL);
+
io_stop_read(handle);
+ uv_close((uv_handle_t *)&s->timer, NULL);
uv_close(handle, on_session2_handle_close);
return kr_ok();
}