diff options
author | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2024-11-11 19:13:27 +0100 |
---|---|---|
committer | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2024-11-20 23:54:03 +0100 |
commit | f492f0b2c00345c0c0ca85ab2ee9f78ba55d5528 (patch) | |
tree | 191dbe6b3fe4c73d50a90c6b8e0c0c966ea8d0cd /daemon | |
parent | daemon/session2: add half-closed TCP connection handling (diff) | |
download | knot-resolver-f492f0b2c00345c0c0ca85ab2ee9f78ba55d5528.tar.xz knot-resolver-f492f0b2c00345c0c0ca85ab2ee9f78ba55d5528.zip |
daemon/defer: defer stream EOF if data are deferred
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/defer.c | 54 | ||||
-rw-r--r-- | daemon/session2.c | 2 |
2 files changed, 49 insertions, 7 deletions
diff --git a/daemon/defer.c b/daemon/defer.c index 20f8fb55..c561bc52 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -168,7 +168,7 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) } VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n", - kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix); + kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix); } /// Determine priority of the request in [-1, QUEUES_CNT - 1]. @@ -299,13 +299,14 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err) static inline void process_single_deferred(void) { struct protolayer_iter_ctx *ctx = pop_query(); - if (ctx == NULL) return; + if (kr_fails_assert(ctx)) return; defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); phase_accounting = true; struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); + struct session2 *session = ctx->session; uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp; VERBOSE_LOG(" %s POP from %d after %4.3f ms\n", @@ -332,17 +333,34 @@ static inline void process_single_deferred(void) return; } + bool eof = false; if (ctx->session->stream) { kr_assert(queue_head(sdata->queue) == ctx); queue_pop(sdata->queue); + while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event + eof = true; + queue_pop(sdata->queue); + } if (queue_len(sdata->queue) > 0) { VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority); push_query(queue_head(sdata->queue), priority, true); } } + if (eof) { + // Keep session alive even if it is somehow force-closed during continuation. + // TODO Is it needed? + session->ref_count++; + } + VERBOSE_LOG(" CONTINUE\n"); protolayer_continue(ctx); + + if (eof) { + VERBOSE_LOG(" CONTINUE EOF event\n"); + session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL); + session2_unhandle(session); // decrease ref_count + } } /// Break expired requests at the beginning of queues, uses current stamp. @@ -371,10 +389,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) { - if (!defer) - return protolayer_continue(ctx); - - if (ctx->session->outgoing) + if (!defer || ctx->session->outgoing) return protolayer_continue(ctx); defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); @@ -413,6 +428,32 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( return protolayer_async(); } +/// Unwrap event: EOF event may be deferred here, other events pass synchronously. +static enum protolayer_event_cb_result pl_defer_event_unwrap( + enum protolayer_event_type event, void **baton, + struct session2 *session, void *sess_data) +{ + if (!defer || session->outgoing) + return PROTOLAYER_EVENT_PROPAGATE; + + struct pl_defer_sess_data *sdata = sess_data; + if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0)) { + // defer EOF event if unprocessed data remain, baton is dropped if any + queue_push(sdata->queue, NULL); + VERBOSE_LOG(" %s event %s deferred\n", + session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)", + protolayer_event_name(event)); + return PROTOLAYER_EVENT_CONSUME; + } + + VERBOSE_LOG(" %s event %s passes through synchronously%s%s\n", + session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)", + protolayer_event_name(event), + queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "", + *baton ? " (with baton)" : ""); + return PROTOLAYER_EVENT_PROPAGATE; +} + /// Idle: continue processing deferred requests. static void defer_queues_idle(uv_idle_t *handle) { @@ -535,5 +576,6 @@ static void defer_protolayers_init(void) .sess_size = sizeof(struct pl_defer_sess_data), .sess_init = pl_defer_sess_init, .unwrap = pl_defer_unwrap, + .event_unwrap = pl_defer_event_unwrap, }; } diff --git a/daemon/session2.c b/daemon/session2.c index c372bfb5..6eb91bf4 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -603,7 +603,7 @@ static int session2_submit( { if (session->closing) return kr_error(ECANCELED); - if (session->ref_count >= INT_MAX) + if (session->ref_count >= INT_MAX - 1) return kr_error(ETOOMANYREFS); if (kr_fails_assert(session->proto < KR_PROTO_COUNT)) return kr_error(EFAULT); |