diff options
author | Oto Šťáva <oto.stava@nic.cz> | 2022-08-02 10:53:38 +0200 |
---|---|---|
committer | Oto Šťáva <oto.stava@nic.cz> | 2023-01-26 12:56:07 +0100 |
commit | 5501d84bb244f55ef8ef06be95456b3b4f516fa3 (patch) | |
tree | 6fd2b2d6e44a3ff308bbbae44a599dcbe4de8c10 /daemon/io.c | |
parent | session2: protocol layer API (diff) | |
download | knot-resolver-5501d84bb244f55ef8ef06be95456b3b4f516fa3.tar.xz knot-resolver-5501d84bb244f55ef8ef06be95456b3b4f516fa3.zip |
daemon: basic implementation of TCP and UDP with protolayers
Diffstat (limited to 'daemon/io.c')
-rw-r--r-- | daemon/io.c | 1139 |
1 files changed, 648 insertions, 491 deletions
diff --git a/daemon/io.c b/daemon/io.c index 66ad03da..c9fcc0eb 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -21,7 +21,7 @@ #include "daemon/worker.h" #include "daemon/tls.h" #include "daemon/http.h" -#include "daemon/session.h" +#include "daemon/session2.h" #include "contrib/cleanup.h" #include "lib/utils.h" @@ -40,9 +40,9 @@ static void check_bufsize(uv_handle_t* handle) * This is magic presuming we can pull in a whole recvmmsg width in one wave. * Linux will double this the bufsize wanted. */ - const int bufsize_want = 2 * sizeof(the_worker->wire_buf) ; - negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want); - negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want); + const int BUF_SIZE = 2 * sizeof(RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE); + negotiate_bufsize(uv_recv_buffer_size, handle, BUF_SIZE); + negotiate_bufsize(uv_send_buffer_size, handle, BUF_SIZE); } #undef negotiate_bufsize @@ -57,26 +57,26 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* * guaranteed to be unchanged only for the duration of * udp_read() and tcp_read(). */ - struct session *s = handle->data; - if (!session_flags(s)->has_tls) { - buf->base = (char *) session_wirebuf_get_free_start(s); - buf->len = session_wirebuf_get_free_size(s); - } else { - struct tls_common_ctx *ctx = session_tls_get_common_ctx(s); - buf->base = (char *) ctx->recv_buf; - buf->len = sizeof(ctx->recv_buf); - } + struct session2 *s = handle->data; + buf->base = wire_buf_free_space(&s->wire_buf); + buf->len = wire_buf_free_space_length(&s->wire_buf); +} + +static void udp_on_unwrapped(int status, struct session2 *session, + const void *target, void *baton) +{ + wire_buf_reset(&session->wire_buf); } void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *comm_addr, unsigned flags) { - struct session *s = handle->data; - if (session_flags(s)->closing || nread <= 0 || comm_addr->sa_family == AF_UNSPEC) + struct session2 *s = handle->data; + if (s->closing || nread <= 0 || comm_addr->sa_family == AF_UNSPEC) return; - if (session_flags(s)->outgoing) { - const struct sockaddr *peer = session_get_peer(s); + if (s->outgoing) { + const struct sockaddr *peer = session2_get_peer(s); if (kr_fails_assert(peer->sa_family != AF_UNSPEC)) return; if (kr_sockaddr_cmp(peer, comm_addr) != 0) { @@ -86,64 +86,16 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, } } - const uint8_t *data = (const uint8_t *)buf->base; - ssize_t data_len = nread; - const struct sockaddr *src_addr = comm_addr; - const struct sockaddr *dst_addr = NULL; - struct proxy_result proxy; - bool has_proxy = false; - if (!session_flags(s)->outgoing && proxy_header_present(data, data_len)) { - if (!proxy_allowed(comm_addr)) { - kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n", - kr_straddr(comm_addr)); - return; - } - - ssize_t trimmed = proxy_process_header(&proxy, s, data, data_len); - if (trimmed == KNOT_EMALF) { - if (kr_log_is_debug(IO, NULL)) { - kr_log_debug(IO, "<= ignoring malformed PROXYv2 UDP " - "from address '%s'\n", - kr_straddr(comm_addr)); - } - return; - } else if (trimmed < 0) { - if (kr_log_is_debug(IO, NULL)) { - kr_log_debug(IO, "<= error processing PROXYv2 UDP " - "from address '%s', ignoring\n", - kr_straddr(comm_addr)); - } - return; - } - - if (proxy.command == PROXY2_CMD_PROXY && proxy.family != AF_UNSPEC) { - has_proxy = true; - src_addr = &proxy.src_addr.ip; - dst_addr = &proxy.dst_addr.ip; - - if (kr_log_is_debug(IO, NULL)) { - kr_log_debug(IO, "<= UDP query from '%s'\n", - kr_straddr(src_addr)); - kr_log_debug(IO, "<= proxied through '%s'\n", - kr_straddr(comm_addr)); - } - } - data = session_wirebuf_get_free_start(s); - data_len = nread - trimmed; + int ret = wire_buf_consume(&s->wire_buf, nread); + if (ret) { + wire_buf_reset(&s->wire_buf); + return; } - ssize_t consumed = session_wirebuf_consume(s, data, data_len); - kr_assert(consumed == data_len); - - struct io_comm_data comm = { - .src_addr = src_addr, - .comm_addr = comm_addr, - .dst_addr = dst_addr, - .proxy = (has_proxy) ? &proxy : NULL - }; - session_wirebuf_process(s, &comm); - session_wirebuf_discard(s); - mp_flush(the_worker->pkt_pool.ctx); + ret = session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), comm_addr, + udp_on_unwrapped, NULL); + if (ret) + wire_buf_reset(&s->wire_buf); } static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name) @@ -180,6 +132,304 @@ static int family_to_freebind_option(sa_family_t sa_family, int *level, int *nam return kr_ok(); } + +struct pl_udp_iter_data { + struct proxy_result proxy; + bool has_proxy; +}; + +static int pl_udp_iter_init(struct protolayer_manager *manager, struct protolayer_data *layer) +{ + struct pl_udp_iter_data *udp = protolayer_iter_data(layer); + *udp = (struct pl_udp_iter_data){0}; + return 0; +} + +static enum protolayer_cb_result pl_udp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ + if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) { + /* events should not happen in UDP (currently) */ + return protolayer_continue(ctx); + } + + ctx->payload = protolayer_as_buffer(&ctx->payload); + if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER)) { + /* unsupported payload */ + return protolayer_break(ctx, kr_error(EINVAL)); + } + + struct session2 *s = ctx->manager->session; + struct pl_udp_iter_data *udp = protolayer_iter_data(layer); + + char *data = ctx->payload.buffer.buf; + ssize_t data_len = ctx->payload.buffer.len; + struct comm_info *comm = &ctx->comm; + comm->comm_addr = ctx->target; + comm->src_addr = ctx->target; + if (!s->outgoing && proxy_header_present(data, data_len)) { + if (!proxy_allowed(comm->comm_addr)) { + kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n", + kr_straddr(comm->comm_addr)); + return protolayer_break(ctx, kr_error(EPERM)); + } + + ssize_t trimmed = proxy_process_header(&udp->proxy, data, data_len); + if (trimmed == KNOT_EMALF) { + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= ignoring malformed PROXYv2 UDP " + "from address '%s'\n", + kr_straddr(comm->comm_addr)); + } + return protolayer_break(ctx, kr_error(EINVAL)); + } else if (trimmed < 0) { + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= error processing PROXYv2 UDP " + "from address '%s', ignoring\n", + kr_straddr(comm->comm_addr)); + } + return protolayer_break(ctx, kr_error(EINVAL)); + } + + if (udp->proxy.command == PROXY2_CMD_PROXY && udp->proxy.family != AF_UNSPEC) { + udp->has_proxy = true; + + comm->src_addr = &udp->proxy.src_addr.ip; + comm->dst_addr = &udp->proxy.dst_addr.ip; + comm->proxy = &udp->proxy; + + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= UDP query from '%s'\n", + kr_straddr(comm->src_addr)); + kr_log_debug(IO, "<= proxied through '%s'\n", + kr_straddr(comm->comm_addr)); + } + } + + ctx->payload = protolayer_buffer(data + trimmed, data_len - trimmed); + } + + return protolayer_continue(ctx); +} + +static enum protolayer_cb_result pl_udp_wrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ + return protolayer_push(ctx); +} + + +struct pl_tcp_sess_data { + struct proxy_result proxy; + struct wire_buf wire_buf; + bool had_data : 1; + bool has_proxy : 1; +}; + +static int pl_tcp_sess_init(struct protolayer_manager *manager, struct protolayer_data *layer) +{ + struct pl_tcp_sess_data *tcp = protolayer_sess_data(layer); + *tcp = (struct pl_tcp_sess_data){0}; + return 0; +} + +static int pl_tcp_sess_deinit(struct protolayer_manager *manager, struct protolayer_data *layer) +{ + struct pl_tcp_sess_data *tcp = protolayer_sess_data(layer); + wire_buf_deinit(&tcp->wire_buf); + return 0; +} + +static enum protolayer_cb_result pl_tcp_unwrap_timeout( + struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ + /* TODO - connecting timeout? */ + struct session2 *s = ctx->manager->session; + + if (kr_fails_assert(!s->closing)) + return protolayer_continue(ctx); + + if (!session2_tasklist_is_empty(s)) { + int finalized = session2_tasklist_finalize_expired(s); + the_worker->stats.timeout += finalized; + /* session2_tasklist_finalize_expired() may call worker_task_finalize(). + * If session is a source session and there were IO errors, + * worker_task_finalize() can finalize all tasks and close session. */ + if (s->closing) + return protolayer_continue(ctx); + } + + if (!session2_tasklist_is_empty(s)) { + session2_timer_stop(s); + session2_timer_start(s, + KR_RESOLVE_TIME_LIMIT / 2, + KR_RESOLVE_TIME_LIMIT / 2, + PROTOLAYER_UNWRAP); + } else { + /* Normally it should not happen, + * but better to check if there anything in this list. */ + while (!session2_waitinglist_is_empty(s)) { + struct qr_task *t = session2_waitinglist_pop(s, false); + worker_task_finalize(t, KR_STATE_FAIL); + worker_task_unref(t); + the_worker->stats.timeout += 1; + if (s->closing) + return protolayer_continue(ctx); + } + uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout; + uint64_t idle_time = kr_now() - s->last_activity; + if (idle_time < idle_in_timeout) { + idle_in_timeout -= idle_time; + session2_timer_stop(s); + session2_timer_start(s, + idle_in_timeout, idle_in_timeout, + PROTOLAYER_UNWRAP); + } else { + struct sockaddr *peer = session2_get_peer(s); + char *peer_str = kr_straddr(peer); + kr_log_debug(IO, "=> closing connection to '%s'\n", + peer_str ? peer_str : ""); + if (s->outgoing) { + worker_del_tcp_waiting(peer); + worker_del_tcp_connected(peer); + } + session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), NULL, NULL, NULL); + } + } + + return protolayer_continue(ctx); +} + +static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ + struct session2 *s = ctx->manager->session; + struct pl_tcp_sess_data *tcp = protolayer_sess_data(layer); + struct sockaddr *peer = session2_get_peer(s); + + if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) { + if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT) + return pl_tcp_unwrap_timeout(layer, ctx); + + /* pass thru */ + return protolayer_continue(ctx); + } + + if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) { + const char *buf = ctx->payload.buffer.buf; + const size_t len = ctx->payload.buffer.len; + + /* Copy a simple buffer into internal wirebuffer. */ + if (len > KNOT_WIRE_MAX_PKTSIZE) + return protolayer_break(ctx, kr_error(EMSGSIZE)); + + if (!tcp->wire_buf.buf) { + int ret = wire_buf_reserve(&tcp->wire_buf, + KNOT_WIRE_MAX_PKTSIZE); + if (ret) + return protolayer_break(ctx, ret); + } + + /* Try to make space */ + while (len > wire_buf_free_space_length(&tcp->wire_buf)) { + if (wire_buf_data_length(&tcp->wire_buf) > 0 || + tcp->wire_buf.start == 0) + return protolayer_break(ctx, kr_error(EMSGSIZE)); + + wire_buf_movestart(&tcp->wire_buf); + } + + memcpy(wire_buf_free_space(&tcp->wire_buf), buf, len); + wire_buf_consume(&tcp->wire_buf, ctx->payload.buffer.len); + ctx->payload = protolayer_wire_buf(&tcp->wire_buf); + } + + if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) { + /* TODO: iovec support unimplemented */ + return protolayer_break(ctx, kr_error(EINVAL)); + } + + char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */ + ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf); + struct comm_info *comm = &ctx->comm; + comm->src_addr = peer; + comm->comm_addr = peer; + comm->dst_addr = NULL; + if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) { + if (!proxy_allowed(comm->src_addr)) { + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= connection to '%s': PROXYv2 not allowed " + "for this peer, close\n", + kr_straddr(peer)); + } + worker_end_tcp(s); + ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE); + return protolayer_push(ctx); + } + + ssize_t trimmed = proxy_process_header(&tcp->proxy, data, data_len); + if (trimmed < 0) { + if (kr_log_is_debug(IO, NULL)) { + if (trimmed == KNOT_EMALF) { + kr_log_debug(IO, "<= connection to '%s': " + "malformed PROXYv2 header, close\n", + kr_straddr(comm->src_addr)); + } else { + kr_log_debug(IO, "<= connection to '%s': " + "error processing PROXYv2 header, close\n", + kr_straddr(comm->src_addr)); + } + } + worker_end_tcp(s); + ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE); + return protolayer_push(ctx); + } else if (trimmed == 0) { + ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE); + return protolayer_push(ctx); + } + + if (tcp->proxy.command != PROXY2_CMD_LOCAL && tcp->proxy.family != AF_UNSPEC) { + comm->src_addr = &tcp->proxy.src_addr.ip; + comm->dst_addr = &tcp->proxy.dst_addr.ip; + + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= TCP stream from '%s'\n", + kr_straddr(comm->src_addr)); + kr_log_debug(IO, "<= proxied through '%s'\n", + kr_straddr(comm->comm_addr)); + } + } + + wire_buf_trim(ctx->payload.wire_buf, trimmed); + } + + tcp->had_data = true; + + return protolayer_continue(ctx); +} + +static enum protolayer_cb_result pl_tcp_wrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ + return protolayer_push(ctx); +} + + +void io_protolayers_init() +{ + protolayer_globals[PROTOLAYER_UDP] = (struct protolayer_globals){ + .iter_size = sizeof(struct pl_udp_iter_data), + .iter_init = pl_udp_iter_init, + .unwrap = pl_udp_unwrap, + .wrap = pl_udp_wrap + }; + + protolayer_globals[PROTOLAYER_TCP] = (struct protolayer_globals){ + .sess_size = sizeof(struct pl_tcp_sess_data), + .sess_init = pl_tcp_sess_init, + .sess_deinit = pl_tcp_sess_deinit, + .unwrap = pl_tcp_unwrap, + .wrap = pl_tcp_wrap + }; +} + + int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags) { const int fd = socket(addr->sa_family, type, 0); @@ -265,12 +515,11 @@ int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd) uv_handle_t *h = (uv_handle_t *)handle; check_bufsize(h); /* Handle is already created, just create context. */ - struct session *s = session_new(h, false, false); + struct session2 *s = session2_new_io(h, PROTOLAYER_GRP_DOUDP, false); kr_require(s); - session_flags(s)->outgoing = false; int socklen = sizeof(union kr_sockaddr); - ret = uv_udp_getsockname(handle, session_get_sockname(s), &socklen); + ret = uv_udp_getsockname(handle, &s->transport.io.sockname.ip, &socklen); if (ret) { kr_log_error(IO, "ERROR: getsockname failed: %s\n", uv_strerror(ret)); abort(); /* It might be nontrivial not to leak something here. */ @@ -279,70 +528,13 @@ int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd) return io_start_read(h); } -void tcp_timeout_trigger(uv_timer_t *timer) -{ - struct session *s = timer->data; - - if (kr_fails_assert(!session_flags(s)->closing)) - return; - - if (!session_tasklist_is_empty(s)) { - int finalized = session_tasklist_finalize_expired(s); - the_worker->stats.timeout += finalized; - /* session_tasklist_finalize_expired() may call worker_task_finalize(). - * If session is a source session and there were IO errors, - * worker_task_finalize() can finalize all tasks and close session. */ - if (session_flags(s)->closing) { - return; - } - - } - if (!session_tasklist_is_empty(s)) { - uv_timer_stop(timer); - session_timer_start(s, tcp_timeout_trigger, - KR_RESOLVE_TIME_LIMIT / 2, - KR_RESOLVE_TIME_LIMIT / 2); - } else { - /* Normally it should not happen, - * but better to check if there anything in this list. */ - while (!session_waitinglist_is_empty(s)) { - struct qr_task *t = session_waitinglist_pop(s, false); - worker_task_finalize(t, KR_STATE_FAIL); - worker_task_unref(t); - the_worker->stats.timeout += 1; - if (session_flags(s)->closing) { - return; - } - } - uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout; - uint64_t last_activity = session_last_activity(s); - uint64_t idle_time = kr_now() - last_activity; - if (idle_time < idle_in_timeout) { - idle_in_timeout -= idle_time; - uv_timer_stop(timer); - session_timer_start(s, tcp_timeout_trigger, - idle_in_timeout, idle_in_timeout); - } else { - struct sockaddr *peer = session_get_peer(s); - char *peer_str = kr_straddr(peer); - kr_log_debug(IO, "=> closing connection to '%s'\n", - peer_str ? peer_str : ""); - if (session_flags(s)->outgoing) { - worker_del_tcp_waiting(peer); - worker_del_tcp_connected(peer); - } - session_close(s); - } - } -} - static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { - struct session *s = handle->data; - if (kr_fails_assert(s && session_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP)) + struct session2 *s = handle->data; + if (kr_fails_assert(s && session2_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP)) return; - if (session_flags(s)->closing) { + if (s->closing) { return; } @@ -354,160 +546,117 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) if (nread < 0 || !buf->base) { if (kr_log_is_debug(IO, NULL)) { - struct sockaddr *peer = session_get_peer(s); + struct sockaddr *peer = session2_get_peer(s); char *peer_str = kr_straddr(peer); kr_log_debug(IO, "=> connection to '%s' closed by peer (%s)\n", peer_str ? peer_str : "", uv_strerror(nread)); } worker_end_tcp(s); + session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_FORCE_CLOSE), + NULL, NULL, NULL); return; } - const uint8_t *data = (const uint8_t *)buf->base; - ssize_t data_len = nread; - const struct sockaddr *src_addr = session_get_peer(s); - const struct sockaddr *dst_addr = NULL; - if (!session_flags(s)->outgoing && !session_flags(s)->no_proxy && - proxy_header_present(data, data_len)) { - if (!proxy_allowed(src_addr)) { - if (kr_log_is_debug(IO, NULL)) { - kr_log_debug(IO, "<= connection to '%s': PROXYv2 not allowed " - "for this peer, close\n", - kr_straddr(src_addr)); - } - worker_end_tcp(s); - return; - } - - struct proxy_result *proxy = session_proxy_create(s); - ssize_t trimmed = proxy_process_header(proxy, s, data, data_len); - if (trimmed < 0) { - if (kr_log_is_debug(IO, NULL)) { - if (trimmed == KNOT_EMALF) { - kr_log_debug(IO, "<= connection to '%s': " - "malformed PROXYv2 header, close\n", - kr_straddr(src_addr)); - } else { - kr_log_debug(IO, "<= connection to '%s': " - "error processing PROXYv2 header, close\n", - kr_straddr(src_addr)); - } - } - worker_end_tcp(s); - return; - } else if (trimmed == 0) { - return; - } - - if (proxy->command != PROXY2_CMD_LOCAL && proxy->family != AF_UNSPEC) { - src_addr = &proxy->src_addr.ip; - dst_addr = &proxy->dst_addr.ip; - - if (kr_log_is_debug(IO, NULL)) { - kr_log_debug(IO, "<= TCP stream from '%s'\n", - kr_straddr(src_addr)); - kr_log_debug(IO, "<= proxied through '%s'\n", - kr_straddr(session_get_peer(s))); - } - } - - data = session_wirebuf_get_free_start(s); - data_len = nread - trimmed; - } - - session_flags(s)->no_proxy = true; - - ssize_t consumed = 0; - if (session_flags(s)->has_tls) { - /* buf->base points to start of the tls receive buffer. - Decode data free space in session wire buffer. */ - consumed = tls_process_input_data(s, data, data_len); - if (consumed < 0) { - if (kr_log_is_debug(IO, NULL)) { - char *peer_str = kr_straddr(src_addr); - kr_log_debug(IO, "=> connection to '%s': " - "error processing TLS data, close\n", - peer_str ? peer_str : ""); - } - worker_end_tcp(s); - return; - } else if (consumed == 0) { - return; - } - data = session_wirebuf_get_free_start(s); - data_len = consumed; - } -#if ENABLE_DOH2 - int streaming = 1; - if (session_flags(s)->has_http) { - streaming = http_process_input_data(s, data, data_len, - &consumed); - if (streaming < 0) { - if (kr_log_is_debug(IO, NULL)) { - char *peer_str = kr_straddr(src_addr); - kr_log_debug(IO, "=> connection to '%s': " - "error processing HTTP data, close\n", - peer_str ? peer_str : ""); - } - worker_end_tcp(s); - return; - } - if (consumed == 0) { - return; - } - data = session_wirebuf_get_free_start(s); - data_len = consumed; - } -#endif - - /* data points to start of the free space in session wire buffer. - Simple increase internal counter. */ - consumed = session_wirebuf_consume(s, data, data_len); - kr_assert(consumed == data_len); - - struct io_comm_data comm = { - .src_addr = src_addr, - .comm_addr = session_get_peer(s), - .dst_addr = dst_addr, - .proxy = session_proxy_get(s) - }; - int ret = session_wirebuf_process(s, &comm); - if (ret < 0) { - /* An error has occurred, close the session. */ - worker_end_tcp(s); - } - session_wirebuf_compress(s); - mp_flush(the_worker->pkt_pool.ctx); -#if ENABLE_DOH2 - if (session_flags(s)->has_http && streaming == 0 && ret == 0) { - ret = http_send_status(s, HTTP_STATUS_BAD_REQUEST); - if (ret) { - /* An error has occurred, close the session. */ - worker_end_tcp(s); - } + int ret = wire_buf_consume(&s->wire_buf, nread); + if (ret) { + wire_buf_reset(&s->wire_buf); + return; } -#endif -} -#if ENABLE_DOH2 -static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session) -{ - struct tls_ctx *ctx = session_tls_get_server_ctx(session); - ssize_t sent = 0; - kr_require(ctx); - - sent = gnutls_record_send(ctx->c.tls_session, buf, len); - if (sent < 0) { - kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n", - gnutls_strerror_name(sent), sent); - return kr_error(EIO); - } - return sent; + session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), NULL, NULL, NULL); + +// ssize_t consumed = 0; +// if (session_flags(s)->has_tls) { +// /* buf->base points to start of the tls receive buffer. +// Decode data free space in session wire buffer. */ +// consumed = tls_process_input_data(s, data, data_len); +// if (consumed < 0) { +// if (kr_log_is_debug(IO, NULL)) { +// char *peer_str = kr_straddr(src_addr); +// kr_log_debug(IO, "=> connection to '%s': " +// "error processing TLS data, close\n", +// peer_str ? peer_str : ""); +// } +// worker_end_tcp(s); +// return; +// } else if (consumed == 0) { +// return; +// } +// data = session_wirebuf_get_free_start(s); +// data_len = consumed; +// } +//#if ENABLE_DOH2 +// int streaming = 1; +// if (session_flags(s)->has_http) { +// streaming = http_process_input_data(s, data, data_len, +// &consumed); +// if (streaming < 0) { +// if (kr_log_is_debug(IO, NULL)) { +// char *peer_str = kr_straddr(src_addr); +// kr_log_debug(IO, "=> connection to '%s': " +// "error processing HTTP data, close\n", +// peer_str ? peer_str : ""); +// } +// worker_end_tcp(s); +// return; +// } +// if (consumed == 0) { +// return; +// } +// data = session_wirebuf_get_free_start(s); +// data_len = consumed; +// } +//#endif +// +// /* data points to start of the free space in session wire buffer. +// Simple increase internal counter. */ +// consumed = session_wirebuf_consume(s, data, data_len); +// kr_assert(consumed == data_len); +// +// struct io_comm_data comm = { +// .src_addr = src_addr, +// .comm_addr = session_get_peer(s), +// .dst_addr = dst_addr, +// .proxy = session_proxy_get(s) +// }; +// int ret = session_wirebuf_process(s, &comm); +// if (ret < 0) { +// /* An error has occurred, close the session. */ +// worker_end_tcp(s); +// } +// session_wirebuf_compress(s); +// mp_flush(the_worker->pkt_pool.ctx); +//#if ENABLE_DOH2 +// if (session_flags(s)->has_http && streaming == 0 && ret == 0) { +// ret = http_send_status(s, HTTP_STATUS_BAD_REQUEST); +// if (ret) { +// /* An error has occurred, close the session. */ +// worker_end_tcp(s); +// } +// } +//#endif } -#endif -static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http) +/* TODO: http */ +//#if ENABLE_DOH2 +//static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session) +//{ +// struct tls_ctx *ctx = session_tls_get_server_ctx(session); +// ssize_t sent = 0; +// kr_require(ctx); +// +// sent = gnutls_record_send(ctx->c.tls_session, buf, len); +// if (sent < 0) { +// kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n", +// gnutls_strerror_name(sent), sent); +// return kr_error(EIO); +// } +// return sent; +//} +//#endif + +static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp) { if (status != 0) { return; @@ -518,7 +667,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http) return; } int res = io_create(master->loop, (uv_handle_t *)client, - SOCK_STREAM, AF_UNSPEC, tls, http); + SOCK_STREAM, AF_UNSPEC, grp, false); if (res) { if (res == UV_EMFILE) { the_worker->too_many_open = true; @@ -532,31 +681,37 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http) } /* struct session was allocated \ borrowed from memory pool. */ - struct session *s = client->data; - kr_require(session_flags(s)->outgoing == false); - kr_require(session_flags(s)->has_tls == tls); + struct session2 *s = client->data; + kr_require(s->outgoing == false); +// kr_require(s->secure == tls); /* TODO */ if (uv_accept(master, (uv_stream_t *)client) != 0) { /* close session, close underlying uv handles and * deallocate (or return to memory pool) memory. */ - session_close(s); + session2_unwrap(s, + protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), + NULL, NULL, NULL); return; } /* Get peer's and our address. We apparently get specific sockname here * even if we listened on a wildcard address. */ - struct sockaddr *sa = session_get_peer(s); + struct sockaddr *sa = session2_get_peer(s); int sa_len = sizeof(struct sockaddr_in6); int ret = uv_tcp_getpeername(client, sa, &sa_len); if (ret || sa->sa_family == AF_UNSPEC) { - session_close(s); + session2_unwrap(s, + protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), + NULL, NULL, NULL); return; } - sa = session_get_sockname(s); + sa = session2_get_sockname(s); sa_len = sizeof(struct sockaddr_in6); ret = uv_tcp_getsockname(client, sa, &sa_len); if (ret || sa->sa_family == AF_UNSPEC) { - session_close(s); + session2_unwrap(s, + protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), + NULL, NULL, NULL); return; } @@ -567,77 +722,78 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http) uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout; uint64_t timeout = KR_CONN_RTT_MAX / 2; - if (tls) { - timeout += TLS_MAX_HANDSHAKE_TIME; - struct tls_ctx *ctx = session_tls_get_server_ctx(s); - if (!ctx) { - ctx = tls_new(); - if (!ctx) { - session_close(s); - return; - } - ctx->c.session = s; - ctx->c.handshake_state = TLS_HS_IN_PROGRESS; - - /* Configure ALPN. */ - gnutls_datum_t proto; - if (!http) { - proto.data = (unsigned char *)"dot"; - proto.size = 3; - } else { - proto.data = (unsigned char *)"h2"; - proto.size = 2; - } - unsigned int flags = 0; -#if GNUTLS_VERSION_NUMBER >= 0x030500 - /* Mandatory ALPN means the protocol must match if and - * only if ALPN extension is used by the client. */ - flags |= GNUTLS_ALPN_MANDATORY; -#endif - ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags); - if (ret != GNUTLS_E_SUCCESS) { - session_close(s); - return; - } - - session_tls_set_server_ctx(s, ctx); - } - } -#if ENABLE_DOH2 - if (http) { - struct http_ctx *ctx = session_http_get_server_ctx(s); - if (!ctx) { - if (!tls) { /* Plain HTTP is not supported. */ - session_close(s); - return; - } - ctx = http_new(s, tls_send); - if (!ctx) { - session_close(s); - return; - } - session_http_set_server_ctx(s, ctx); - } - } -#endif - session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout); + /* TODO: tls, http */ +// if (tls) { +// timeout += TLS_MAX_HANDSHAKE_TIME; +// struct tls_ctx *ctx = session_tls_get_server_ctx(s); +// if (!ctx) { +// ctx = tls_new(); +// if (!ctx) { +// session_close(s); +// return; +// } +// ctx->c.session = s; +// ctx->c.handshake_state = TLS_HS_IN_PROGRESS; +// +// /* Configure ALPN. */ +// gnutls_datum_t proto; +// if (!http) { +// proto.data = (unsigned char *)"dot"; +// proto.size = 3; +// } else { +// proto.data = (unsigned char *)"h2"; +// proto.size = 2; +// } +// unsigned int flags = 0; +//#if GNUTLS_VERSION_NUMBER >= 0x030500 +// /* Mandatory ALPN means the protocol must match if and +// * only if ALPN extension is used by the client. */ +// flags |= GNUTLS_ALPN_MANDATORY; +//#endif +// ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags); +// if (ret != GNUTLS_E_SUCCESS) { +// session_close(s); +// return; +// } +// +// session_tls_set_server_ctx(s, ctx); +// } +// } +//#if ENABLE_DOH2 +// if (http) { +// struct http_ctx *ctx = session_http_get_server_ctx(s); +// if (!ctx) { +// if (!tls) { /* Plain HTTP is not supported. */ +// session_close(s); +// return; +// } +// ctx = http_new(s, tls_send); +// if (!ctx) { +// session_close(s); +// return; +// } +// session_http_set_server_ctx(s, ctx); +// } +// } +//#endif + session2_timer_start(s, timeout, idle_in_timeout, PROTOLAYER_UNWRAP); io_start_read((uv_handle_t *)client); } static void tcp_accept(uv_stream_t *master, int status) { - _tcp_accept(master, status, false, false); + _tcp_accept(master, status, PROTOLAYER_GRP_DOTCP); } static void tls_accept(uv_stream_t *master, int status) { - _tcp_accept(master, status, true, false); + _tcp_accept(master, status, PROTOLAYER_GRP_DOTLS); } #if ENABLE_DOH2 static void https_accept(uv_stream_t *master, int status) { - _tcp_accept(master, status, true, true); + _tcp_accept(master, status, PROTOLAYER_GRP_DOHTTPS); } #endif @@ -933,151 +1089,152 @@ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd) return 0; } -#if ENABLE_XDP -static void xdp_rx(uv_poll_t* handle, int status, int events) -{ - const int XDP_RX_BATCH_SIZE = 64; - if (status < 0) { - kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status)); - return; - } - if (events != UV_READABLE) { - kr_log_error(XDP, "poll unexpected events: %d\n", events); - return; - } - - xdp_handle_data_t *xhd = handle->data; - kr_require(xhd && xhd->session && xhd->socket); - uint32_t rcvd; - knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE]; - int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd - #if KNOT_VERSION_HEX >= 0x030100 - , NULL - #endif - ); - - if (kr_fails_assert(ret == KNOT_EOK)) { - /* ATM other error codes can only be returned when called incorrectly */ - kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret)); - return; - } - kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd); - kr_require(rcvd <= XDP_RX_BATCH_SIZE); - for (int i = 0; i < rcvd; ++i) { - const knot_xdp_msg_t *msg = &msgs[i]; - kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE); - knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len, - &the_worker->pkt_pool); - if (kpkt == NULL) { - ret = kr_error(ENOMEM); - } else { - struct io_comm_data comm = { - .src_addr = (const struct sockaddr *)&msg->ip_from, - .comm_addr = (const struct sockaddr *)&msg->ip_from, - .dst_addr = (const struct sockaddr *)&msg->ip_to - }; - ret = worker_submit(xhd->session, &comm, - msg->eth_from, msg->eth_to, kpkt); - } - if (ret) - kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret)); - mp_flush(the_worker->pkt_pool.ctx); - } - knot_xdp_recv_finish(xhd->socket, msgs, rcvd); -} -/// Warn if the XDP program is running in emulated mode (XDP_SKB) -static void xdp_warn_mode(const char *ifname) -{ - if (kr_fails_assert(ifname)) - return; - - const unsigned if_index = if_nametoindex(ifname); - if (!if_index) { - kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n", - ifname, strerror(errno)); - return; - } - - const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index); - switch (mode) { - case KNOT_XDP_MODE_FULL: - return; - case KNOT_XDP_MODE_EMUL: - kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n", - ifname); - return; - case KNOT_XDP_MODE_NONE: // enum warnings from compiler - break; - } - kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n", - ifname, (int)mode); -} -int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname) -{ - if (!ep || !ep->handle) { - return kr_error(EINVAL); - } - - // RLIMIT_MEMLOCK often needs raising when operating on BPF - static int ret_limit = 1; - if (ret_limit == 1) { - struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY }; - ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit) - ? kr_error(errno) : 0; - } - if (ret_limit) return ret_limit; - - xdp_handle_data_t *xhd = malloc(sizeof(*xhd)); - if (!xhd) return kr_error(ENOMEM); - - xhd->socket = NULL; // needed for some reason - - // This call is a libknot version hell, unfortunately. - int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue, - #if KNOT_VERSION_HEX < 0x030100 - ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL, - KNOT_XDP_LOAD_BPF_MAYBE - #elif KNOT_VERSION_HEX < 0x030200 - ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0), - KNOT_XDP_LOAD_BPF_MAYBE - #else - KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS), - ep->port, 0/*quic_port*/, - KNOT_XDP_LOAD_BPF_MAYBE, - NULL/*xdp_config*/ - #endif - ); - - if (!ret) xdp_warn_mode(ifname); - - if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker); - if (ret || kr_fails_assert(xhd->socket)) { - free(xhd); - return ret == 0 ? kr_error(EINVAL) : kr_error(ret); - } - xhd->tx_waker.data = xhd->socket; - - ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful - ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd); - if (ret) { - knot_xdp_deinit(xhd->socket); - free(xhd); - return kr_error(ret); - } - - // beware: this sets poll_handle->data - xhd->session = session_new(ep->handle, false, false); - kr_require(!session_flags(xhd->session)->outgoing); - session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there - - ep->handle->data = xhd; - ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx); - return ret; -} -#endif - - -int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http) +/* TODO: xdp */ +//#if ENABLE_XDP +//static void xdp_rx(uv_poll_t* handle, int status, int events) +//{ +// const int XDP_RX_BATCH_SIZE = 64; +// if (status < 0) { +// kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status)); +// return; +// } +// if (events != UV_READABLE) { +// kr_log_error(XDP, "poll unexpected events: %d\n", events); +// return; +// } +// +// xdp_handle_data_t *xhd = handle->data; +// kr_require(xhd && xhd->session && xhd->socket); +// uint32_t rcvd; +// knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE]; +// int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd +// #if KNOT_VERSION_HEX >= 0x030100 +// , NULL +// #endif +// ); +// +// if (kr_fails_assert(ret == KNOT_EOK)) { +// /* ATM other error codes can only be returned when called incorrectly */ +// kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret)); +// return; +// } +// kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd); +// kr_require(rcvd <= XDP_RX_BATCH_SIZE); +// for (int i = 0; i < rcvd; ++i) { +// const knot_xdp_msg_t *msg = &msgs[i]; +// kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE); +// knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len, +// &the_worker->pkt_pool); +// if (kpkt == NULL) { +// ret = kr_error(ENOMEM); +// } else { +// struct io_comm_data comm = { +// .src_addr = (const struct sockaddr *)&msg->ip_from, +// .comm_addr = (const struct sockaddr *)&msg->ip_from, +// .dst_addr = (const struct sockaddr *)&msg->ip_to +// }; +// ret = worker_submit(xhd->session, &comm, +// msg->eth_from, msg->eth_to, kpkt); +// } +// if (ret) +// kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret)); +// mp_flush(the_worker->pkt_pool.ctx); +// } +// knot_xdp_recv_finish(xhd->socket, msgs, rcvd); +//} +///// Warn if the XDP program is running in emulated mode (XDP_SKB) +//static void xdp_warn_mode(const char *ifname) +//{ +// if (kr_fails_assert(ifname)) +// return; +// +// const unsigned if_index = if_nametoindex(ifname); +// if (!if_index) { +// kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n", +// ifname, strerror(errno)); +// return; +// } +// +// const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index); +// switch (mode) { +// case KNOT_XDP_MODE_FULL: +// return; +// case KNOT_XDP_MODE_EMUL: +// kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n", +// ifname); +// return; +// case KNOT_XDP_MODE_NONE: // enum warnings from compiler +// break; +// } +// kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n", +// ifname, (int)mode); +//} +//int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname) +//{ +// if (!ep || !ep->handle) { +// return kr_error(EINVAL); +// } +// +// // RLIMIT_MEMLOCK often needs raising when operating on BPF +// static int ret_limit = 1; +// if (ret_limit == 1) { +// struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY }; +// ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit) +// ? kr_error(errno) : 0; +// } +// if (ret_limit) return ret_limit; +// +// xdp_handle_data_t *xhd = malloc(sizeof(*xhd)); +// if (!xhd) return kr_error(ENOMEM); +// +// xhd->socket = NULL; // needed for some reason +// +// // This call is a libknot version hell, unfortunately. +// int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue, +// #if KNOT_VERSION_HEX < 0x030100 +// ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL, +// KNOT_XDP_LOAD_BPF_MAYBE +// #elif KNOT_VERSION_HEX < 0x030200 +// ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0), +// KNOT_XDP_LOAD_BPF_MAYBE +// #else +// KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS), +// ep->port, 0/*quic_port*/, +// KNOT_XDP_LOAD_BPF_MAYBE, +// NULL/*xdp_config*/ +// #endif +// ); +// +// if (!ret) xdp_warn_mode(ifname); +// +// if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker); +// if (ret || kr_fails_assert(xhd->socket)) { +// free(xhd); +// return ret == 0 ? kr_error(EINVAL) : kr_error(ret); +// } +// xhd->tx_waker.data = xhd->socket; +// +// ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful +// ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd); +// if (ret) { +// knot_xdp_deinit(xhd->socket); +// free(xhd); +// return kr_error(ret); +// } +// +// // beware: this sets poll_handle->data +// xhd->session = session_new(ep->handle, false, false); +// kr_require(!session_flags(xhd->session)->outgoing); +// session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there +// +// ep->handle->data = xhd; +// ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx); +// return ret; +//} +//#endif + +int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, + enum protolayer_grp grp, bool outgoing) { int ret = -1; if (type == SOCK_DGRAM) { @@ -1089,7 +1246,7 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, b if (ret != 0) { return ret; } - struct session *s = session_new(handle, has_tls, has_http); + struct session2 *s = session2_new_io(handle, grp, outgoing); if (s == NULL) { ret = -1; } @@ -1102,13 +1259,13 @@ static void io_deinit(uv_handle_t *handle) return; } if (handle->type != UV_POLL) { - session_free(handle->data); + session2_free(handle->data); } else { #if ENABLE_XDP xdp_handle_data_t *xhd = handle->data; uv_idle_stop(&xhd->tx_waker); uv_close((uv_handle_t *)&xhd->tx_waker, NULL); - session_free(xhd->session); + session2_free(xhd->session); knot_xdp_deinit(xhd->socket); free(xhd); #else |