author     Lukáš Ondráček <lukas.ondracek@nic.cz>  2024-10-07 18:28:49 +0200
committer  Lukáš Ondráček <lukas.ondracek@nic.cz>  2024-10-07 19:06:50 +0200
commit     945cd0c728f615c8966b96327ad37e34f065949d
tree       0c46ca2cce62d9d6f711f951cd7bda05dd4d1053 /daemon/defer.c
parent     daemon/defer: use total accounted time as KRU time
daemon/defer: add alternate UDP and non-UDP phases
Diffstat (limited to 'daemon/defer.c')
 daemon/defer.c | 108 ++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 87 insertions(+), 21 deletions(-)
diff --git a/daemon/defer.c b/daemon/defer.c
index d86cd837..38e97074 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -17,17 +17,20 @@
#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS)) // -1 for synchronous, +1 for unverified
-#define UNVERIFIED_PRIORITY 1 // -1 synchronous, 1 async UDP, {0, 2, 3} other async
+#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
+#define PRIORITY_SYNC (-1) // no queue
+#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
#define KRU_CAPACITY (1<<10)
#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s of accounted time
#define BASE_PRICE(nsec) ((uint64_t)MAX_DECAY * nsec / 1000000ll)
// TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
-#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
-#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
-#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediately in poll phase
+#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
+#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
+#define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
+#define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue
+#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediately in poll phase
#define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)
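
With the four LOADS_THRESHOLDS entries above, the new constants work out as follows (illustrative arithmetic, not part of the patch):

    QUEUES_CNT    = 4 + 1 = 5   // queues indexed 0..4
    PRIORITY_UDP  = 5 - 1 = 4   // last queue, reserved for unverified UDP
    PRIORITY_SYNC = -1          // no queue: the request is processed synchronously
    // verified (non-UDP) requests land in queues 0..3 according to their KRU load
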
@@ -53,6 +56,30 @@ protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
int waiting_requests = 0;
int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue )
+enum phase {
+ PHASE_UDP = 1,
+ PHASE_NON_UDP = 2,
+ PHASE_ANY = PHASE_UDP | PHASE_NON_UDP
+} phase = PHASE_ANY;
+uint64_t phase_elapsed = 0; // ns
+bool phase_accounting = false; // add accounted time to phase_elapsed on next call of defer_account
+
+static inline void phase_set(enum phase p) {
+ if (phase != p) {
+ phase_elapsed = 0;
+ phase = p;
+ }
+}
+static inline void phase_account(uint64_t nsec) {
+ kr_assert(phase != PHASE_ANY);
+ phase_elapsed += nsec;
+ if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
+ phase_set(PHASE_NON_UDP);
+ } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
+ phase_set(PHASE_UDP);
+ }
+}
+
struct pl_defer_iter_data {
struct protolayer_data h;
uint64_t req_stamp; // time when request was received, uses get_stamp()
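
The phase machinery above is self-contained enough to exercise in isolation. A minimal compilable sketch: phase_set()/phase_account() follow the patch, while the main() driver and its 150 us batches are invented purely for illustration:

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    #define PHASE_UDP_TIMEOUT     400000  // ns, same value as the patch
    #define PHASE_NON_UDP_TIMEOUT 400000  // ns

    enum phase {
        PHASE_UDP     = 1,
        PHASE_NON_UDP = 2,
        PHASE_ANY     = PHASE_UDP | PHASE_NON_UDP
    } phase = PHASE_ANY;
    uint64_t phase_elapsed = 0;  // ns

    static void phase_set(enum phase p) {
        if (phase != p) {
            phase_elapsed = 0;
            phase = p;
        }
    }
    static void phase_account(uint64_t nsec) {
        assert(phase != PHASE_ANY);  // kr_assert in the patch
        phase_elapsed += nsec;
        if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT))
            phase_set(PHASE_NON_UDP);
        else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT))
            phase_set(PHASE_UDP);
    }

    int main(void) {
        phase_set(PHASE_UDP);
        for (int i = 0; i < 6; i++) {
            phase_account(150000);  // charge 150 us of accounted work
            printf("after batch %d: %s, %lu ns elapsed\n", i,
                   phase == PHASE_UDP ? "PHASE_UDP" : "PHASE_NON_UDP",
                   (unsigned long)phase_elapsed);
        }
        // The phase flips once more than 400 us accumulates in one phase,
        // and phase_elapsed restarts from zero after each flip.
        return 0;
    }
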
@@ -68,7 +95,14 @@ static bool using_avx2(void)
}
/// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
+void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) {
+ if (phase_accounting) {
+ phase_account(nsec);
+ phase_accounting = false;
+ }
+
+ if (!stream) return; // UDP is not accounted in KRU
+
_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
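
For context, a hypothetical call site under the new three-argument signature; the names nsec, peer_addr and is_stream are assumptions for illustration, not from the patch:

    // after finishing a unit of work on one query:
    defer_account(nsec, &peer_addr, is_stream);
    // is_stream == false (UDP): if phase_accounting was armed, only
    // phase_elapsed is charged; the early return skips the KRU entirely.
    // is_stream == true: the per-prefix KRU counters are charged as well.
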
@@ -113,11 +147,17 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
/// Lower value has higher priority, -1 should be synchronous.
+/// Both UDP and non-UDP may end up with synchronous priority
+/// if the phase is active and no requests can be scheduled before them.
static inline int classify(const union kr_sockaddr *addr, bool stream)
{
- if (!stream) {
+ if (!stream) { // UDP
VERBOSE_LOG(" unverified address\n");
- return UNVERIFIED_PRIORITY; // UDP
+ if ((phase & PHASE_UDP) && (queue_len(queues[PRIORITY_UDP]) == 0)) {
+ phase_set(PHASE_UDP);
+ return PRIORITY_SYNC;
+ }
+ return PRIORITY_UDP;
}
uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
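
The UDP branch above condenses to a small decision table (a summary, not part of the patch):

    phase allows UDP?  UDP queue empty?  result
    yes                yes               PRIORITY_SYNC; phase pinned to PHASE_UDP
    yes                no                PRIORITY_UDP (queued behind earlier UDP)
    no                 (any)             PRIORITY_UDP
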
@@ -134,15 +174,15 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
}
- int threshold_index = 0; // 0: synchronous
- for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
+ int priority = 0;
+ for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
- int priority = threshold_index - 1;
- if (priority >= UNVERIFIED_PRIORITY)
- priority++;
-
+ if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
+ phase_set(PHASE_NON_UDP);
+ return PRIORITY_SYNC;
+ }
return priority;
}
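
A worked mapping from KRU load to priority under the thresholds {1<<4, 1<<8, 1<<11, UINT16_MAX}; the load values are illustrative:

    max_load = 10     ->  priority 0   (10 <= 1<<4; may run synchronously in non-UDP phase)
    max_load = 100    ->  priority 1   (1<<4 < 100 <= 1<<8)
    max_load = 1000   ->  priority 2   (1<<8 < 1000 <= 1<<11)
    max_load = 60000  ->  priority 3   (1<<11 < 60000 <= UINT16_MAX)
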
@@ -160,14 +200,36 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
}
}
-/// Pop and return the query with the highest priority, deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
+/// deactivate idle if not needed.
static inline struct protolayer_iter_ctx *pop_query(void)
{
- for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
- if (queue_ix >= QUEUES_CNT) return NULL;
+ const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
+ const int waiting_non_udp = waiting_requests - waiting_udp;
+
+ enum phase new_phase;
+ if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
+ new_phase = PHASE_NON_UDP; // maybe changing from PHASE_ANY
+ } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
+ new_phase = PHASE_UDP; // maybe changing from PHASE_ANY
+ } else if (waiting_non_udp > 0) {
+ new_phase = PHASE_NON_UDP; // change from PHASE_UDP, no UDP queries
+ } else {
+ new_phase = PHASE_UDP; // change from PHASE_NON_UDP, no non-UDP queries
+ }
+ phase_set(new_phase);
- struct protolayer_iter_ctx *ctx = queue_head(queues[queue_ix]);
- queue_pop(queues[queue_ix]);
+ int i;
+ if (phase == PHASE_NON_UDP) {
+ for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
+ if (queue_ix >= PRIORITY_UDP) kr_assert(false);
+ i = queue_ix;
+ } else {
+ i = PRIORITY_UDP;
+ }
+
+ struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+ queue_pop(queues[i]);
if (--waiting_requests <= 0) {
kr_assert(waiting_requests == 0);
uv_idle_stop(&idle_handle);
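
The new_phase selection above prefers finishing the current phase and treats PHASE_ANY as "non-UDP first"; enumerated (a summary, not part of the patch):

    current phase  non-UDP waiting  UDP waiting  new_phase
    ANY            > 0              (any)        PHASE_NON_UDP
    ANY            0                > 0          PHASE_UDP
    NON_UDP        > 0              (any)        PHASE_NON_UDP
    NON_UDP        0                > 0          PHASE_UDP (forced switch)
    UDP            (any)            > 0          PHASE_UDP
    UDP            > 0              0            PHASE_NON_UDP (forced switch)
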
@@ -184,6 +246,7 @@ static inline void process_single_deferred(void) {
if (ctx == NULL) return;
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
+ phase_accounting = true;
struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
@@ -234,6 +297,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
if (priority == -1) {
VERBOSE_LOG(" CONTINUE\n");
+ phase_accounting = true;
return protolayer_continue(ctx);
}
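
The phase_accounting flag forms a small arm/consume handshake between the two processing paths and defer_account(); sketched with the patch's own names:

    // arm: before handing a query on for processing
    //   process_single_deferred():  phase_accounting = true;
    //   pl_defer_unwrap():          phase_accounting = true;  (synchronous path)
    // consume: once the time spent is known
    //   defer_account(nsec, addr, stream):
    //       if (phase_accounting) { phase_account(nsec); phase_accounting = false; }
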
@@ -260,10 +324,12 @@ static void defer_queues_idle(uv_idle_t *handle) {
defer_sample_restart();
}
defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
- udp_queue_send_all(); // TODO keep here or call after processing each priority level?
- // (or after UNVERIFIED_PRIORITY but beware future QUIC)
+ udp_queue_send_all();
+
if (waiting_requests > 0) {
VERBOSE_LOG(" %d waiting\n", waiting_requests);
+ } else {
+ phase_set(PHASE_ANY);
}
VERBOSE_LOG("POLL\n");
}