author | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2024-10-07 18:28:49 +0200
---|---|---
committer | Lukáš Ondráček <lukas.ondracek@nic.cz> | 2024-10-07 19:06:50 +0200
commit | 945cd0c728f615c8966b96327ad37e34f065949d | (patch)
tree | 0c46ca2cce62d9d6f711f951cd7bda05dd4d1053 | /daemon/defer.c
parent | daemon/defer: use total accounted time as KRU time | (diff)
download | knot-resolver-945cd0c728f615c8966b96327ad37e34f065949d.tar.xz | knot-resolver-945cd0c728f615c8966b96327ad37e34f065949d.zip
daemon/defer: add alternate UDP and non-UDP phases
Diffstat (limited to 'daemon/defer.c')
-rw-r--r-- | daemon/defer.c | 108
1 file changed, 87 insertions(+), 21 deletions(-)
diff --git a/daemon/defer.c b/daemon/defer.c
index d86cd837..38e97074 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -17,17 +17,20 @@
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
 #define LOADS_THRESHOLDS      (uint16_t[]) {1<<4, 1<<8, 1<<11, -1}  // the last one should be UINT16_MAX
-#define QUEUES_CNT            (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS))      // -1 for synchronous, +1 for unverified
-#define UNVERIFIED_PRIORITY   1  // -1 synchronous, 1 async UDP, {0, 2, 3} other async
+#define QUEUES_CNT            (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1)  // +1 for unverified
+#define PRIORITY_SYNC         (-1)              // no queue
+#define PRIORITY_UDP          (QUEUES_CNT - 1)  // last queue
 
 #define KRU_CAPACITY          (1<<10)
 #define MAX_DECAY             (KRU_LIMIT * 0.0006929)  // -> halving counters in 1s of accounted time
 #define BASE_PRICE(nsec)      ((uint64_t)MAX_DECAY * nsec / 1000000ll)
 // TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
 
-#define REQ_TIMEOUT            5000000  // ns (THREAD_CPUTIME), older deferred queries are dropped
-#define IDLE_TIMEOUT           1000000  // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
-#define MAX_WAITING_REQS         10000  // if exceeded, process single deferred request immediately in poll phase
+#define REQ_TIMEOUT            5000000  // ns (THREAD_CPUTIME), older deferred queries are dropped
+#define IDLE_TIMEOUT           1000000  // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
+#define PHASE_UDP_TIMEOUT       400000  // ns (THREAD_CPUTIME); switch between udp, non-udp phases
+#define PHASE_NON_UDP_TIMEOUT   400000  // ns (THREAD_CPUTIME); after timeout or emptying queue
+#define MAX_WAITING_REQS         10000  // if exceeded, process single deferred request immediately in poll phase
 
 #define VERBOSE_LOG(...) kr_log_debug(DEFER, "  | " __VA_ARGS__)
@@ -53,6 +56,30 @@
 protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
 int waiting_requests = 0;
 int queue_ix = QUEUES_CNT;  // MIN( last popped queue, first non-empty queue )
+enum phase {
+	PHASE_UDP     = 1,
+	PHASE_NON_UDP = 2,
+	PHASE_ANY     = PHASE_UDP | PHASE_NON_UDP
+} phase = PHASE_ANY;
+uint64_t phase_elapsed = 0;     // ns
+bool phase_accounting = false;  // add accounted time to phase_elapsed on next call of defer_account
+
+static inline void phase_set(enum phase p) {
+	if (phase != p) {
+		phase_elapsed = 0;
+		phase = p;
+	}
+}
+static inline void phase_account(uint64_t nsec) {
+	kr_assert(phase != PHASE_ANY);
+	phase_elapsed += nsec;
+	if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
+		phase_set(PHASE_NON_UDP);
+	} else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
+		phase_set(PHASE_UDP);
+	}
+}
+
 struct pl_defer_iter_data {
 	struct protolayer_data h;
 	uint64_t req_stamp;  // time when request was received, uses get_stamp()
@@ -68,7 +95,14 @@ static bool using_avx2(void)
 }
 
 /// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
+void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) {
+	if (phase_accounting) {
+		phase_account(nsec);
+		phase_accounting = false;
+	}
+
+	if (!stream) return;  // UDP is not accounted in KRU
+
 	_Alignas(16) uint8_t key[16] = {0, };
 	uint16_t max_load = 0;
 	uint8_t prefix = 0;
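The two helpers above form a small state machine: each processed request's accounted CPU time is charged to the current phase, and once the per-phase budget is exceeded, the machine flips to the other phase and resets the counter. Below is a minimal, self-contained sketch of that behaviour; the `main` driver, the fixed 150 µs cost per request, and the use of plain `assert` instead of `kr_assert` are inventions for illustration only.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define PHASE_UDP_TIMEOUT      400000  // ns, same budget as in the patch
#define PHASE_NON_UDP_TIMEOUT  400000  // ns

enum phase {
	PHASE_UDP     = 1,
	PHASE_NON_UDP = 2,
	PHASE_ANY     = PHASE_UDP | PHASE_NON_UDP
};
static enum phase phase = PHASE_ANY;
static uint64_t phase_elapsed = 0;  // ns spent in the current phase

static void phase_set(enum phase p) {
	if (phase != p) {  // reset the budget only on a real transition
		phase_elapsed = 0;
		phase = p;
	}
}

static void phase_account(uint64_t nsec) {
	assert(phase != PHASE_ANY);
	phase_elapsed += nsec;
	if (phase == PHASE_UDP && phase_elapsed > PHASE_UDP_TIMEOUT)
		phase_set(PHASE_NON_UDP);
	else if (phase == PHASE_NON_UDP && phase_elapsed > PHASE_NON_UDP_TIMEOUT)
		phase_set(PHASE_UDP);
}

int main(void) {
	phase_set(PHASE_UDP);
	for (int i = 1; i <= 6; i++) {
		phase_account(150000);  // pretend each request cost 150 us
		printf("after request %d: phase=%s, elapsed=%llu ns\n", i,
		       phase == PHASE_UDP ? "UDP" : "non-UDP",
		       (unsigned long long)phase_elapsed);
	}
	return 0;  // the phase flips on every third request (450 us > 400 us)
}
```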
@@ -113,11 +147,17 @@
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
 /// Lower value has higher priority, -1 should be synchronous.
+/// Both UDP and non-UDP may end up with synchronous priority
+/// if the phase is active and no requests can be scheduled before them.
 static inline int classify(const union kr_sockaddr *addr, bool stream)
 {
-	if (!stream) {
+	if (!stream) {  // UDP
 		VERBOSE_LOG("  unverified address\n");
-		return UNVERIFIED_PRIORITY;  // UDP
+		if ((phase & PHASE_UDP) && (queue_len(queues[PRIORITY_UDP]) == 0)) {
+			phase_set(PHASE_UDP);
+			return PRIORITY_SYNC;
+		}
+		return PRIORITY_UDP;
 	}
 
 	uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
@@ -134,15 +174,15 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
 				0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
 	}
 
-	int threshold_index = 0;  // 0: synchronous
-	for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
+	int priority = 0;
+	for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
 
 	VERBOSE_LOG("  load %d on /%d\n", max_load, prefix);
 
-	int priority = threshold_index - 1;
-	if (priority >= UNVERIFIED_PRIORITY)
-		priority++;
-
+	if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
+		phase_set(PHASE_NON_UDP);
+		return PRIORITY_SYNC;
+	}
 	return priority;
 }
@@ -160,14 +200,36 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
 	}
 }
 
-/// Pop and return the query with the highest priority, deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
+/// deactivate idle if not needed.
 static inline struct protolayer_iter_ctx *pop_query(void)
 {
-	for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
-	if (queue_ix >= QUEUES_CNT) return NULL;
+	const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
+	const int waiting_non_udp = waiting_requests - waiting_udp;
+
+	enum phase new_phase;
+	if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
+		new_phase = PHASE_NON_UDP;  // maybe changing from PHASE_ANY
+	} else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
+		new_phase = PHASE_UDP;      // maybe changing from PHASE_ANY
+	} else if (waiting_non_udp > 0) {
+		new_phase = PHASE_NON_UDP;  // change from PHASE_UDP, no UDP queries
+	} else {
+		new_phase = PHASE_UDP;      // change from PHASE_NON_UDP, no non-UDP queries
+	}
+	phase_set(new_phase);
 
-	struct protolayer_iter_ctx *ctx = queue_head(queues[queue_ix]);
-	queue_pop(queues[queue_ix]);
+	int i;
+	if (phase == PHASE_NON_UDP) {
+		for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
+		if (queue_ix >= PRIORITY_UDP) kr_assert(false);
+		i = queue_ix;
+	} else {
+		i = PRIORITY_UDP;
+	}
+
+	struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+	queue_pop(queues[i]);
 	if (--waiting_requests <= 0) {
 		kr_assert(waiting_requests == 0);
 		uv_idle_stop(&idle_handle);
@@ -184,6 +246,7 @@ static inline void process_single_deferred(void)
 	if (ctx == NULL) return;
 
 	defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
+	phase_accounting = true;
 
 	struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
 	uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
@@ -234,6 +297,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 
 	if (priority == -1) {
 		VERBOSE_LOG("  CONTINUE\n");
+		phase_accounting = true;
 		return protolayer_continue(ctx);
 	}
 
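The threshold scan in `classify()` now yields the queue index directly: the first entry of `LOADS_THRESHOLDS` that is not below the measured KRU load is the priority, with the UDP queue occupying the dedicated last slot instead of the old interleaved `UNVERIFIED_PRIORITY`. Here is a small sketch of just this mapping; the helper name `load_to_priority`, the `main` driver, and the sample loads are invented for illustration.

```c
#include <stdint.h>
#include <stdio.h>

#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1}  // last is UINT16_MAX

// Queue index = index of the first threshold >= load, so lightly loaded
// prefixes land in low-index (high-priority) queues.
static int load_to_priority(uint16_t max_load) {
	int priority = 0;
	for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
	return priority;
}

int main(void) {
	const uint16_t samples[] = {0, 16, 17, 256, 2048, 2049, 60000};
	for (int i = 0; i < (int)(sizeof(samples) / sizeof(*samples)); i++)
		printf("load %5u -> queue %d\n", (unsigned)samples[i],
		       load_to_priority(samples[i]));
	return 0;  // prints queues 0, 0, 1, 1, 2, 3, 3
}
```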
@@ -260,10 +324,12 @@ static void defer_queues_idle(uv_idle_t *handle)
 			defer_sample_restart();
 	}
 	defer_sample_stop();  // TODO skip calling and use just restart elsewhere?
-	udp_queue_send_all();  // TODO keep here or call after processing each priority level?
-		// (or after UNVERIFIED_PRIORITY but beware future QUIC)
+	udp_queue_send_all();
+
 	if (waiting_requests > 0) {
 		VERBOSE_LOG("  %d waiting\n", waiting_requests);
+	} else {
+		phase_set(PHASE_ANY);
 	}
 	VERBOSE_LOG("POLL\n");
 }
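Taken together, `pop_query()` first narrows `PHASE_ANY` (restored by the idle callback above once all queues drain) to whichever side actually has work, preferring to stay in the current phase, and only then selects a queue. A compact model of just that decision follows; the function name `choose_phase`, its parameters, and the test values are invented for this sketch.

```c
#include <assert.h>

enum phase { PHASE_UDP = 1, PHASE_NON_UDP = 2, PHASE_ANY = PHASE_UDP | PHASE_NON_UDP };

// Mirrors the phase-selection logic at the top of pop_query(): stay in the
// current phase while it still has waiting requests, otherwise switch to
// whichever side does.
static enum phase choose_phase(enum phase cur, int waiting_udp, int waiting_non_udp) {
	if ((cur & PHASE_NON_UDP) && waiting_non_udp > 0)
		return PHASE_NON_UDP;  // possibly narrowing from PHASE_ANY
	if ((cur & PHASE_UDP) && waiting_udp > 0)
		return PHASE_UDP;      // possibly narrowing from PHASE_ANY
	return waiting_non_udp > 0 ? PHASE_NON_UDP : PHASE_UDP;
}

int main(void) {
	// UDP phase with its queue drained: fall over to non-UDP at once.
	assert(choose_phase(PHASE_UDP, 0, 3) == PHASE_NON_UDP);
	// PHASE_ANY with both kinds waiting: the non-UDP side is checked first.
	assert(choose_phase(PHASE_ANY, 2, 3) == PHASE_NON_UDP);
	// An active non-UDP phase keeps running while its queues are non-empty.
	assert(choose_phase(PHASE_NON_UDP, 5, 1) == PHASE_NON_UDP);
	return 0;
}
```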