diff options
author | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2025-01-10 18:04:41 +0100 |
---|---|---|
committer | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2025-01-10 18:04:41 +0100 |
commit | ef655fc4f4c5a916a74e9cfd4968578cb9bb85a8 (patch) | |
tree | 3d4a9c2891f8924bbb9acf9a2625b201fcee58cc | |
parent | daemon/defer: update uv time after longer operations (diff) | |
download | knot-resolver-ef655fc4f4c5a916a74e9cfd4968578cb9bb85a8.tar.xz knot-resolver-ef655fc4f4c5a916a74e9cfd4968578cb9bb85a8.zip |
daemon/defer: redesign UDP and non-UDP phase transition
-rw-r--r-- | daemon/defer.c | 142 |
1 files changed, 63 insertions, 79 deletions
diff --git a/daemon/defer.c b/daemon/defer.c index b1abcacd..223a2d61 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -85,31 +85,43 @@ ptrdiff_t waiting_requests_size = 0; // signed for non-negativeness asserts int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue ) enum phase { - PHASE_UDP = 1, - PHASE_NON_UDP = 2, - PHASE_ANY = PHASE_UDP | PHASE_NON_UDP -} phase = PHASE_ANY; -uint64_t phase_elapsed = 0; // ns -bool phase_accounting = false; // add accounted time to phase_elapsed in defer_account - -static inline void phase_set(enum phase p) + PHASE_NONE, + PHASE_UDP, + PHASE_NON_UDP +} phase = PHASE_NONE; +uint64_t phase_elapsed[3] = { 0 }; // ns; [PHASE_NONE] value is being incremented but never used +const uint64_t phase_limits[3] = {0, PHASE_UDP_TIMEOUT, PHASE_NON_UDP_TIMEOUT}; +uint64_t phase_stamp = 0; + +static inline bool phase_over_limit(enum phase p) { - if (phase != p) { - phase_elapsed = 0; - phase = p; - } + return phase_elapsed[p] >= phase_limits[p]; +} + +/// Reset elapsed times of phases and set phase to UDP, NON_UDP, or NONE. +static inline void phase_reset(enum phase p) +{ + phase_elapsed[PHASE_UDP] = 0; + phase_elapsed[PHASE_NON_UDP] = 0; + phase_stamp = defer_sample_state.stamp; + phase = p; } -static inline void phase_charge(uint64_t nsec) + +/// Set phase to UDP or NON_UDP if it is not over limit or both are over limit (reset them). +static inline bool phase_try_set(enum phase p) { - kr_assert(phase != PHASE_ANY); - phase_elapsed += nsec; - if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) { - phase_set(PHASE_NON_UDP); - phase_accounting = false; - } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) { - phase_set(PHASE_UDP); - phase_accounting = false; + phase_elapsed[phase] += defer_sample_state.stamp - phase_stamp; + phase_stamp = defer_sample_state.stamp; + + if (!phase_over_limit(p)) { + phase = p; + return true; + } else if (phase_over_limit(PHASE_UDP) && phase_over_limit(PHASE_NON_UDP)) { + phase_reset(p); + return true; } + + return false; } struct pl_defer_sess_data { @@ -247,11 +259,7 @@ static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t * /// Increment KRU counters by given time. void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream) { - if (phase_accounting) { - phase_charge(nsec); - } - - if (!stream) return; // UDP is not accounted in KRU + if (!stream) return; // UDP is not accounted in KRU; TODO remove !stream invocations? _Alignas(16) uint8_t key[16] = {0, }; const struct kru_conf *kru_conf; @@ -312,7 +320,7 @@ static inline int classify(const union kr_sockaddr *addr, bool stream) } -/// Push query to a queue according to its priority and activate idle. +/// Push query to a queue according to its priority. static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end) { if (to_head_end) { @@ -321,46 +329,30 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo queue_push(queues[priority], ctx); } queue_ix = MIN(queue_ix, priority); - if (waiting_requests++ <= 0) { - kr_assert(waiting_requests == 1); - uv_idle_start(&idle_handle, defer_queues_idle); - VERBOSE_LOG(" activating idle\n"); - } + waiting_requests++; } -/// Pop and return query from the specified queue, deactivate idle if not needed. +/// Pop and return query from the specified queue.. static inline struct protolayer_iter_ctx *pop_query_queue(int priority) { kr_assert(queue_len(queues[priority]) > 0); struct protolayer_iter_ctx *ctx = queue_head(queues[priority]); queue_pop(queues[priority]); - if (--waiting_requests <= 0) { - kr_assert(waiting_requests == 0); - uv_idle_stop(&idle_handle); - VERBOSE_LOG(" deactivating idle\n"); - } + waiting_requests--; + kr_assert(waiting_requests >= 0); return ctx; } -/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase, -/// deactivate idle if not needed. +/// Pop and return the query with the highest priority, UDP or non-UDP based on the current phase. static inline struct protolayer_iter_ctx *pop_query(void) { const int waiting_udp = queue_len(queues[PRIORITY_UDP]); const int waiting_non_udp = waiting_requests - waiting_udp; - enum phase new_phase; - if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) { - new_phase = PHASE_NON_UDP; // maybe changing from PHASE_ANY - } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) { - new_phase = PHASE_UDP; // maybe changing from PHASE_ANY - } else if (waiting_non_udp > 0) { - new_phase = PHASE_NON_UDP; // change from PHASE_UDP, no UDP queries - } else { - new_phase = PHASE_UDP; // change from PHASE_NON_UDP, no non-UDP queries - } - phase_set(new_phase); + if (!((waiting_non_udp > 0) && phase_try_set(PHASE_NON_UDP)) && + !((waiting_udp > 0) && phase_try_set(PHASE_UDP))) + phase_reset(waiting_non_udp > 0 ? PHASE_NON_UDP : PHASE_UDP); int i; if (phase == PHASE_NON_UDP) { @@ -501,13 +493,11 @@ static inline void process_deferred_over_size_limit(void) { defer_sample_state_t prev_sample_state; defer_sample_start(&prev_sample_state); do { - phase_accounting = true; process_single_deferred(); // possibly defers again without decreasing waiting_requests_size // If the unwrapped query is to be processed here, // it is the last iteration and the query is processed after returning. defer_sample_restart(); } while (waiting_requests_size > MAX_WAITING_REQS_SIZE); - phase_accounting = false; defer_sample_stop(&prev_sample_state, true); } } @@ -538,7 +528,6 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) { - phase_accounting = false; if (!defer || ctx->session->outgoing) return protolayer_continue(ctx); @@ -550,6 +539,8 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( VERBOSE_LOG(" %s UNWRAP\n", kr_straddr(ctx->comm->src_addr)); + uv_idle_start(&idle_handle, defer_queues_idle); + if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred queue_push(sdata->queue, ctx); waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false); @@ -561,14 +552,10 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream); - // Process synchronously if the phase is active and no requests can be scheduled before. - if (( - ((priority == PRIORITY_UDP) && (phase & PHASE_UDP)) || - ((priority == 0) && (phase & PHASE_NON_UDP)) - ) && (queue_len(queues[priority]) == 0)) { + // Process synchronously unless there may exist requests that has to be processed first + if (((priority == 0) || (priority == PRIORITY_UDP)) && (queue_len(queues[priority]) == 0) && + phase_try_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP)) { VERBOSE_LOG(" CONTINUE\n"); - phase_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP); - phase_accounting = true; return protolayer_continue(ctx); } @@ -590,10 +577,6 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap( enum protolayer_event_type event, void **baton, struct session2 *session, void *sess_data) { - if ((event == PROTOLAYER_EVENT_EOF) || (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT)) { - // disable accounting only for events that cannot occur during incoming data processing - phase_accounting = false; - } if (!defer || !session->stream || session->outgoing) return PROTOLAYER_EVENT_PROPAGATE; @@ -620,25 +603,26 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap( /// Idle: continue processing deferred requests. static void defer_queues_idle(uv_idle_t *handle) { - kr_assert(waiting_requests > 0); VERBOSE_LOG("IDLE\n"); - VERBOSE_LOG(" %d waiting\n", waiting_requests); - defer_sample_start(NULL); - uint64_t idle_stamp = defer_sample_state.stamp; - do { - phase_accounting = true; - process_single_deferred(); - defer_sample_restart(); - } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)); - phase_accounting = false; - defer_sample_stop(NULL, true); - cleanup_queues(); - udp_queue_send_all(); + if (waiting_requests > 0) { + VERBOSE_LOG(" %d waiting\n", waiting_requests); + defer_sample_start(NULL); + uint64_t idle_stamp = defer_sample_state.stamp; + do { + process_single_deferred(); + defer_sample_restart(); + } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)); + defer_sample_stop(NULL, true); + cleanup_queues(); + udp_queue_send_all(); + } if (waiting_requests > 0) { VERBOSE_LOG(" %d waiting\n", waiting_requests); } else { - phase_set(PHASE_ANY); + phase_reset(PHASE_NONE); + VERBOSE_LOG(" deactivate idle\n"); + uv_idle_stop(&idle_handle); } VERBOSE_LOG("POLL\n"); } |