summaryrefslogtreecommitdiffstats
path: root/daemon
diff options
context:
space:
mode:
authorLukáš Ondráček <lukas.ondracek@nic.cz>2024-11-11 19:13:27 +0100
committerLukáš Ondráček <lukas.ondracek@nic.cz>2024-11-20 23:54:03 +0100
commitf492f0b2c00345c0c0ca85ab2ee9f78ba55d5528 (patch)
tree191dbe6b3fe4c73d50a90c6b8e0c0c966ea8d0cd /daemon
parentdaemon/session2: add half-closed TCP connection handling (diff)
downloadknot-resolver-f492f0b2c00345c0c0ca85ab2ee9f78ba55d5528.tar.xz
knot-resolver-f492f0b2c00345c0c0ca85ab2ee9f78ba55d5528.zip
daemon/defer: defer stream EOF if data are deferred
Diffstat (limited to 'daemon')
-rw-r--r--daemon/defer.c54
-rw-r--r--daemon/session2.c2
2 files changed, 49 insertions, 7 deletions
diff --git a/daemon/defer.c b/daemon/defer.c
index 20f8fb55..c561bc52 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -168,7 +168,7 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
}
VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
- kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
+ kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
}
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
@@ -299,13 +299,14 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
static inline void process_single_deferred(void)
{
struct protolayer_iter_ctx *ctx = pop_query();
- if (ctx == NULL) return;
+ if (kr_fails_assert(ctx)) return;
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
phase_accounting = true;
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ struct session2 *session = ctx->session;
uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
VERBOSE_LOG(" %s POP from %d after %4.3f ms\n",
@@ -332,17 +333,34 @@ static inline void process_single_deferred(void)
return;
}
+ bool eof = false;
if (ctx->session->stream) {
kr_assert(queue_head(sdata->queue) == ctx);
queue_pop(sdata->queue);
+ while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event
+ eof = true;
+ queue_pop(sdata->queue);
+ }
if (queue_len(sdata->queue) > 0) {
VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority);
push_query(queue_head(sdata->queue), priority, true);
}
}
+ if (eof) {
+ // Keep session alive even if it is somehow force-closed during continuation.
+ // TODO Is it needed?
+ session->ref_count++;
+ }
+
VERBOSE_LOG(" CONTINUE\n");
protolayer_continue(ctx);
+
+ if (eof) {
+ VERBOSE_LOG(" CONTINUE EOF event\n");
+ session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL);
+ session2_unhandle(session); // decrease ref_count
+ }
}
/// Break expired requests at the beginning of queues, uses current stamp.
@@ -371,10 +389,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
- if (!defer)
- return protolayer_continue(ctx);
-
- if (ctx->session->outgoing)
+ if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
@@ -413,6 +428,32 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
return protolayer_async();
}
+/// Unwrap event: EOF event may be deferred here, other events pass synchronously.
+static enum protolayer_event_cb_result pl_defer_event_unwrap(
+ enum protolayer_event_type event, void **baton,
+ struct session2 *session, void *sess_data)
+{
+ if (!defer || session->outgoing)
+ return PROTOLAYER_EVENT_PROPAGATE;
+
+ struct pl_defer_sess_data *sdata = sess_data;
+ if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0)) {
+ // defer EOF event if unprocessed data remain, baton is dropped if any
+ queue_push(sdata->queue, NULL);
+ VERBOSE_LOG(" %s event %s deferred\n",
+ session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+ protolayer_event_name(event));
+ return PROTOLAYER_EVENT_CONSUME;
+ }
+
+ VERBOSE_LOG(" %s event %s passes through synchronously%s%s\n",
+ session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+ protolayer_event_name(event),
+ queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "",
+ *baton ? " (with baton)" : "");
+ return PROTOLAYER_EVENT_PROPAGATE;
+}
+
/// Idle: continue processing deferred requests.
static void defer_queues_idle(uv_idle_t *handle)
{
@@ -535,5 +576,6 @@ static void defer_protolayers_init(void)
.sess_size = sizeof(struct pl_defer_sess_data),
.sess_init = pl_defer_sess_init,
.unwrap = pl_defer_unwrap,
+ .event_unwrap = pl_defer_event_unwrap,
};
}
diff --git a/daemon/session2.c b/daemon/session2.c
index c372bfb5..6eb91bf4 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -603,7 +603,7 @@ static int session2_submit(
{
if (session->closing)
return kr_error(ECANCELED);
- if (session->ref_count >= INT_MAX)
+ if (session->ref_count >= INT_MAX - 1)
return kr_error(ETOOMANYREFS);
if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);