diff options
author | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2024-11-21 00:15:29 +0100 |
---|---|---|
committer | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2024-11-21 00:25:34 +0100 |
commit | 6c246b825bf6fdba44cb3eb5b35cb43366365b90 (patch) | |
tree | 82def96bfb063eb4d2fa701a991e57a935e1e12a /daemon | |
parent | daemon/defer: defer stream EOF if data are deferred (diff) | |
download | knot-resolver-6c246b825bf6fdba44cb3eb5b35cb43366365b90.tar.xz knot-resolver-6c246b825bf6fdba44cb3eb5b35cb43366365b90.zip |
daemon/defer: limit deferred queries by memory usage
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/defer.c | 30 | ||||
-rw-r--r-- | daemon/session2.c | 14 | ||||
-rw-r--r-- | daemon/session2.h | 10 |
3 files changed, 50 insertions, 4 deletions
diff --git a/daemon/defer.c b/daemon/defer.c index c561bc52..3835bceb 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -56,8 +56,11 @@ #define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase #define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases #define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue -#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediatelly in poll phase - // TODO measure memory usage instead +#define MAX_WAITING_REQS_SIZE (64 * 1024 * 1024) // bytes; if exceeded, some deferred requests are processed in poll phase + // single TCP allocates more than 64KiB wire buffer + // TODO check whether all important allocations are counted; + // different things are not counted: tasks and subsessions (not deferred after created), uv handles, queues overhead, ...; + // payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams) #define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__) @@ -81,6 +84,7 @@ static void defer_queues_idle(uv_idle_t *handle); protolayer_iter_ctx_queue_t queues[QUEUES_CNT]; int waiting_requests = 0; +ptrdiff_t waiting_requests_size = 0; // signed for non-negativeness asserts int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue ) enum phase { @@ -113,11 +117,13 @@ struct pl_defer_sess_data { struct protolayer_data h; protolayer_iter_ctx_queue_t queue; // properly ordered sequence of deferred packets, for stream only // the first ctx in the queue is also in a defer queue + size_t size; }; struct pl_defer_iter_data { struct protolayer_data h; uint64_t req_stamp; // time when request was received, uses get_stamp() + size_t size; }; /// Return whether we're using optimized variant right now. @@ -281,17 +287,23 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err) { if (ctx->session->stream) { struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); + waiting_requests_size -= sdata->size; if (!ctx->session->closing) { session2_force_close(ctx->session); // session is not freed here as iter contexts exist } queue_pop(sdata->queue); while (queue_len(sdata->queue) > 0) { + struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); + waiting_requests_size -= idata->size; protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist ctx = queue_head(sdata->queue); queue_pop(sdata->queue); } } + struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); + waiting_requests_size -= idata->size; protolayer_break(ctx, kr_error(err)); + kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0); } /// Process a single deferred query (or defer again) if there is any. @@ -344,9 +356,14 @@ static inline void process_single_deferred(void) if (queue_len(sdata->queue) > 0) { VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority); push_query(queue_head(sdata->queue), priority, true); + } else { + waiting_requests_size -= sdata->size; } } + waiting_requests_size -= idata->size; + kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0); + if (eof) { // Keep session alive even if it is somehow force-closed during continuation. // TODO Is it needed? @@ -402,6 +419,8 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred queue_push(sdata->queue, ctx); + waiting_requests_size += data->size = protolayer_iter_size_est(ctx, false); + // payload counted in session wire buffer VERBOSE_LOG(" PUSH as follow-up\n"); return protolayer_async(); } @@ -417,11 +436,14 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( VERBOSE_LOG(" PUSH to %d\n", priority); if (ctx->session->stream) { queue_push(sdata->queue, ctx); + waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session); } push_query(ctx, priority, false); - while (waiting_requests > MAX_WAITING_REQS) { // TODO follow-up stream packets are not counted here + waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream); + // for stream, payload is counted in session wire buffer + while (waiting_requests_size > MAX_WAITING_REQS_SIZE) { defer_sample_restart(); - process_single_deferred(); // possibly defers again without decreasing waiting_requests + process_single_deferred(); // possibly defers again without decreasing waiting_requests_size // defer_sample_stop should be called soon outside } diff --git a/daemon/session2.c b/daemon/session2.c index 6eb91bf4..eb3bedb3 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -378,6 +378,19 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_iter_data_get(ctx, ctx->layer_ix); } +size_t protolayer_sess_size_est(struct session2 *s) +{ + return s->session_size + s->wire_buf.size; +} + +size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload) +{ + size_t size = ctx->session->iter_ctx_size; + if (incl_payload) + size += protolayer_payload_size(&ctx->payload); + return size; +} + static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx) { unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP) @@ -852,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type, .proto = proto, .iter_ctx_size = iter_ctx_size, + .session_size = session_size, }; memcpy(&s->layer_data, offsets, sizeof(offsets)); diff --git a/daemon/session2.h b/daemon/session2.h index 73b88d32..5228ad86 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -549,6 +549,13 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx); * To be used after returning from its callback for async continuation but before calling protolayer_continue. */ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx); +/** Gets rough memory footprint estimate of session/iteration for use in defer. + * Different, hopefully minor, allocations are not counted here; + * tasks and subsessions are also not counted; + * read the code before using elsewhere. */ +size_t protolayer_sess_size_est(struct session2 *s); +size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload); + /** Layer-specific data - the generic struct. To be added as the first member of * each specific struct. */ struct protolayer_data { @@ -874,6 +881,9 @@ struct session2 { * (`struct protolayer_iter_ctx`), including layer-specific data. */ size_t iter_ctx_size; + /** The size of this session struct. */ + size_t session_size; + /** The following flexible array has basically this structure: * * struct { |