summary refs log tree commit diff stats
path: root/daemon/io.c
diff options
context:
space:
mode:
author Oto Šťáva <oto.stava@nic.cz> 2022-08-02 10:53:38 +0200
committer Oto Šťáva <oto.stava@nic.cz> 2023-01-26 12:56:07 +0100
commit 5501d84bb244f55ef8ef06be95456b3b4f516fa3 (patch)
tree 6fd2b2d6e44a3ff308bbbae44a599dcbe4de8c10 /daemon/io.c
parent session2: protocol layer API (diff)
download knot-resolver-5501d84bb244f55ef8ef06be95456b3b4f516fa3.tar.xz
knot-resolver-5501d84bb244f55ef8ef06be95456b3b4f516fa3.zip
daemon: basic implementation of TCP and UDP with protolayers
Diffstat (limited to 'daemon/io.c')
-rw-r--r-- daemon/io.c 1139
1 files changed, 648 insertions, 491 deletions
diff --git a/daemon/io.c b/daemon/io.c
index 66ad03da..c9fcc0eb 100644
--- a/daemon/io.c
+++ b/daemon/io.c
@@ -21,7 +21,7 @@
#include "daemon/worker.h"
#include "daemon/tls.h"
#include "daemon/http.h"
-#include "daemon/session.h"
+#include "daemon/session2.h"
#include "contrib/cleanup.h"
#include "lib/utils.h"
@@ -40,9 +40,9 @@ static void check_bufsize(uv_handle_t* handle)
* This is magic presuming we can pull in a whole recvmmsg width in one wave.
* Linux will double this the bufsize wanted.
*/
- const int bufsize_want = 2 * sizeof(the_worker->wire_buf) ;
- negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
- negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
+	/* Byte count of two full receive waves. The product is used directly:
+	 * wrapping it in sizeof() would yield the size of the integer type of
+	 * the expression (a handful of bytes), not the intended buffer size. */
+	const int BUF_SIZE = 2 * RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE;
+	negotiate_bufsize(uv_recv_buffer_size, handle, BUF_SIZE);
+	negotiate_bufsize(uv_send_buffer_size, handle, BUF_SIZE);
}
#undef negotiate_bufsize
@@ -57,26 +57,26 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t*
* guaranteed to be unchanged only for the duration of
* udp_read() and tcp_read().
*/
- struct session *s = handle->data;
- if (!session_flags(s)->has_tls) {
- buf->base = (char *) session_wirebuf_get_free_start(s);
- buf->len = session_wirebuf_get_free_size(s);
- } else {
- struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
- buf->base = (char *) ctx->recv_buf;
- buf->len = sizeof(ctx->recv_buf);
- }
+ struct session2 *s = handle->data;
+ buf->base = wire_buf_free_space(&s->wire_buf);
+ buf->len = wire_buf_free_space_length(&s->wire_buf);
+}
+
+/* Callback passed to session2_unwrap() by udp_recv(): resets the session's
+ * wire buffer. `status`, `target` and `baton` are not used here. */
+static void udp_on_unwrapped(int status, struct session2 *session,
+		const void *target, void *baton)
+{
+	struct wire_buf *wb = &session->wire_buf;
+	wire_buf_reset(wb);
}
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *comm_addr, unsigned flags)
{
- struct session *s = handle->data;
- if (session_flags(s)->closing || nread <= 0 || comm_addr->sa_family == AF_UNSPEC)
+ struct session2 *s = handle->data;
+ if (s->closing || nread <= 0 || comm_addr->sa_family == AF_UNSPEC)
return;
- if (session_flags(s)->outgoing) {
- const struct sockaddr *peer = session_get_peer(s);
+ if (s->outgoing) {
+ const struct sockaddr *peer = session2_get_peer(s);
if (kr_fails_assert(peer->sa_family != AF_UNSPEC))
return;
if (kr_sockaddr_cmp(peer, comm_addr) != 0) {
@@ -86,64 +86,16 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
}
}
- const uint8_t *data = (const uint8_t *)buf->base;
- ssize_t data_len = nread;
- const struct sockaddr *src_addr = comm_addr;
- const struct sockaddr *dst_addr = NULL;
- struct proxy_result proxy;
- bool has_proxy = false;
- if (!session_flags(s)->outgoing && proxy_header_present(data, data_len)) {
- if (!proxy_allowed(comm_addr)) {
- kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
- kr_straddr(comm_addr));
- return;
- }
-
- ssize_t trimmed = proxy_process_header(&proxy, s, data, data_len);
- if (trimmed == KNOT_EMALF) {
- if (kr_log_is_debug(IO, NULL)) {
- kr_log_debug(IO, "<= ignoring malformed PROXYv2 UDP "
- "from address '%s'\n",
- kr_straddr(comm_addr));
- }
- return;
- } else if (trimmed < 0) {
- if (kr_log_is_debug(IO, NULL)) {
- kr_log_debug(IO, "<= error processing PROXYv2 UDP "
- "from address '%s', ignoring\n",
- kr_straddr(comm_addr));
- }
- return;
- }
-
- if (proxy.command == PROXY2_CMD_PROXY && proxy.family != AF_UNSPEC) {
- has_proxy = true;
- src_addr = &proxy.src_addr.ip;
- dst_addr = &proxy.dst_addr.ip;
-
- if (kr_log_is_debug(IO, NULL)) {
- kr_log_debug(IO, "<= UDP query from '%s'\n",
- kr_straddr(src_addr));
- kr_log_debug(IO, "<= proxied through '%s'\n",
- kr_straddr(comm_addr));
- }
- }
- data = session_wirebuf_get_free_start(s);
- data_len = nread - trimmed;
+ int ret = wire_buf_consume(&s->wire_buf, nread);
+ if (ret) {
+ wire_buf_reset(&s->wire_buf);
+ return;
}
- ssize_t consumed = session_wirebuf_consume(s, data, data_len);
- kr_assert(consumed == data_len);
-
- struct io_comm_data comm = {
- .src_addr = src_addr,
- .comm_addr = comm_addr,
- .dst_addr = dst_addr,
- .proxy = (has_proxy) ? &proxy : NULL
- };
- session_wirebuf_process(s, &comm);
- session_wirebuf_discard(s);
- mp_flush(the_worker->pkt_pool.ctx);
+ ret = session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), comm_addr,
+ udp_on_unwrapped, NULL);
+ if (ret)
+ wire_buf_reset(&s->wire_buf);
}
static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name)
@@ -180,6 +132,304 @@ static int family_to_freebind_option(sa_family_t sa_family, int *level, int *nam
return kr_ok();
}
+
+struct pl_udp_iter_data { /* Iteration data of the UDP protocol layer (registered via .iter_size). */
+ struct proxy_result proxy; /* Filled by proxy_process_header() in pl_udp_unwrap(). */
+ bool has_proxy; /* Set when a PROXY command with a family other than AF_UNSPEC was parsed. */
+};
+
+/* Iteration-data initializer of the UDP layer: overwrites the layer's
+ * pl_udp_iter_data with an all-zero value. `manager` is unused.
+ * Always returns 0. */
+static int pl_udp_iter_init(struct protolayer_manager *manager, struct protolayer_data *layer)
+{
+	const struct pl_udp_iter_data zeroed = {0};
+	struct pl_udp_iter_data *iter = protolayer_iter_data(layer);
+
+	*iter = zeroed;
+	return 0;
+}
+
+/* Unwrap callback of the UDP protocol layer.
+ *
+ * Event payloads are passed on untouched. Any other payload is converted
+ * with protolayer_as_buffer() and must end up as a plain buffer. Both the
+ * communication and the source address are taken from ctx->target.
+ *
+ * For sessions that are not outgoing, a leading PROXYv2 header is checked
+ * with proxy_allowed(), parsed into the layer's iteration data and cut off
+ * from the payload handed to the next layer. A disallowed sender breaks the
+ * sequence with EPERM, a header that fails to parse with EINVAL. */
+static enum protolayer_cb_result pl_udp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+	/* events should not happen in UDP (currently) */
+	if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT)
+		return protolayer_continue(ctx);
+
+	ctx->payload = protolayer_as_buffer(&ctx->payload);
+	if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER))
+		return protolayer_break(ctx, kr_error(EINVAL)); /* unsupported payload */
+
+	struct session2 *sess = ctx->manager->session;
+	struct pl_udp_iter_data *iter = protolayer_iter_data(layer);
+
+	char *pkt = ctx->payload.buffer.buf;
+	ssize_t pkt_len = ctx->payload.buffer.len;
+
+	struct comm_info *comm = &ctx->comm;
+	comm->comm_addr = ctx->target;
+	comm->src_addr = ctx->target;
+
+	/* Only datagrams of non-outgoing sessions may carry a PROXYv2 header. */
+	if (sess->outgoing || !proxy_header_present(pkt, pkt_len))
+		return protolayer_continue(ctx);
+
+	if (!proxy_allowed(comm->comm_addr)) {
+		kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
+				kr_straddr(comm->comm_addr));
+		return protolayer_break(ctx, kr_error(EPERM));
+	}
+
+	ssize_t hdr_len = proxy_process_header(&iter->proxy, pkt, pkt_len);
+	const bool malformed = (hdr_len == KNOT_EMALF);
+	if (malformed || hdr_len < 0) {
+		if (kr_log_is_debug(IO, NULL)) {
+			if (malformed) {
+				kr_log_debug(IO, "<= ignoring malformed PROXYv2 UDP "
+						"from address '%s'\n",
+						kr_straddr(comm->comm_addr));
+			} else {
+				kr_log_debug(IO, "<= error processing PROXYv2 UDP "
+						"from address '%s', ignoring\n",
+						kr_straddr(comm->comm_addr));
+			}
+		}
+		return protolayer_break(ctx, kr_error(EINVAL));
+	}
+
+	if (iter->proxy.command == PROXY2_CMD_PROXY && iter->proxy.family != AF_UNSPEC) {
+		iter->has_proxy = true;
+
+		/* From here on the query is attributed to the proxied addresses. */
+		comm->src_addr = &iter->proxy.src_addr.ip;
+		comm->dst_addr = &iter->proxy.dst_addr.ip;
+		comm->proxy = &iter->proxy;
+
+		if (kr_log_is_debug(IO, NULL)) {
+			kr_log_debug(IO, "<= UDP query from '%s'\n",
+					kr_straddr(comm->src_addr));
+			kr_log_debug(IO, "<= proxied through '%s'\n",
+					kr_straddr(comm->comm_addr));
+		}
+	}
+
+	/* Pass on only the bytes that follow the PROXYv2 header. */
+	ctx->payload = protolayer_buffer(pkt + hdr_len, pkt_len - hdr_len);
+	return protolayer_continue(ctx);
+}
+
+/* Wrap callback of the UDP protocol layer: performs no transformation of
+ * its own and returns whatever protolayer_push(ctx) yields. */
+static enum protolayer_cb_result pl_udp_wrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+	const enum protolayer_cb_result pushed = protolayer_push(ctx);
+	return pushed;
+}
+
+
+struct pl_tcp_sess_data { /* Session data of the TCP protocol layer (registered via .sess_size). */
+ struct proxy_result proxy; /* Filled by proxy_process_header() in pl_tcp_unwrap(). */
+ struct wire_buf wire_buf; /* Layer-owned buffer that plain-buffer payloads are copied into; reserved on first use. */
+ bool had_data : 1; /* Set after the first data payload; a PROXYv2 header is only looked for before that. */
+ bool has_proxy : 1; /* NOTE(review): presumably flags that `proxy` holds usable addresses - confirm where it is written. */
+};
+
+/* Session-data initializer of the TCP layer: overwrites the layer's
+ * pl_tcp_sess_data with an all-zero value, so the wire buffer starts out
+ * unallocated and both flags are false. `manager` is unused.
+ * Always returns 0. */
+static int pl_tcp_sess_init(struct protolayer_manager *manager, struct protolayer_data *layer)
+{
+	const struct pl_tcp_sess_data zeroed = {0};
+	struct pl_tcp_sess_data *sess_data = protolayer_sess_data(layer);
+
+	*sess_data = zeroed;
+	return 0;
+}
+
+/* Session-data finalizer of the TCP layer: calls wire_buf_deinit() on the
+ * layer's own wire buffer. `manager` is unused. Always returns 0. */
+static int pl_tcp_sess_deinit(struct protolayer_manager *manager, struct protolayer_data *layer)
+{
+	struct pl_tcp_sess_data *sess_data = protolayer_sess_data(layer);
+	struct wire_buf *wb = &sess_data->wire_buf;
+
+	wire_buf_deinit(wb);
+	return 0;
+}
+
+/* Timeout-event handler of the TCP layer (invoked from pl_tcp_unwrap()).
+ *
+ * Expired tasks are finalized first. If tasks are still pending afterwards,
+ * the session timer is restarted with KR_RESOLVE_TIME_LIMIT / 2. Otherwise
+ * the waiting list is drained with KR_STATE_FAIL, and the session either
+ * gets its timer restarted for the remainder of the inbound idle timeout or
+ * receives a PROTOLAYER_EVENT_CLOSE event.
+ *
+ * Every path returns protolayer_continue(ctx). */
+static enum protolayer_cb_result pl_tcp_unwrap_timeout(
+		struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+	/* TODO - connecting timeout? */
+	struct session2 *sess = ctx->manager->session;
+
+	if (kr_fails_assert(!sess->closing))
+		return protolayer_continue(ctx);
+
+	if (!session2_tasklist_is_empty(sess)) {
+		int expired = session2_tasklist_finalize_expired(sess);
+		the_worker->stats.timeout += expired;
+		/* session2_tasklist_finalize_expired() may call worker_task_finalize().
+		 * If session is a source session and there were IO errors,
+		 * worker_task_finalize() can finalize all tasks and close session. */
+		if (sess->closing)
+			return protolayer_continue(ctx);
+	}
+
+	if (!session2_tasklist_is_empty(sess)) {
+		/* Tasks are still in flight: look at them again later. */
+		session2_timer_stop(sess);
+		session2_timer_start(sess,
+				KR_RESOLVE_TIME_LIMIT / 2,
+				KR_RESOLVE_TIME_LIMIT / 2,
+				PROTOLAYER_UNWRAP);
+		return protolayer_continue(ctx);
+	}
+
+	/* Normally it should not happen,
+	 * but better to check if there anything in this list. */
+	while (!session2_waitinglist_is_empty(sess)) {
+		struct qr_task *task = session2_waitinglist_pop(sess, false);
+		worker_task_finalize(task, KR_STATE_FAIL);
+		worker_task_unref(task);
+		the_worker->stats.timeout += 1;
+		if (sess->closing)
+			return protolayer_continue(ctx);
+	}
+
+	const uint64_t idle_limit = the_network->tcp.in_idle_timeout;
+	const uint64_t idle_for = kr_now() - sess->last_activity;
+	if (idle_for < idle_limit) {
+		/* Not idle for long enough yet: wait out the remainder. */
+		const uint64_t remaining = idle_limit - idle_for;
+		session2_timer_stop(sess);
+		session2_timer_start(sess,
+				remaining, remaining,
+				PROTOLAYER_UNWRAP);
+		return protolayer_continue(ctx);
+	}
+
+	struct sockaddr *peer = session2_get_peer(sess);
+	char *peer_str = kr_straddr(peer);
+	kr_log_debug(IO, "=> closing connection to '%s'\n",
+			peer_str ? peer_str : "");
+	if (sess->outgoing) {
+		worker_del_tcp_waiting(peer);
+		worker_del_tcp_connected(peer);
+	}
+	session2_unwrap(sess, protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), NULL, NULL, NULL);
+
+	return protolayer_continue(ctx);
+}
+
+/* Unwrap callback of the TCP protocol layer.
+ *
+ * - Timeout events are delegated to pl_tcp_unwrap_timeout(); all other
+ *   events are passed through.
+ * - A plain-buffer payload is copied into the layer's own wire buffer
+ *   (reserved on first use) and replaced by that wire buffer; a wire-buffer
+ *   payload is used as is. Other payload types are rejected with EINVAL.
+ * - On a non-outgoing session, the very first data may start with a PROXYv2
+ *   header. It is checked with proxy_allowed(), parsed and trimmed from the
+ *   wire buffer; on failure a PROTOLAYER_EVENT_CLOSE event is pushed instead.
+ *
+ * The parsed PROXYv2 result is kept in the session data and re-applied to
+ * ctx->comm on every later call, so that data arriving after the first read
+ * is still attributed to the proxied source/destination addresses. */
+static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+	struct session2 *s = ctx->manager->session;
+	struct pl_tcp_sess_data *tcp = protolayer_sess_data(layer);
+	struct sockaddr *peer = session2_get_peer(s);
+
+	if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
+		if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
+			return pl_tcp_unwrap_timeout(layer, ctx);
+
+		/* pass thru */
+		return protolayer_continue(ctx);
+	}
+
+	if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
+		const char *buf = ctx->payload.buffer.buf;
+		const size_t len = ctx->payload.buffer.len;
+
+		/* Copy a simple buffer into internal wirebuffer. */
+		if (len > KNOT_WIRE_MAX_PKTSIZE)
+			return protolayer_break(ctx, kr_error(EMSGSIZE));
+
+		if (!tcp->wire_buf.buf) {
+			int ret = wire_buf_reserve(&tcp->wire_buf,
+					KNOT_WIRE_MAX_PKTSIZE);
+			if (ret)
+				return protolayer_break(ctx, ret);
+		}
+
+		/* Try to make space: this only works while the buffer holds no
+		 * data and its start offset is non-zero; otherwise give up. */
+		while (len > wire_buf_free_space_length(&tcp->wire_buf)) {
+			if (wire_buf_data_length(&tcp->wire_buf) > 0 ||
+					tcp->wire_buf.start == 0)
+				return protolayer_break(ctx, kr_error(EMSGSIZE));
+
+			wire_buf_movestart(&tcp->wire_buf);
+		}
+
+		memcpy(wire_buf_free_space(&tcp->wire_buf), buf, len);
+		/* Do not hand over a buffer whose bookkeeping failed. */
+		int consume_ret = wire_buf_consume(&tcp->wire_buf, len);
+		if (consume_ret)
+			return protolayer_break(ctx, consume_ret);
+		ctx->payload = protolayer_wire_buf(&tcp->wire_buf);
+	}
+
+	if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) {
+		/* TODO: iovec support unimplemented */
+		return protolayer_break(ctx, kr_error(EINVAL));
+	}
+
+	char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */
+	ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf);
+	struct comm_info *comm = &ctx->comm;
+	comm->src_addr = peer;
+	comm->comm_addr = peer;
+	comm->dst_addr = NULL;
+	comm->proxy = NULL;
+	if (tcp->has_proxy) {
+		/* A PROXYv2 header was accepted on an earlier read of this
+		 * connection; keep attributing the stream to those addresses. */
+		comm->src_addr = &tcp->proxy.src_addr.ip;
+		comm->dst_addr = &tcp->proxy.dst_addr.ip;
+		comm->proxy = &tcp->proxy;
+	}
+	if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) {
+		if (!proxy_allowed(comm->src_addr)) {
+			if (kr_log_is_debug(IO, NULL)) {
+				kr_log_debug(IO, "<= connection to '%s': PROXYv2 not allowed "
+						"for this peer, close\n",
+						kr_straddr(peer));
+			}
+			worker_end_tcp(s);
+			ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
+			return protolayer_push(ctx);
+		}
+
+		ssize_t trimmed = proxy_process_header(&tcp->proxy, data, data_len);
+		if (trimmed < 0) {
+			if (kr_log_is_debug(IO, NULL)) {
+				if (trimmed == KNOT_EMALF) {
+					kr_log_debug(IO, "<= connection to '%s': "
+							"malformed PROXYv2 header, close\n",
+							kr_straddr(comm->src_addr));
+				} else {
+					kr_log_debug(IO, "<= connection to '%s': "
+							"error processing PROXYv2 header, close\n",
+							kr_straddr(comm->src_addr));
+				}
+			}
+			worker_end_tcp(s);
+			ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
+			return protolayer_push(ctx);
+		} else if (trimmed == 0) {
+			ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
+			return protolayer_push(ctx);
+		}
+
+		if (tcp->proxy.command != PROXY2_CMD_LOCAL && tcp->proxy.family != AF_UNSPEC) {
+			/* Remember the result for the rest of the connection. */
+			tcp->has_proxy = true;
+
+			comm->src_addr = &tcp->proxy.src_addr.ip;
+			comm->dst_addr = &tcp->proxy.dst_addr.ip;
+			comm->proxy = &tcp->proxy;
+
+			if (kr_log_is_debug(IO, NULL)) {
+				kr_log_debug(IO, "<= TCP stream from '%s'\n",
+						kr_straddr(comm->src_addr));
+				kr_log_debug(IO, "<= proxied through '%s'\n",
+						kr_straddr(comm->comm_addr));
+			}
+		}
+
+		/* Drop the header bytes so that only DNS data goes further. */
+		wire_buf_trim(ctx->payload.wire_buf, trimmed);
+	}
+
+	/* From now on a PROXYv2 signature is treated as ordinary data. */
+	tcp->had_data = true;
+
+	return protolayer_continue(ctx);
+}
+
+/* Wrap callback of the TCP protocol layer: performs no transformation of
+ * its own and returns whatever protolayer_push(ctx) yields. */
+static enum protolayer_cb_result pl_tcp_wrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+	const enum protolayer_cb_result pushed = protolayer_push(ctx);
+	return pushed;
+}
+
+
+/* Registers the UDP and TCP protocol layers in the protolayer_globals table.
+ *
+ * UDP keeps its state per iteration (pl_udp_iter_data), TCP per session
+ * (pl_tcp_sess_data, including a deinit hook for its wire buffer).
+ *
+ * The parameter list is spelled `(void)` so that the definition is a proper
+ * prototype; an empty `()` would leave the parameters unspecified. */
+void io_protolayers_init(void)
+{
+	protolayer_globals[PROTOLAYER_UDP] = (struct protolayer_globals){
+		.iter_size = sizeof(struct pl_udp_iter_data),
+		.iter_init = pl_udp_iter_init,
+		.unwrap = pl_udp_unwrap,
+		.wrap = pl_udp_wrap
+	};
+
+	protolayer_globals[PROTOLAYER_TCP] = (struct protolayer_globals){
+		.sess_size = sizeof(struct pl_tcp_sess_data),
+		.sess_init = pl_tcp_sess_init,
+		.sess_deinit = pl_tcp_sess_deinit,
+		.unwrap = pl_tcp_unwrap,
+		.wrap = pl_tcp_wrap
+	};
+}
+
+
int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags)
{
const int fd = socket(addr->sa_family, type, 0);
@@ -265,12 +515,11 @@ int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd)
uv_handle_t *h = (uv_handle_t *)handle;
check_bufsize(h);
/* Handle is already created, just create context. */
- struct session *s = session_new(h, false, false);
+ struct session2 *s = session2_new_io(h, PROTOLAYER_GRP_DOUDP, false);
kr_require(s);
- session_flags(s)->outgoing = false;
int socklen = sizeof(union kr_sockaddr);
- ret = uv_udp_getsockname(handle, session_get_sockname(s), &socklen);
+ ret = uv_udp_getsockname(handle, &s->transport.io.sockname.ip, &socklen);
if (ret) {
kr_log_error(IO, "ERROR: getsockname failed: %s\n", uv_strerror(ret));
abort(); /* It might be nontrivial not to leak something here. */
@@ -279,70 +528,13 @@ int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd)
return io_start_read(h);
}
-void tcp_timeout_trigger(uv_timer_t *timer)
-{
- struct session *s = timer->data;
-
- if (kr_fails_assert(!session_flags(s)->closing))
- return;
-
- if (!session_tasklist_is_empty(s)) {
- int finalized = session_tasklist_finalize_expired(s);
- the_worker->stats.timeout += finalized;
- /* session_tasklist_finalize_expired() may call worker_task_finalize().
- * If session is a source session and there were IO errors,
- * worker_task_finalize() can finalize all tasks and close session. */
- if (session_flags(s)->closing) {
- return;
- }
-
- }
- if (!session_tasklist_is_empty(s)) {
- uv_timer_stop(timer);
- session_timer_start(s, tcp_timeout_trigger,
- KR_RESOLVE_TIME_LIMIT / 2,
- KR_RESOLVE_TIME_LIMIT / 2);
- } else {
- /* Normally it should not happen,
- * but better to check if there anything in this list. */
- while (!session_waitinglist_is_empty(s)) {
- struct qr_task *t = session_waitinglist_pop(s, false);
- worker_task_finalize(t, KR_STATE_FAIL);
- worker_task_unref(t);
- the_worker->stats.timeout += 1;
- if (session_flags(s)->closing) {
- return;
- }
- }
- uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
- uint64_t last_activity = session_last_activity(s);
- uint64_t idle_time = kr_now() - last_activity;
- if (idle_time < idle_in_timeout) {
- idle_in_timeout -= idle_time;
- uv_timer_stop(timer);
- session_timer_start(s, tcp_timeout_trigger,
- idle_in_timeout, idle_in_timeout);
- } else {
- struct sockaddr *peer = session_get_peer(s);
- char *peer_str = kr_straddr(peer);
- kr_log_debug(IO, "=> closing connection to '%s'\n",
- peer_str ? peer_str : "");
- if (session_flags(s)->outgoing) {
- worker_del_tcp_waiting(peer);
- worker_del_tcp_connected(peer);
- }
- session_close(s);
- }
- }
-}
-
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
- struct session *s = handle->data;
- if (kr_fails_assert(s && session_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP))
+ struct session2 *s = handle->data;
+ if (kr_fails_assert(s && session2_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP))
return;
- if (session_flags(s)->closing) {
+ if (s->closing) {
return;
}
@@ -354,160 +546,117 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
if (nread < 0 || !buf->base) {
if (kr_log_is_debug(IO, NULL)) {
- struct sockaddr *peer = session_get_peer(s);
+ struct sockaddr *peer = session2_get_peer(s);
char *peer_str = kr_straddr(peer);
kr_log_debug(IO, "=> connection to '%s' closed by peer (%s)\n",
peer_str ? peer_str : "",
uv_strerror(nread));
}
worker_end_tcp(s);
+ session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_FORCE_CLOSE),
+ NULL, NULL, NULL);
return;
}
- const uint8_t *data = (const uint8_t *)buf->base;
- ssize_t data_len = nread;
- const struct sockaddr *src_addr = session_get_peer(s);
- const struct sockaddr *dst_addr = NULL;
- if (!session_flags(s)->outgoing && !session_flags(s)->no_proxy &&
- proxy_header_present(data, data_len)) {
- if (!proxy_allowed(src_addr)) {
- if (kr_log_is_debug(IO, NULL)) {
- kr_log_debug(IO, "<= connection to '%s': PROXYv2 not allowed "
- "for this peer, close\n",
- kr_straddr(src_addr));
- }
- worker_end_tcp(s);
- return;
- }
-
- struct proxy_result *proxy = session_proxy_create(s);
- ssize_t trimmed = proxy_process_header(proxy, s, data, data_len);
- if (trimmed < 0) {
- if (kr_log_is_debug(IO, NULL)) {
- if (trimmed == KNOT_EMALF) {
- kr_log_debug(IO, "<= connection to '%s': "
- "malformed PROXYv2 header, close\n",
- kr_straddr(src_addr));
- } else {
- kr_log_debug(IO, "<= connection to '%s': "
- "error processing PROXYv2 header, close\n",
- kr_straddr(src_addr));
- }
- }
- worker_end_tcp(s);
- return;
- } else if (trimmed == 0) {
- return;
- }
-
- if (proxy->command != PROXY2_CMD_LOCAL && proxy->family != AF_UNSPEC) {
- src_addr = &proxy->src_addr.ip;
- dst_addr = &proxy->dst_addr.ip;
-
- if (kr_log_is_debug(IO, NULL)) {
- kr_log_debug(IO, "<= TCP stream from '%s'\n",
- kr_straddr(src_addr));
- kr_log_debug(IO, "<= proxied through '%s'\n",
- kr_straddr(session_get_peer(s)));
- }
- }
-
- data = session_wirebuf_get_free_start(s);
- data_len = nread - trimmed;
- }
-
- session_flags(s)->no_proxy = true;
-
- ssize_t consumed = 0;
- if (session_flags(s)->has_tls) {
- /* buf->base points to start of the tls receive buffer.
- Decode data free space in session wire buffer. */
- consumed = tls_process_input_data(s, data, data_len);
- if (consumed < 0) {
- if (kr_log_is_debug(IO, NULL)) {
- char *peer_str = kr_straddr(src_addr);
- kr_log_debug(IO, "=> connection to '%s': "
- "error processing TLS data, close\n",
- peer_str ? peer_str : "");
- }
- worker_end_tcp(s);
- return;
- } else if (consumed == 0) {
- return;
- }
- data = session_wirebuf_get_free_start(s);
- data_len = consumed;
- }
-#if ENABLE_DOH2
- int streaming = 1;
- if (session_flags(s)->has_http) {
- streaming = http_process_input_data(s, data, data_len,
- &consumed);
- if (streaming < 0) {
- if (kr_log_is_debug(IO, NULL)) {
- char *peer_str = kr_straddr(src_addr);
- kr_log_debug(IO, "=> connection to '%s': "
- "error processing HTTP data, close\n",
- peer_str ? peer_str : "");
- }
- worker_end_tcp(s);
- return;
- }
- if (consumed == 0) {
- return;
- }
- data = session_wirebuf_get_free_start(s);
- data_len = consumed;
- }
-#endif
-
- /* data points to start of the free space in session wire buffer.
- Simple increase internal counter. */
- consumed = session_wirebuf_consume(s, data, data_len);
- kr_assert(consumed == data_len);
-
- struct io_comm_data comm = {
- .src_addr = src_addr,
- .comm_addr = session_get_peer(s),
- .dst_addr = dst_addr,
- .proxy = session_proxy_get(s)
- };
- int ret = session_wirebuf_process(s, &comm);
- if (ret < 0) {
- /* An error has occurred, close the session. */
- worker_end_tcp(s);
- }
- session_wirebuf_compress(s);
- mp_flush(the_worker->pkt_pool.ctx);
-#if ENABLE_DOH2
- if (session_flags(s)->has_http && streaming == 0 && ret == 0) {
- ret = http_send_status(s, HTTP_STATUS_BAD_REQUEST);
- if (ret) {
- /* An error has occurred, close the session. */
- worker_end_tcp(s);
- }
+ int ret = wire_buf_consume(&s->wire_buf, nread);
+ if (ret) {
+ wire_buf_reset(&s->wire_buf);
+ return;
}
-#endif
-}
-#if ENABLE_DOH2
-static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session)
-{
- struct tls_ctx *ctx = session_tls_get_server_ctx(session);
- ssize_t sent = 0;
- kr_require(ctx);
-
- sent = gnutls_record_send(ctx->c.tls_session, buf, len);
- if (sent < 0) {
- kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n",
- gnutls_strerror_name(sent), sent);
- return kr_error(EIO);
- }
- return sent;
+ session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), NULL, NULL, NULL);
+
+// ssize_t consumed = 0;
+// if (session_flags(s)->has_tls) {
+// /* buf->base points to start of the tls receive buffer.
+// Decode data free space in session wire buffer. */
+// consumed = tls_process_input_data(s, data, data_len);
+// if (consumed < 0) {
+// if (kr_log_is_debug(IO, NULL)) {
+// char *peer_str = kr_straddr(src_addr);
+// kr_log_debug(IO, "=> connection to '%s': "
+// "error processing TLS data, close\n",
+// peer_str ? peer_str : "");
+// }
+// worker_end_tcp(s);
+// return;
+// } else if (consumed == 0) {
+// return;
+// }
+// data = session_wirebuf_get_free_start(s);
+// data_len = consumed;
+// }
+//#if ENABLE_DOH2
+// int streaming = 1;
+// if (session_flags(s)->has_http) {
+// streaming = http_process_input_data(s, data, data_len,
+// &consumed);
+// if (streaming < 0) {
+// if (kr_log_is_debug(IO, NULL)) {
+// char *peer_str = kr_straddr(src_addr);
+// kr_log_debug(IO, "=> connection to '%s': "
+// "error processing HTTP data, close\n",
+// peer_str ? peer_str : "");
+// }
+// worker_end_tcp(s);
+// return;
+// }
+// if (consumed == 0) {
+// return;
+// }
+// data = session_wirebuf_get_free_start(s);
+// data_len = consumed;
+// }
+//#endif
+//
+// /* data points to start of the free space in session wire buffer.
+// Simple increase internal counter. */
+// consumed = session_wirebuf_consume(s, data, data_len);
+// kr_assert(consumed == data_len);
+//
+// struct io_comm_data comm = {
+// .src_addr = src_addr,
+// .comm_addr = session_get_peer(s),
+// .dst_addr = dst_addr,
+// .proxy = session_proxy_get(s)
+// };
+// int ret = session_wirebuf_process(s, &comm);
+// if (ret < 0) {
+// /* An error has occurred, close the session. */
+// worker_end_tcp(s);
+// }
+// session_wirebuf_compress(s);
+// mp_flush(the_worker->pkt_pool.ctx);
+//#if ENABLE_DOH2
+// if (session_flags(s)->has_http && streaming == 0 && ret == 0) {
+// ret = http_send_status(s, HTTP_STATUS_BAD_REQUEST);
+// if (ret) {
+// /* An error has occurred, close the session. */
+// worker_end_tcp(s);
+// }
+// }
+//#endif
}
-#endif
-static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http)
+/* TODO: http */
+//#if ENABLE_DOH2
+//static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session)
+//{
+// struct tls_ctx *ctx = session_tls_get_server_ctx(session);
+// ssize_t sent = 0;
+// kr_require(ctx);
+//
+// sent = gnutls_record_send(ctx->c.tls_session, buf, len);
+// if (sent < 0) {
+// kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n",
+// gnutls_strerror_name(sent), sent);
+// return kr_error(EIO);
+// }
+// return sent;
+//}
+//#endif
+
+static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp)
{
if (status != 0) {
return;
@@ -518,7 +667,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http)
return;
}
int res = io_create(master->loop, (uv_handle_t *)client,
- SOCK_STREAM, AF_UNSPEC, tls, http);
+ SOCK_STREAM, AF_UNSPEC, grp, false);
if (res) {
if (res == UV_EMFILE) {
the_worker->too_many_open = true;
@@ -532,31 +681,37 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http)
}
/* struct session was allocated \ borrowed from memory pool. */
- struct session *s = client->data;
- kr_require(session_flags(s)->outgoing == false);
- kr_require(session_flags(s)->has_tls == tls);
+ struct session2 *s = client->data;
+ kr_require(s->outgoing == false);
+// kr_require(s->secure == tls); /* TODO */
if (uv_accept(master, (uv_stream_t *)client) != 0) {
/* close session, close underlying uv handles and
* deallocate (or return to memory pool) memory. */
- session_close(s);
+ session2_unwrap(s,
+ protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
+ NULL, NULL, NULL);
return;
}
/* Get peer's and our address. We apparently get specific sockname here
* even if we listened on a wildcard address. */
- struct sockaddr *sa = session_get_peer(s);
+ struct sockaddr *sa = session2_get_peer(s);
int sa_len = sizeof(struct sockaddr_in6);
int ret = uv_tcp_getpeername(client, sa, &sa_len);
if (ret || sa->sa_family == AF_UNSPEC) {
- session_close(s);
+ session2_unwrap(s,
+ protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
+ NULL, NULL, NULL);
return;
}
- sa = session_get_sockname(s);
+ sa = session2_get_sockname(s);
sa_len = sizeof(struct sockaddr_in6);
ret = uv_tcp_getsockname(client, sa, &sa_len);
if (ret || sa->sa_family == AF_UNSPEC) {
- session_close(s);
+ session2_unwrap(s,
+ protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
+ NULL, NULL, NULL);
return;
}
@@ -567,77 +722,78 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http)
uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
uint64_t timeout = KR_CONN_RTT_MAX / 2;
- if (tls) {
- timeout += TLS_MAX_HANDSHAKE_TIME;
- struct tls_ctx *ctx = session_tls_get_server_ctx(s);
- if (!ctx) {
- ctx = tls_new();
- if (!ctx) {
- session_close(s);
- return;
- }
- ctx->c.session = s;
- ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
-
- /* Configure ALPN. */
- gnutls_datum_t proto;
- if (!http) {
- proto.data = (unsigned char *)"dot";
- proto.size = 3;
- } else {
- proto.data = (unsigned char *)"h2";
- proto.size = 2;
- }
- unsigned int flags = 0;
-#if GNUTLS_VERSION_NUMBER >= 0x030500
- /* Mandatory ALPN means the protocol must match if and
- * only if ALPN extension is used by the client. */
- flags |= GNUTLS_ALPN_MANDATORY;
-#endif
- ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags);
- if (ret != GNUTLS_E_SUCCESS) {
- session_close(s);
- return;
- }
-
- session_tls_set_server_ctx(s, ctx);
- }
- }
-#if ENABLE_DOH2
- if (http) {
- struct http_ctx *ctx = session_http_get_server_ctx(s);
- if (!ctx) {
- if (!tls) { /* Plain HTTP is not supported. */
- session_close(s);
- return;
- }
- ctx = http_new(s, tls_send);
- if (!ctx) {
- session_close(s);
- return;
- }
- session_http_set_server_ctx(s, ctx);
- }
- }
-#endif
- session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
+ /* TODO: tls, http */
+// if (tls) {
+// timeout += TLS_MAX_HANDSHAKE_TIME;
+// struct tls_ctx *ctx = session_tls_get_server_ctx(s);
+// if (!ctx) {
+// ctx = tls_new();
+// if (!ctx) {
+// session_close(s);
+// return;
+// }
+// ctx->c.session = s;
+// ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
+//
+// /* Configure ALPN. */
+// gnutls_datum_t proto;
+// if (!http) {
+// proto.data = (unsigned char *)"dot";
+// proto.size = 3;
+// } else {
+// proto.data = (unsigned char *)"h2";
+// proto.size = 2;
+// }
+// unsigned int flags = 0;
+//#if GNUTLS_VERSION_NUMBER >= 0x030500
+// /* Mandatory ALPN means the protocol must match if and
+// * only if ALPN extension is used by the client. */
+// flags |= GNUTLS_ALPN_MANDATORY;
+//#endif
+// ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags);
+// if (ret != GNUTLS_E_SUCCESS) {
+// session_close(s);
+// return;
+// }
+//
+// session_tls_set_server_ctx(s, ctx);
+// }
+// }
+//#if ENABLE_DOH2
+// if (http) {
+// struct http_ctx *ctx = session_http_get_server_ctx(s);
+// if (!ctx) {
+// if (!tls) { /* Plain HTTP is not supported. */
+// session_close(s);
+// return;
+// }
+// ctx = http_new(s, tls_send);
+// if (!ctx) {
+// session_close(s);
+// return;
+// }
+// session_http_set_server_ctx(s, ctx);
+// }
+// }
+//#endif
+ session2_timer_start(s, timeout, idle_in_timeout, PROTOLAYER_UNWRAP);
io_start_read((uv_handle_t *)client);
}
static void tcp_accept(uv_stream_t *master, int status)
{
- _tcp_accept(master, status, false, false);
+ _tcp_accept(master, status, PROTOLAYER_GRP_DOTCP);
}
static void tls_accept(uv_stream_t *master, int status)
{
- _tcp_accept(master, status, true, false);
+ _tcp_accept(master, status, PROTOLAYER_GRP_DOTLS);
}
#if ENABLE_DOH2
static void https_accept(uv_stream_t *master, int status)
{
- _tcp_accept(master, status, true, true);
+ _tcp_accept(master, status, PROTOLAYER_GRP_DOHTTPS);
}
#endif
@@ -933,151 +1089,152 @@ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd)
return 0;
}
-#if ENABLE_XDP
-static void xdp_rx(uv_poll_t* handle, int status, int events)
-{
- const int XDP_RX_BATCH_SIZE = 64;
- if (status < 0) {
- kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
- return;
- }
- if (events != UV_READABLE) {
- kr_log_error(XDP, "poll unexpected events: %d\n", events);
- return;
- }
-
- xdp_handle_data_t *xhd = handle->data;
- kr_require(xhd && xhd->session && xhd->socket);
- uint32_t rcvd;
- knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
- int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
- #if KNOT_VERSION_HEX >= 0x030100
- , NULL
- #endif
- );
-
- if (kr_fails_assert(ret == KNOT_EOK)) {
- /* ATM other error codes can only be returned when called incorrectly */
- kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
- return;
- }
- kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
- kr_require(rcvd <= XDP_RX_BATCH_SIZE);
- for (int i = 0; i < rcvd; ++i) {
- const knot_xdp_msg_t *msg = &msgs[i];
- kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
- knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
- &the_worker->pkt_pool);
- if (kpkt == NULL) {
- ret = kr_error(ENOMEM);
- } else {
- struct io_comm_data comm = {
- .src_addr = (const struct sockaddr *)&msg->ip_from,
- .comm_addr = (const struct sockaddr *)&msg->ip_from,
- .dst_addr = (const struct sockaddr *)&msg->ip_to
- };
- ret = worker_submit(xhd->session, &comm,
- msg->eth_from, msg->eth_to, kpkt);
- }
- if (ret)
- kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
- mp_flush(the_worker->pkt_pool.ctx);
- }
- knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
-}
-/// Warn if the XDP program is running in emulated mode (XDP_SKB)
-static void xdp_warn_mode(const char *ifname)
-{
- if (kr_fails_assert(ifname))
- return;
-
- const unsigned if_index = if_nametoindex(ifname);
- if (!if_index) {
- kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
- ifname, strerror(errno));
- return;
- }
-
- const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
- switch (mode) {
- case KNOT_XDP_MODE_FULL:
- return;
- case KNOT_XDP_MODE_EMUL:
- kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
- ifname);
- return;
- case KNOT_XDP_MODE_NONE: // enum warnings from compiler
- break;
- }
- kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
- ifname, (int)mode);
-}
-int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
-{
- if (!ep || !ep->handle) {
- return kr_error(EINVAL);
- }
-
- // RLIMIT_MEMLOCK often needs raising when operating on BPF
- static int ret_limit = 1;
- if (ret_limit == 1) {
- struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
- ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
- ? kr_error(errno) : 0;
- }
- if (ret_limit) return ret_limit;
-
- xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
- if (!xhd) return kr_error(ENOMEM);
-
- xhd->socket = NULL; // needed for some reason
-
- // This call is a libknot version hell, unfortunately.
- int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
- #if KNOT_VERSION_HEX < 0x030100
- ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL,
- KNOT_XDP_LOAD_BPF_MAYBE
- #elif KNOT_VERSION_HEX < 0x030200
- ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
- KNOT_XDP_LOAD_BPF_MAYBE
- #else
- KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
- ep->port, 0/*quic_port*/,
- KNOT_XDP_LOAD_BPF_MAYBE,
- NULL/*xdp_config*/
- #endif
- );
-
- if (!ret) xdp_warn_mode(ifname);
-
- if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
- if (ret || kr_fails_assert(xhd->socket)) {
- free(xhd);
- return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
- }
- xhd->tx_waker.data = xhd->socket;
-
- ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
- ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
- if (ret) {
- knot_xdp_deinit(xhd->socket);
- free(xhd);
- return kr_error(ret);
- }
-
- // beware: this sets poll_handle->data
- xhd->session = session_new(ep->handle, false, false);
- kr_require(!session_flags(xhd->session)->outgoing);
- session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
-
- ep->handle->data = xhd;
- ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
- return ret;
-}
-#endif
-
-
-int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http)
+/* TODO: xdp */
+//#if ENABLE_XDP
+//static void xdp_rx(uv_poll_t* handle, int status, int events)
+//{
+// const int XDP_RX_BATCH_SIZE = 64;
+// if (status < 0) {
+// kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
+// return;
+// }
+// if (events != UV_READABLE) {
+// kr_log_error(XDP, "poll unexpected events: %d\n", events);
+// return;
+// }
+//
+// xdp_handle_data_t *xhd = handle->data;
+// kr_require(xhd && xhd->session && xhd->socket);
+// uint32_t rcvd;
+// knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
+// int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
+// #if KNOT_VERSION_HEX >= 0x030100
+// , NULL
+// #endif
+// );
+//
+// if (kr_fails_assert(ret == KNOT_EOK)) {
+// /* ATM other error codes can only be returned when called incorrectly */
+// kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
+// return;
+// }
+// kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
+// kr_require(rcvd <= XDP_RX_BATCH_SIZE);
+// for (int i = 0; i < rcvd; ++i) {
+// const knot_xdp_msg_t *msg = &msgs[i];
+// kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
+// knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
+// &the_worker->pkt_pool);
+// if (kpkt == NULL) {
+// ret = kr_error(ENOMEM);
+// } else {
+// struct io_comm_data comm = {
+// .src_addr = (const struct sockaddr *)&msg->ip_from,
+// .comm_addr = (const struct sockaddr *)&msg->ip_from,
+// .dst_addr = (const struct sockaddr *)&msg->ip_to
+// };
+// ret = worker_submit(xhd->session, &comm,
+// msg->eth_from, msg->eth_to, kpkt);
+// }
+// if (ret)
+// kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
+// mp_flush(the_worker->pkt_pool.ctx);
+// }
+// knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
+//}
+///// Warn if the XDP program is running in emulated mode (XDP_SKB)
+//static void xdp_warn_mode(const char *ifname)
+//{
+// if (kr_fails_assert(ifname))
+// return;
+//
+// const unsigned if_index = if_nametoindex(ifname);
+// if (!if_index) {
+// kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
+// ifname, strerror(errno));
+// return;
+// }
+//
+// const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
+// switch (mode) {
+// case KNOT_XDP_MODE_FULL:
+// return;
+// case KNOT_XDP_MODE_EMUL:
+// kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
+// ifname);
+// return;
+// case KNOT_XDP_MODE_NONE: // enum warnings from compiler
+// break;
+// }
+// kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
+// ifname, (int)mode);
+//}
+//int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
+//{
+// if (!ep || !ep->handle) {
+// return kr_error(EINVAL);
+// }
+//
+// // RLIMIT_MEMLOCK often needs raising when operating on BPF
+// static int ret_limit = 1;
+// if (ret_limit == 1) {
+// struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
+// ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
+// ? kr_error(errno) : 0;
+// }
+// if (ret_limit) return ret_limit;
+//
+// xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
+// if (!xhd) return kr_error(ENOMEM);
+//
+// xhd->socket = NULL; // needed for some reason
+//
+// // This call is a libknot version hell, unfortunately.
+// int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
+// #if KNOT_VERSION_HEX < 0x030100
+// ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL,
+// KNOT_XDP_LOAD_BPF_MAYBE
+// #elif KNOT_VERSION_HEX < 0x030200
+// ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
+// KNOT_XDP_LOAD_BPF_MAYBE
+// #else
+// KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
+// ep->port, 0/*quic_port*/,
+// KNOT_XDP_LOAD_BPF_MAYBE,
+// NULL/*xdp_config*/
+// #endif
+// );
+//
+// if (!ret) xdp_warn_mode(ifname);
+//
+// if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
+// if (ret || kr_fails_assert(xhd->socket)) {
+// free(xhd);
+// return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
+// }
+// xhd->tx_waker.data = xhd->socket;
+//
+// ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
+// ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
+// if (ret) {
+// knot_xdp_deinit(xhd->socket);
+// free(xhd);
+// return kr_error(ret);
+// }
+//
+// // beware: this sets poll_handle->data
+// xhd->session = session_new(ep->handle, false, false);
+// kr_require(!session_flags(xhd->session)->outgoing);
+// session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
+//
+// ep->handle->data = xhd;
+// ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
+// return ret;
+//}
+//#endif
+
+int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family,
+ enum protolayer_grp grp, bool outgoing)
{
int ret = -1;
if (type == SOCK_DGRAM) {
@@ -1089,7 +1246,7 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, b
if (ret != 0) {
return ret;
}
- struct session *s = session_new(handle, has_tls, has_http);
+ struct session2 *s = session2_new_io(handle, grp, outgoing);
if (s == NULL) {
ret = -1;
}
@@ -1102,13 +1259,13 @@ static void io_deinit(uv_handle_t *handle)
return;
}
if (handle->type != UV_POLL) {
- session_free(handle->data);
+ session2_free(handle->data);
} else {
#if ENABLE_XDP
xdp_handle_data_t *xhd = handle->data;
uv_idle_stop(&xhd->tx_waker);
uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
- session_free(xhd->session);
+ session2_free(xhd->session);
knot_xdp_deinit(xhd->socket);
free(xhd);
#else