author    | Vladimír Čunát <vladimir.cunat@nic.cz> | 2025-01-14 09:44:28 +0100
committer | Vladimír Čunát <vladimir.cunat@nic.cz> | 2025-01-14 09:44:28 +0100
commit    | 04f8f717fc93a1235f6d61774887b30eac65a1dd (patch)
tree      | f23e67cb779ae4f94e2ab7bc41fac2fffe504066
parent    | Merge branch 'kresctl-tab-completion' into 'master' (diff)
parent    | doc/user: defer nits (diff)
Merge !1641: Request prioritization (defer)
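For trying out the feature: judging from the schema added in doc/_static/config.schema.json and python/knot_resolver/datamodel/defer_schema.py, request prioritization is exposed in the declarative configuration under a top-level `defer` key and stays disabled by default. A minimal, illustrative configuration fragment follows; the key names and defaults come from this merge, while the file layout around them is assumed:

```yaml
# Hypothetical excerpt of a declarative config file; only the "defer" section
# is taken from the schema added in this merge, the rest is assumed.
defer:
  enabled: true      # default: false; turn request prioritization on
  log-period: 10s    # default: 0s (disabled); minimal time between two "dropped" log messages
```

If defer is never initialized from Lua, daemon/main.c (see the hunk below) falls back to a hardcoded default via defer_init("defer", 1, 1); the manager-generated Lua template defer.lua.j2 is presumably the normal way these values reach the daemon.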
-rw-r--r-- | NEWS | 1
-rw-r--r-- | daemon/defer.c | 565
-rw-r--r-- | daemon/defer.h | 118
-rw-r--r-- | daemon/io.c | 28
-rw-r--r-- | daemon/lua/kres-gen-33.lua | 2
-rw-r--r-- | daemon/main.c | 9
-rw-r--r-- | daemon/meson.build | 1
-rw-r--r-- | daemon/session2.c | 162
-rw-r--r-- | daemon/session2.h | 16
-rw-r--r-- | daemon/tls.c | 6
-rw-r--r-- | daemon/worker.c | 131
-rw-r--r-- | doc/_static/config.schema.json | 21
-rw-r--r-- | doc/user/config-defer.rst | 65
-rw-r--r-- | doc/user/config-performance.rst | 1
-rw-r--r-- | lib/kru.h | 8
-rw-r--r-- | lib/kru.inc.c | 28
-rw-r--r-- | python/knot_resolver/datamodel/config_schema.py | 4
-rw-r--r-- | python/knot_resolver/datamodel/defer_schema.py | 15
-rw-r--r-- | python/knot_resolver/datamodel/templates/defer.lua.j2 | 10
-rw-r--r-- | python/knot_resolver/datamodel/templates/policy-config.lua.j2 | 6
-rw-r--r-- | python/knot_resolver/datamodel/templates/worker-config.lua.j2 | 3
-rw-r--r-- | python/knot_resolver/manager/manager.py | 9
22 files changed, 906 insertions, 303 deletions
@@ -7,6 +7,7 @@ Improvements - manager: fix startup on Linux without libsystemd (!1608) - auto-reload TLS certificate files (!1626) - kresctl: bash command-line TAB completion (!1622) +- add request prioritization (defer) to mitigate DoS attacks (!1641) Knot Resolver 6.0.9 (2024-11-11) ================================ diff --git a/daemon/defer.c b/daemon/defer.c index 32b0358c..223a2d61 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -2,6 +2,8 @@ * SPDX-License-Identifier: GPL-3.0-or-later */ +#include <math.h> +#include <stdatomic.h> #include "daemon/defer.h" #include "daemon/session2.h" #include "daemon/udp_queue.h" @@ -11,64 +13,63 @@ #define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 } #define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 } +#define V4_SUBPRIO (uint8_t[]) { 0, 1, 3, 7 } #define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 } #define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 } +#define V6_SUBPRIO (uint8_t[]) { 2, 4, 5, 6, 7 } +#define SUBPRIO_CNT 8 #define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES)) #define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES)) #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT) -#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX -#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified -#define PRIORITY_SYNC (-1) // no queue -#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue - -#define KRU_CAPACITY (1<<19) - // same as ratelimiting default -#define MAX_DECAY (KRU_LIMIT * 0.0006929) - // halving counters in 1s - // 5s from max to 2^11 (priority 3) // TODO change 2^11 to 2^12 to make the times equal? - // 3s from 2^11 to 2^8 (priority 2) - // 4s from 2^8 to 2^4 (priority 1) - // 4s from 2^4 to zero (priority 0) -#define BASE_PRICE(nsec, cpus) ((uint64_t)MAX_DECAY * 10 * nsec / 1000000ll / cpus) - // max value when the single host uses 1/10 of all cpus' time; - // needed cpu utilization (rate limit) for other thresholds and prefixes: - // single v6/48 v4/24 v6/32 v4/20 v4/18 - // max: 10.000 % 40.00 % - - - - - // 2^11: 0.312 % 1.25 % 10.00 % 20.00 % 80.00 % - (priority 3) - // 2^8: 0.039 % 0.16 % 1.25 % 2.50 % 10.00 % 30.00 % (priority 2) - // 2^4: 0.002 % 0.01 % 0.08 % 0.16 % 0.63 % 1.87 % (priority 1) - // instant limit for single host and 1 cpu: (greater for larger networks and for more cpus) - // 35 us for 2^4, 0.56 ms for 2^8, 4.5 ms for 2^11, 144 ms max value - // TODO adjust somehow - // simple DoT query may cost 1 ms, DoH 2.5 ms; it gets priority 2 during handshake (on laptop); - // the instant limits can be doubled by: - // doubling half-life (approx.), - // doubling percents in the previous table, or - // doubling number of cpus - // possible solution: - // half-life 5s, BASE_PRICE /= 2.5 -> for 4 cpus 1.75 ms fits below 2^4; - // still not enough for home routers -> TODO make something configurable, maybe the BASE_PRICE multiplier - -#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped +struct kru_conf { + uint8_t namespace; + size_t prefixes_cnt; + uint8_t *prefixes; + const kru_price_t *rate_mult; + const uint8_t *subprio; +} const +V4_CONF = {0, V4_PREFIXES_CNT, V4_PREFIXES, V4_RATE_MULT, V4_SUBPRIO}, +V6_CONF = {1, V6_PREFIXES_CNT, V6_PREFIXES, V6_RATE_MULT, V6_SUBPRIO}; + +#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<12, -1} // the last one should be UINT16_MAX +#define QUEUES_CNT ((sizeof(LOADS_THRESHOLDS) / 
sizeof(*LOADS_THRESHOLDS) - 1) * SUBPRIO_CNT + 2) + // priority 0 has no subpriorities, +1 for unverified +#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue + +#define Q0_INSTANT_LIMIT 1000000 // ns +#define KRU_CAPACITY (1<<19) // same as ratelimiting default +#define BASE_PRICE(nsec) ((uint64_t)KRU_LIMIT * LOADS_THRESHOLDS[0] / (1<<16) * (nsec) / Q0_INSTANT_LIMIT) +#define MAX_DECAY (BASE_PRICE(1000000) / 2) // max value at 50% utilization of single cpu + // see log written by defer_str_conf for details + +#define REQ_TIMEOUT 1000000000 // ns (THREAD_CPUTIME), older deferred queries are dropped #define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase #define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases #define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue -#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediatelly in poll phase - // TODO measure memory usage instead +#define MAX_WAITING_REQS_SIZE (64l * 1024 * 1024) // bytes; if exceeded, some deferred requests are processed in poll phase + // single TCP allocates more than 64KiB wire buffer + // TODO check whether all important allocations are counted; + // different things are not counted: tasks and subsessions (not deferred after creation), uv handles, queues overhead, ...; + // payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams) + #define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__) struct defer { size_t capacity; kru_price_t max_decay; + uint32_t log_period; int cpus; bool using_avx2; + _Atomic uint32_t log_time; _Alignas(64) uint8_t kru[]; }; struct defer *defer = NULL; +bool defer_initialized = false; +uint64_t defer_uvtime_stamp = 0; struct mmapped defer_mmapped = {0}; defer_sample_state_t defer_sample_state = { @@ -80,43 +81,60 @@ static void defer_queues_idle(uv_idle_t *handle); protolayer_iter_ctx_queue_t queues[QUEUES_CNT]; int waiting_requests = 0; +ptrdiff_t waiting_requests_size = 0; // signed for non-negativeness asserts int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue ) enum phase { - PHASE_UDP = 1, - PHASE_NON_UDP = 2, - PHASE_ANY = PHASE_UDP | PHASE_NON_UDP -} phase = PHASE_ANY; -uint64_t phase_elapsed = 0; // ns -bool phase_accounting = false; // add accounted time to phase_elapsed on next call of defer_account - -static inline void phase_set(enum phase p) + PHASE_NONE, + PHASE_UDP, + PHASE_NON_UDP +} phase = PHASE_NONE; +uint64_t phase_elapsed[3] = { 0 }; // ns; [PHASE_NONE] value is being incremented but never used +const uint64_t phase_limits[3] = {0, PHASE_UDP_TIMEOUT, PHASE_NON_UDP_TIMEOUT}; +uint64_t phase_stamp = 0; + +static inline bool phase_over_limit(enum phase p) { - if (phase != p) { - phase_elapsed = 0; - phase = p; - } + return phase_elapsed[p] >= phase_limits[p]; +} + +/// Reset elapsed times of phases and set phase to UDP, NON_UDP, or NONE. +static inline void phase_reset(enum phase p) +{ + phase_elapsed[PHASE_UDP] = 0; + phase_elapsed[PHASE_NON_UDP] = 0; + phase_stamp = defer_sample_state.stamp; + phase = p; } -static inline void phase_account(uint64_t nsec) + +/// Set phase to UDP or NON_UDP if it is not over limit or both are over limit (reset them). 
+static inline bool phase_try_set(enum phase p) { - kr_assert(phase != PHASE_ANY); - phase_elapsed += nsec; - if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) { - phase_set(PHASE_NON_UDP); - } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) { - phase_set(PHASE_UDP); + phase_elapsed[phase] += defer_sample_state.stamp - phase_stamp; + phase_stamp = defer_sample_state.stamp; + + if (!phase_over_limit(p)) { + phase = p; + return true; + } else if (phase_over_limit(PHASE_UDP) && phase_over_limit(PHASE_NON_UDP)) { + phase_reset(p); + return true; } + + return false; } struct pl_defer_sess_data { struct protolayer_data h; protolayer_iter_ctx_queue_t queue; // properly ordered sequence of deferred packets, for stream only // the first ctx in the queue is also in a defer queue + size_t size; }; struct pl_defer_iter_data { struct protolayer_data h; uint64_t req_stamp; // time when request was received, uses get_stamp() + size_t size; }; /// Return whether we're using optimized variant right now. @@ -127,92 +145,182 @@ static bool using_avx2(void) return result; } -/// Increment KRU counters by given time. -void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) +/// Print configuration into desc array. +void defer_str_conf(char *desc, int desc_len) { - if (phase_accounting) { - phase_account(nsec); - phase_accounting = false; + int len = 0; +#define append(...) len += snprintf(desc + len, desc_len > len ? desc_len - len : 0, __VA_ARGS__) +#define append_time(prefix, ms, suffix) { \ + if ((ms) < 1) append(prefix "%7.1f us" suffix, (ms) * 1000); \ + else if ((ms) < 1000) append(prefix "%7.1f ms" suffix, (ms)); \ + else append(prefix "%7.1f s " suffix, (ms) / 1000); } + append( " Expected cpus/procs: %5d\n", defer->cpus); + + const size_t thresholds = sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS); + append( " Max waiting requests:%7.1f MiB\n", MAX_WAITING_REQS_SIZE / 1024.0 / 1024.0); + append_time(" Request timeout: ", REQ_TIMEOUT / 1000000.0, "\n"); + append_time(" Idle: ", IDLE_TIMEOUT / 1000000.0, "\n"); + append_time(" UDP phase: ", PHASE_UDP_TIMEOUT / 1000000.0, "\n"); + append_time(" Non-UDP phase: ", PHASE_NON_UDP_TIMEOUT / 1000000.0, "\n"); + append( " Priority levels: %5ld (%ld main levels, %d sublevels) + UDP\n", QUEUES_CNT - 1, thresholds, SUBPRIO_CNT); + + size_t capacity_log = 0; + for (size_t c = defer->capacity - 1; c > 0; c >>= 1) capacity_log++; + size_t size = offsetof(struct defer, kru) + KRU.get_size(capacity_log); + + append( " KRU capacity: %7.1f k (%0.1f MiB)\n", (1 << capacity_log) / 1000.0, size / 1000000.0); + + bool uniform_thresholds = true; + for (int i = 1; i < thresholds - 1; i++) + uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i - 1] * LOADS_THRESHOLDS[0]); + uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[thresholds - 2] * LOADS_THRESHOLDS[0]); + + append( " Decay: %7.3f %% per ms (32-bit max: %d)\n", + 100.0 * defer->max_decay / KRU_LIMIT, defer->max_decay); + float half_life = -1.0 / log2f(1.0 - (float)defer->max_decay / KRU_LIMIT); + append_time(" Half-life: ", half_life, "\n"); + if (uniform_thresholds) + append_time(" Priority rise in: ", half_life * 16 / thresholds, "\n"); + append_time(" Counter reset in: ", half_life * 16, "\n"); + + append(" Rate limits for crossing priority levels as single CPU utilization:\n"); + + const struct kru_conf *kru_confs[] = {&V4_CONF, &V6_CONF}; + const int version[] = {4, 6}; + const kru_price_t base_price_ms = BASE_PRICE(1000000); + + 
append("%15s", ""); + for (int j = 0; j < 3; j++) + append("%14d", j+1); + append("%14s\n", "max"); + + for (int v = 0; v < 2; v++) { + for (int i = kru_confs[v]->prefixes_cnt - 1; i >= 0; i--) { + append("%9sv%d/%-3d: ", "", version[v], kru_confs[v]->prefixes[i]); + for (int j = 0; j < thresholds; j++) { + float needed_util = (float)defer->max_decay / (1<<16) * LOADS_THRESHOLDS[j] / base_price_ms * kru_confs[v]->rate_mult[i]; + append("%12.3f %%", needed_util * 100); + } + append("\n"); + } } - if (!stream) return; // UDP is not accounted in KRU + append(" Instant limits for crossing priority levels as CPU time:\n"); + + append("%15s", ""); + for (int j = 0; j < 3; j++) + append("%14d", j+1); + append("%14s\n", "max"); + + for (int v = 0; v < 2; v++) { + for (int i = kru_confs[v]->prefixes_cnt - 1; i >= 0; i--) { + append("%9sv%d/%-3d: ", "", version[v], kru_confs[v]->prefixes[i]); + for (int j = 0; j < thresholds; j++) { + float needed_time = (float)KRU_LIMIT / (1<<16) * LOADS_THRESHOLDS[j] / base_price_ms * kru_confs[v]->rate_mult[i]; + if (needed_time < 1) { + append("%11.1f us", needed_time * 1000); + } else if (needed_time < 1000) { + append("%11.1f ms", needed_time); + } else { + append("%11.1f s ", needed_time / 1000); + } + } + append("\n"); + } + } + append(" (values above max are indistinguishable)\n"); - _Alignas(16) uint8_t key[16] = {0, }; - uint16_t max_load = 0; - uint8_t prefix = 0; - kru_price_t base_price = BASE_PRICE(nsec, defer->cpus); +#undef append_time +#undef append +} - if (addr->ip.sa_family == AF_INET6) { - memcpy(key, &addr->ip6.sin6_addr, 16); - kru_price_t prices[V6_PREFIXES_CNT]; - for (size_t i = 0; i < V6_PREFIXES_CNT; i++) { - prices[i] = base_price / V6_RATE_MULT[i]; +/// Call KRU, return priority and as params load and prefix. +static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *key, kru_price_t *prices, + uint16_t *out_load, uint8_t *out_prefix) +{ + uint16_t loads[kru_conf->prefixes_cnt]; + KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(), + kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads); + + int priority = 0; + int prefix_index = kru_conf->prefixes_cnt - 1; + for (int i = kru_conf->prefixes_cnt - 1, j = 0; i >= 0; i--) { + for (; LOADS_THRESHOLDS[j] < loads[i]; j++) { + prefix_index = i; + priority = 1 + j * SUBPRIO_CNT + kru_conf->subprio[i]; } + } + *out_load = loads[prefix_index]; + *out_prefix = kru_conf->prefixes[prefix_index]; + return priority; +} + +/// Increment KRU counters by given time. +void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream) +{ + if (!stream) return; // UDP is not accounted in KRU; TODO remove !stream invocations? 
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), - 1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix); + _Alignas(16) uint8_t key[16] = {0, }; + const struct kru_conf *kru_conf; + if (addr->ip.sa_family == AF_INET6) { + memcpy(key, &addr->ip6.sin6_addr, 16); + kru_conf = &V6_CONF; } else if (addr->ip.sa_family == AF_INET) { memcpy(key, &addr->ip4.sin_addr, 4); - - kru_price_t prices[V4_PREFIXES_CNT]; - for (size_t i = 0; i < V4_PREFIXES_CNT; i++) { - prices[i] = base_price / V4_RATE_MULT[i]; - } - - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), - 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix); + kru_conf = &V4_CONF; } else { return; } + uint64_t base_price = BASE_PRICE(nsec); + kru_price_t prices[kru_conf->prefixes_cnt]; + for (size_t i = 0; i < kru_conf->prefixes_cnt; i++) { + uint64_t price = base_price / kru_conf->rate_mult[i]; + prices[i] = price > (kru_price_t)-1 ? -1 : price; + } + + uint16_t load; + uint8_t prefix; + kru_charge_classify(kru_conf, key, prices, &load, &prefix); + VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n", - kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix); + kr_straddr(&addr->ip), nsec / 1000000.0, load, prefix); } -/// Determine priority of the request in [-1, QUEUES_CNT - 1]. -/// Lower value has higher priority, -1 should be synchronous. -/// Both UDP and non-UDP may end up with synchronous priority -/// if the phase is active and no requests can be scheduled before them. +/// Determine priority of the request in [0, QUEUES_CNT - 1]; +/// lower value has higher priority; plain UDP always gets PRIORITY_UDP. static inline int classify(const union kr_sockaddr *addr, bool stream) { if (!stream) { // UDP VERBOSE_LOG(" unverified address\n"); - if ((phase & PHASE_UDP) && (queue_len(queues[PRIORITY_UDP]) == 0)) { - phase_set(PHASE_UDP); - return PRIORITY_SYNC; - } return PRIORITY_UDP; } _Alignas(16) uint8_t key[16] = {0, }; - uint16_t max_load = 0; - uint8_t prefix = 0; + const struct kru_conf *kru_conf = NULL; if (addr->ip.sa_family == AF_INET6) { memcpy(key, &addr->ip6.sin6_addr, 16); - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), - 1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix); + kru_conf = &V6_CONF; } else if (addr->ip.sa_family == AF_INET) { memcpy(key, &addr->ip4.sin_addr, 4); - max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(), - 0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix); + kru_conf = &V4_CONF; + } else { + kr_assert(false); + return 0; // shouldn't happen anyway } - int priority = 0; - for (; LOADS_THRESHOLDS[priority] < max_load; priority++); + uint16_t load; + uint8_t prefix; + int priority = kru_charge_classify(kru_conf, key, NULL, &load, &prefix); - VERBOSE_LOG(" load %d on /%d\n", max_load, prefix); + VERBOSE_LOG(" load %d on /%d\n", load, prefix); - if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) { - phase_set(PHASE_NON_UDP); - return PRIORITY_SYNC; - } return priority; } - -/// Push query to a queue according to its priority and activate idle. +/// Push query to a queue according to its priority. 
static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end) { if (to_head_end) { @@ -221,51 +329,36 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo queue_push(queues[priority], ctx); } queue_ix = MIN(queue_ix, priority); - if (waiting_requests++ <= 0) { - kr_assert(waiting_requests == 1); - uv_idle_start(&idle_handle, defer_queues_idle); - VERBOSE_LOG(" activating idle\n"); - } + waiting_requests++; } -/// Pop and return query from the specified queue, deactivate idle if not needed. +/// Pop and return query from the specified queue.. static inline struct protolayer_iter_ctx *pop_query_queue(int priority) { kr_assert(queue_len(queues[priority]) > 0); struct protolayer_iter_ctx *ctx = queue_head(queues[priority]); queue_pop(queues[priority]); - if (--waiting_requests <= 0) { - kr_assert(waiting_requests == 0); - uv_idle_stop(&idle_handle); - VERBOSE_LOG(" deactivating idle\n"); - } + waiting_requests--; + kr_assert(waiting_requests >= 0); return ctx; } -/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase, -/// deactivate idle if not needed. +/// Pop and return the query with the highest priority, UDP or non-UDP based on the current phase. static inline struct protolayer_iter_ctx *pop_query(void) { const int waiting_udp = queue_len(queues[PRIORITY_UDP]); const int waiting_non_udp = waiting_requests - waiting_udp; - enum phase new_phase; - if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) { - new_phase = PHASE_NON_UDP; // maybe changing from PHASE_ANY - } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) { - new_phase = PHASE_UDP; // maybe changing from PHASE_ANY - } else if (waiting_non_udp > 0) { - new_phase = PHASE_NON_UDP; // change from PHASE_UDP, no UDP queries - } else { - new_phase = PHASE_UDP; // change from PHASE_NON_UDP, no non-UDP queries - } - phase_set(new_phase); + if (!((waiting_non_udp > 0) && phase_try_set(PHASE_NON_UDP)) && + !((waiting_udp > 0) && phase_try_set(PHASE_UDP))) + phase_reset(waiting_non_udp > 0 ? 
PHASE_NON_UDP : PHASE_UDP); int i; if (phase == PHASE_NON_UDP) { for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++); - if (queue_ix >= PRIORITY_UDP) kr_assert(false); + if (kr_fails_assert(queue_ix < PRIORITY_UDP)) + return NULL; i = queue_ix; } else { i = PRIORITY_UDP; @@ -279,18 +372,31 @@ static inline struct protolayer_iter_ctx *pop_query(void) static inline void break_query(struct protolayer_iter_ctx *ctx, int err) { if (ctx->session->stream) { + struct session2 *s = ctx->session; struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); + s->ref_count++; // keep session and sdata alive for a while + waiting_requests_size -= sdata->size; if (!ctx->session->closing) { - session2_force_close(ctx->session); // session is not freed here as iter contexts exist + session2_force_close(ctx->session); } - queue_pop(sdata->queue); - while (queue_len(sdata->queue) > 0) { - protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist - ctx = queue_head(sdata->queue); + kr_assert(ctx == queue_head(sdata->queue)); + while (true) { queue_pop(sdata->queue); + if (ctx) { + struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); + waiting_requests_size -= idata->size; + protolayer_break(ctx, kr_error(err)); + } + if (queue_len(sdata->queue) == 0) break; + ctx = queue_head(sdata->queue); } + session2_unhandle(s); // decrease ref_count + } else { + struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); + waiting_requests_size -= idata->size; + protolayer_break(ctx, kr_error(err)); } - protolayer_break(ctx, kr_error(err)); + kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0); } /// Process a single deferred query (or defer again) if there is any. 
@@ -298,17 +404,17 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err) static inline void process_single_deferred(void) { struct protolayer_iter_ctx *ctx = pop_query(); - if (ctx == NULL) return; + if (kr_fails_assert(ctx)) return; - defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); - phase_accounting = true; + defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream); struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); + struct session2 *session = ctx->session; uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp; VERBOSE_LOG(" %s POP from %d after %4.3f ms\n", - kr_straddr(ctx->comm->comm_addr), + kr_straddr(ctx->comm->src_addr), queue_ix, age_ns / 1000000.0); @@ -320,28 +426,80 @@ static inline void process_single_deferred(void) if (age_ns >= REQ_TIMEOUT) { VERBOSE_LOG(" BREAK (timeout)\n"); - break_query(ctx, ETIME); - return; - } - int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); - if (priority > queue_ix) { // priority dropped (got higher value) - VERBOSE_LOG(" PUSH to %d\n", priority); - push_query(ctx, priority, false); + // notice logging according to log-period + const uint32_t time_now = kr_now(); + uint32_t log_time_orig = atomic_load_explicit(&defer->log_time, memory_order_relaxed); + if (defer->log_period) { + while (time_now - log_time_orig + 1024 >= defer->log_period + 1024) { + if (atomic_compare_exchange_weak_explicit(&defer->log_time, &log_time_orig, time_now, + memory_order_relaxed, memory_order_relaxed)) { + kr_log_notice(DEFER, "Data from %s too long in queue, dropping. (%0.3f MiB in queues)\n", + kr_straddr(ctx->comm->src_addr), waiting_requests_size / 1024.0 / 1024.0); + break; + } + } + } + + break_query(ctx, ETIME); return; } + bool eof = false; if (ctx->session->stream) { + int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream); + if (priority > queue_ix) { // priority dropped (got higher value) + VERBOSE_LOG(" PUSH to %d\n", priority); + push_query(ctx, priority, false); + return; + } + kr_assert(queue_head(sdata->queue) == ctx); queue_pop(sdata->queue); + while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event + eof = true; + queue_pop(sdata->queue); + } if (queue_len(sdata->queue) > 0) { VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority); push_query(queue_head(sdata->queue), priority, true); + } else { + waiting_requests_size -= sdata->size; } } + waiting_requests_size -= idata->size; + kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0); + + if (eof) { + // Keep session alive even if it is somehow force-closed during continuation. + // TODO Is it possible? + session->ref_count++; + } + VERBOSE_LOG(" CONTINUE\n"); protolayer_continue(ctx); + + if (eof) { + VERBOSE_LOG(" CONTINUE EOF event\n"); + session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL); + session2_unhandle(session); // decrease ref_count + } +} + +/// Process as many deferred requests as needed to get memory consumption under limit. 
+static inline void process_deferred_over_size_limit(void) { + if (waiting_requests_size > MAX_WAITING_REQS_SIZE) { + defer_sample_state_t prev_sample_state; + defer_sample_start(&prev_sample_state); + do { + process_single_deferred(); // possibly defers again without decreasing waiting_requests_size + // If the unwrapped query is to be processed here, + // it is the last iteration and the query is processed after returning. + defer_sample_restart(); + } while (waiting_requests_size > MAX_WAITING_REQS_SIZE); + defer_sample_stop(&prev_sample_state, true); + } } /// Break expired requests at the beginning of queues, uses current stamp. @@ -370,76 +528,115 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) { - if (!defer) + if (!defer || ctx->session->outgoing) return protolayer_continue(ctx); - if (ctx->session->outgoing) - return protolayer_continue(ctx); - - defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); - struct pl_defer_iter_data *data = iter_data; + defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream); + struct pl_defer_iter_data *idata = iter_data; struct pl_defer_sess_data *sdata = sess_data; - data->req_stamp = defer_sample_state.stamp; + idata->req_stamp = defer_sample_state.stamp; VERBOSE_LOG(" %s UNWRAP\n", - kr_straddr(ctx->comm->comm_addr)); + kr_straddr(ctx->comm->src_addr)); + + uv_idle_start(&idle_handle, defer_queues_idle); if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred queue_push(sdata->queue, ctx); + waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false); + // payload counted in session wire buffer VERBOSE_LOG(" PUSH as follow-up\n"); + process_deferred_over_size_limit(); return protolayer_async(); } - int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); + int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream); - if (priority == -1) { + // Process synchronously unless there may exist requests that has to be processed first + if (((priority == 0) || (priority == PRIORITY_UDP)) && (queue_len(queues[priority]) == 0) && + phase_try_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP)) { VERBOSE_LOG(" CONTINUE\n"); - phase_accounting = true; return protolayer_continue(ctx); } VERBOSE_LOG(" PUSH to %d\n", priority); if (ctx->session->stream) { queue_push(sdata->queue, ctx); + waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session); } push_query(ctx, priority, false); - while (waiting_requests > MAX_WAITING_REQS) { // TODO follow-up stream packets are not counted here - defer_sample_restart(); - process_single_deferred(); // possibly defers again without decreasing waiting_requests - // defer_sample_stop should be called soon outside - } + waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, !ctx->session->stream); + // for stream, payload is counted in session wire buffer + process_deferred_over_size_limit(); return protolayer_async(); } +/// Unwrap event: EOF event may be deferred here, other events pass synchronously. 
+static enum protolayer_event_cb_result pl_defer_event_unwrap( + enum protolayer_event_type event, void **baton, + struct session2 *session, void *sess_data) +{ + if (!defer || !session->stream || session->outgoing) + return PROTOLAYER_EVENT_PROPAGATE; + + defer_sample_addr((const union kr_sockaddr *)session->comm_storage.src_addr, session->stream); + + struct pl_defer_sess_data *sdata = sess_data; + if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0)) { + // defer EOF event if unprocessed data remain, baton is dropped if any + queue_push(sdata->queue, NULL); + VERBOSE_LOG(" %s event %s deferred\n", + session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)", + protolayer_event_name(event)); + return PROTOLAYER_EVENT_CONSUME; + } + + VERBOSE_LOG(" %s event %s passes through synchronously%s%s\n", + session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)", + protolayer_event_name(event), + queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "", + *baton ? " (with baton)" : ""); + return PROTOLAYER_EVENT_PROPAGATE; +} + /// Idle: continue processing deferred requests. static void defer_queues_idle(uv_idle_t *handle) { - kr_assert(waiting_requests > 0); VERBOSE_LOG("IDLE\n"); - VERBOSE_LOG(" %d waiting\n", waiting_requests); - defer_sample_start(); - uint64_t idle_stamp = defer_sample_state.stamp; - while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) { - process_single_deferred(); - defer_sample_restart(); + if (waiting_requests > 0) { + VERBOSE_LOG(" %d waiting\n", waiting_requests); + defer_sample_start(NULL); + uint64_t idle_stamp = defer_sample_state.stamp; + do { + process_single_deferred(); + defer_sample_restart(); + } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)); + defer_sample_stop(NULL, true); + cleanup_queues(); + udp_queue_send_all(); } - cleanup_queues(); - defer_sample_stop(); // TODO skip calling and use just restart elsewhere? - udp_queue_send_all(); if (waiting_requests > 0) { VERBOSE_LOG(" %d waiting\n", waiting_requests); } else { - phase_set(PHASE_ANY); + phase_reset(PHASE_NONE); + VERBOSE_LOG(" deactivate idle\n"); + uv_idle_stop(&idle_handle); } VERBOSE_LOG("POLL\n"); } /// Initialize shared memory, queues. To be called from Lua. 
-int defer_init(const char *mmap_file, int cpus) +int defer_init(const char *mmap_file, uint32_t log_period, int cpus) // TODO possibly remove cpus; not needed { + defer_initialized = true; + if (mmap_file == NULL) { + // defer explicitly disabled + return 0; + } + int ret = 0; if (cpus < 1) { ret = EINVAL; @@ -449,6 +646,7 @@ int defer_init(const char *mmap_file, int cpus) struct defer header = { .capacity = KRU_CAPACITY, .max_decay = MAX_DECAY, + .log_period = log_period, .cpus = cpus, .using_avx2 = using_avx2(), }; @@ -462,12 +660,13 @@ int defer_init(const char *mmap_file, int cpus) offsetof(struct defer, using_avx2) == sizeof(header.capacity) + sizeof(header.max_decay) + + sizeof(header.log_period) + sizeof(header.cpus), "detected padding with undefined data inside mmapped header"); ret = mmapped_init(&defer_mmapped, mmap_file, size, &header, header_size); if (ret == MMAPPED_WAS_FIRST) { - kr_log_info(SYSTEM, "Initializing prioritization...\n"); + kr_log_info(DEFER, "Initializing defer...\n"); defer = defer_mmapped.mem; @@ -478,13 +677,22 @@ int defer_init(const char *mmap_file, int cpus) goto fail; } + defer->log_time = kr_now() - log_period; + ret = mmapped_init_continue(&defer_mmapped); if (ret != 0) goto fail; - kr_log_info(SYSTEM, "Prioritization initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic")); + kr_log_info(DEFER, "Defer initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic")); + + // log current configuration + if (KR_LOG_LEVEL_IS(LOG_INFO) || kr_log_group_is_set(LOG_GRP_DEFER)) { + char desc[8000]; + defer_str_conf(desc, sizeof(desc)); + kr_log_info(DEFER, "Defer configuration:\n%s", desc); + } } else if (ret == 0) { defer = defer_mmapped.mem; - kr_log_info(SYSTEM, "Using existing prioritization data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic")); + kr_log_info(DEFER, "Using existing defer data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic")); } else goto fail; for (size_t i = 0; i < QUEUES_CNT; i++) @@ -494,7 +702,7 @@ int defer_init(const char *mmap_file, int cpus) fail: - kr_log_crit(SYSTEM, "Initialization of shared prioritization data failed.\n"); + kr_log_crit(DEFER, "Initialization of shared defer data failed.\n"); return ret; } @@ -528,5 +736,6 @@ static void defer_protolayers_init(void) .sess_size = sizeof(struct pl_defer_sess_data), .sess_init = pl_defer_sess_init, .unwrap = pl_defer_unwrap, + .event_unwrap = pl_defer_event_unwrap, }; } diff --git a/daemon/defer.h b/daemon/defer.h index db1b083b..18adf91b 100644 --- a/daemon/defer.h +++ b/daemon/defer.h @@ -8,9 +8,8 @@ #include "lib/kru.h" /// Initialize defer, incl. shared memory with KRU, excl. idle. -/// To be called from Lua; defer is disabled by default otherwise. KR_EXPORT -int defer_init(const char *mmap_file, int cpus); +int defer_init(const char *mmap_file, uint32_t log_period, int cpus); /// Initialize idle. int defer_init_idle(uv_loop_t *loop); @@ -19,37 +18,33 @@ int defer_init_idle(uv_loop_t *loop); void defer_deinit(void); /// Increment KRU counters by the given time. 
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream); +void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream); typedef struct { - int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1 - union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet + bool is_accounting; /// whether currently accounting the time to someone bool stream; + union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet uint64_t stamp; /// monotonic nanoseconds, probably won't wrap } defer_sample_state_t; extern defer_sample_state_t defer_sample_state; extern struct defer *defer; /// skip sampling/deferring if NULL - +extern bool defer_initialized; /// defer_init was called, possibly keeping defer disabled +extern uint64_t defer_uvtime_stamp; /// stamp of the last uv time update // TODO: reconsider `static inline` cases below #include <time.h> -static inline uint64_t get_stamp(void) +static inline uint64_t defer_get_stamp(void) { struct timespec now_ts = {0}; clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts); - return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec; -} - -/// Start accounting work, if not doing it already. -static inline void defer_sample_start(void) -{ - if (!defer) return; - kr_assert(!defer_sample_state.is_accounting); - ++defer_sample_state.is_accounting; - defer_sample_state.stamp = get_stamp(); - defer_sample_state.addr.ip.sa_family = AF_UNSPEC; + uint64_t stamp = now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec; + if (defer_uvtime_stamp + 1000*1000 < stamp) { + defer_uvtime_stamp = stamp; + uv_update_time(uv_default_loop()); + } + return stamp; } /// Annotate the work currently being accounted by an IP address. @@ -60,8 +55,14 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream) if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) { // TODO: this costs performance, so only in some debug mode? - kr_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok()); - return; + if (kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) != kr_ok()) { + char defer_addr[KR_STRADDR_MAXLEN + 1] = { 0 }; + strncpy(defer_addr, kr_straddr(&defer_sample_state.addr.ip), sizeof(defer_addr) - 1); + kr_log_warning(DEFER, "Sampling address mismatch: %s != %s\n", + kr_straddr(&addr->ip), + defer_addr); + return; + } } switch (addr->ip.sa_family) { @@ -78,38 +79,79 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream) defer_sample_state.stream = stream; } -/// Stop accounting work - and change the source if applicable. -static inline void defer_sample_stop(void) +/// Internal; start accounting work at specified timestamp. +static inline void defer_sample_start_stamp(uint64_t stamp) { if (!defer) return; + kr_assert(!defer_sample_state.is_accounting); + defer_sample_state.is_accounting = true; + defer_sample_state.stamp = stamp; + defer_sample_state.addr.ip.sa_family = AF_UNSPEC; +} - if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird - if (--defer_sample_state.is_accounting) return; - if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return; +/// Internal; stop accounting work at specified timestamp and charge the source if applicable. 
+static inline void defer_sample_stop_stamp(uint64_t stamp) +{ + if (!defer) return; + kr_assert(defer_sample_state.is_accounting); + defer_sample_state.is_accounting = false; - const uint64_t elapsed = get_stamp() - defer_sample_state.stamp; + if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return; - // we accounted something + const uint64_t elapsed = stamp - defer_sample_state.stamp; + if (elapsed == 0) return; // TODO: some queries of internal origin have suspicioiusly high numbers. // We won't be really accounting those, but it might suggest some other issue. - defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream); + defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream); } -/// Stop accounting if active, then start again. Uses just one stamp. -static inline void defer_sample_restart(void) +static inline bool defer_sample_is_accounting(void) { - if (!defer) return; + return defer_sample_state.is_accounting; +} - uint64_t stamp = get_stamp(); +/// Start accounting work; optionally save state of current accounting. +/// Current state can be saved only after having an address assigned. +static inline void defer_sample_start(defer_sample_state_t *prev_state_out) { + if (!defer) return; + uint64_t stamp = defer_get_stamp(); - if (defer_sample_state.is_accounting > 0) { - const uint64_t elapsed = stamp - defer_sample_state.stamp; - defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream); + // suspend + if (prev_state_out) { + *prev_state_out = defer_sample_state; // TODO stamp is not needed + if (defer_sample_state.is_accounting) + defer_sample_stop_stamp(stamp); } - defer_sample_state.stamp = stamp; - defer_sample_state.addr.ip.sa_family = AF_UNSPEC; - defer_sample_state.is_accounting = 1; + // start + defer_sample_start_stamp(stamp); +} + +/// Stop accounting and start it again. +static inline void defer_sample_restart(void) { + if (!defer) return; + uint64_t stamp = defer_get_stamp(); + + // stop + defer_sample_stop_stamp(stamp); + + // start + defer_sample_start_stamp(stamp); +} + +/// Stop accounting and charge the source if applicable; optionally resume previous accounting. +static inline void defer_sample_stop(defer_sample_state_t *prev_state, bool reuse_last_stamp) { + if (!defer) return; + uint64_t stamp = reuse_last_stamp ? defer_sample_state.stamp : defer_get_stamp(); + + // stop + defer_sample_stop_stamp(stamp); + + // resume + if (prev_state) { + defer_sample_state = *prev_state; + defer_sample_state.stamp = stamp; + } } diff --git a/daemon/io.c b/daemon/io.c index 36648907..8093a4b0 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -324,6 +324,32 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } + if (nread == UV_ENOBUFS) { + /* No space available in session buffer. + * The connection may be just waiting in defer. + * Ignore the error and keep the data in system queue for later reading or timeout. */ + if (kr_log_is_debug(IO, NULL)) { + struct sockaddr *peer = session2_get_peer(s); + char *peer_str = kr_straddr(peer); + kr_log_debug(IO, "=> incoming data from '%s' waiting (%s)\n", + peer_str ? 
peer_str : "", + uv_strerror(nread)); + } + return; + } + + // allow deferring EOF for incoming connections to send answer even if half-closed + if (!s->outgoing && (nread == UV_EOF)) { + if (kr_log_is_debug(IO, NULL)) { + struct sockaddr *peer = session2_get_peer(s); + char *peer_str = kr_straddr(peer); + kr_log_debug(IO, "=> connection to '%s' half-closed by peer (EOF)\n", + peer_str ? peer_str : ""); + } + session2_event(s, PROTOLAYER_EVENT_EOF, NULL); + return; + } + if (nread < 0 || !buf->base) { if (kr_log_is_debug(IO, NULL)) { struct sockaddr *peer = session2_get_peer(s); @@ -353,7 +379,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) static void tcp_accept_internal(uv_stream_t *master, int status, enum kr_proto grp) { - if (status != 0) { + if (status != 0) { return; } diff --git a/daemon/lua/kres-gen-33.lua b/daemon/lua/kres-gen-33.lua index 4312d218..3fc16df3 100644 --- a/daemon/lua/kres-gen-33.lua +++ b/daemon/lua/kres-gen-33.lua @@ -617,7 +617,7 @@ struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags); int zi_zone_import(const zi_config_t); _Bool ratelimiting_request_begin(struct kr_request *); int ratelimiting_init(const char *, size_t, uint32_t, uint32_t, uint16_t, uint32_t, _Bool); -int defer_init(const char *, int); +int defer_init(const char *, uint32_t, int); struct engine { char _stub[]; }; diff --git a/daemon/main.c b/daemon/main.c index a808ba20..a7b9c92b 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -616,6 +616,15 @@ int main(int argc, char **argv) lua_settop(the_engine->L, 0); } + if (!defer_initialized) { + kr_log_warning(SYSTEM, "Prioritization not initialized from Lua, using hardcoded default.\n"); + ret = defer_init("defer", 1, 1); + if (ret) { + ret = EXIT_FAILURE; + goto cleanup; + } + } + if (defer_init_idle(loop) != 0) { ret = EXIT_FAILURE; goto cleanup; diff --git a/daemon/meson.build b/daemon/meson.build index 62ac983b..9fde08f8 100644 --- a/daemon/meson.build +++ b/daemon/meson.build @@ -62,6 +62,7 @@ kresd_deps = [ capng, nghttp2, malloc, + libm ] diff --git a/daemon/session2.c b/daemon/session2.c index 40328e13..faca57bf 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -307,6 +307,18 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue) return false; } +static inline ssize_t session2_get_protocol( + struct session2 *s, enum protolayer_type protocol) +{ + const struct protolayer_grp *grp = &protolayer_grps[s->proto]; + for (ssize_t i = 0; i < grp->num_layers; i++) { + enum protolayer_type found = grp->layers[i]; + if (protocol == found) + return i; + } + + return -1; +} /** Gets layer-specific session data for the layer with the specified index * from the manager. */ @@ -333,6 +345,14 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_sess_data_get(ctx->session, ctx->layer_ix); } +void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) { + ssize_t layer_ix = session2_get_protocol(s, protocol); + if (layer_ix < 0) + return NULL; + + return protolayer_sess_data_get(s, layer_ix); +} + /** Gets layer-specific iteration data for the layer with the specified index * from the context. 
*/ static inline struct protolayer_data *protolayer_iter_data_get( @@ -358,17 +378,17 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_iter_data_get(ctx, ctx->layer_ix); } -static inline ssize_t session2_get_protocol( - struct session2 *s, enum protolayer_type protocol) +size_t protolayer_sess_size_est(struct session2 *s) { - const struct protolayer_grp *grp = &protolayer_grps[s->proto]; - for (ssize_t i = 0; i < grp->num_layers; i++) { - enum protolayer_type found = grp->layers[i]; - if (protocol == found) - return i; - } + return s->session_size + s->wire_buf.size; +} - return -1; +size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload) +{ + size_t size = ctx->session->iter_ctx_size; + if (incl_payload) + size += protolayer_payload_size(&ctx->payload); + return size; } static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx) @@ -596,7 +616,7 @@ static int session2_submit( { if (session->closing) return kr_error(ECANCELED); - if (session->ref_count >= INT_MAX) + if (session->ref_count >= INT_MAX - 1) return kr_error(ETOOMANYREFS); if (kr_fails_assert(session->proto < KR_PROTO_COUNT)) return kr_error(EFAULT); @@ -610,7 +630,7 @@ static int session2_submit( // Note two cases: incoming session (new request) // vs. outgoing session (resuming work on some request) if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0)) - defer_sample_start(); + defer_sample_start(NULL); struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size); kr_require(ctx); @@ -672,7 +692,7 @@ static int session2_submit( int ret = protolayer_step(ctx); if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0)) - defer_sample_stop(); + defer_sample_stop(NULL, false); return ret; } @@ -845,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type, .proto = proto, .iter_ctx_size = iter_ctx_size, + .session_size = session_size, }; memcpy(&s->layer_data, offsets, sizeof(offsets)); @@ -959,10 +980,8 @@ uv_handle_t *session2_get_handle(struct session2 *s) static void session2_on_timeout(uv_timer_t *timer) { - defer_sample_start(); struct session2 *s = timer->data; session2_event(s, s->timer_event, NULL); - defer_sample_stop(); } int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat) @@ -1084,11 +1103,17 @@ struct qr_task *session2_tasklist_del_msgid(const struct session2 *session, uint void session2_tasklist_finalize(struct session2 *session, int status) { - while (session2_tasklist_get_len(session) > 0) { - struct qr_task *t = session2_tasklist_del_first(session, false); - kr_require(worker_task_numrefs(t) > 0); - worker_task_finalize(t, status); - worker_task_unref(t); + if (session2_tasklist_get_len(session) > 0) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *t = session2_tasklist_del_first(session, false); + kr_require(worker_task_numrefs(t) > 0); + worker_task_finalize(t, status); + worker_task_unref(t); + defer_sample_restart(); + } while (session2_tasklist_get_len(session) > 0); + defer_sample_stop(&defer_prev_sample_state, true); } } @@ -1121,27 +1146,34 @@ int session2_tasklist_finalize_expired(struct session2 *session) key = (char *)&msg_id; keylen = sizeof(msg_id); } - while (queue_len(q) > 0) { - task = queue_head(q); - if (session->outgoing) { - knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); - msg_id = knot_wire_get_id(pktbuf->wire); - } - int res = 
trie_del(t, key, keylen, NULL); - if (!worker_task_finished(task)) { - /* task->pending_count must be zero, - * but there are can be followers, - * so run worker_task_subreq_finalize() to ensure retrying - * for all the followers. */ - worker_task_subreq_finalize(task); - worker_task_finalize(task, KR_STATE_FAIL); - } - if (res == KNOT_EOK) { + + if (queue_len(q) > 0) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + task = queue_head(q); + if (session->outgoing) { + knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); + msg_id = knot_wire_get_id(pktbuf->wire); + } + int res = trie_del(t, key, keylen, NULL); + if (!worker_task_finished(task)) { + /* task->pending_count must be zero, + * but there are can be followers, + * so run worker_task_subreq_finalize() to ensure retrying + * for all the followers. */ + worker_task_subreq_finalize(task); + worker_task_finalize(task, KR_STATE_FAIL); + } + if (res == KNOT_EOK) { + worker_task_unref(task); + } + queue_pop(q); worker_task_unref(task); - } - queue_pop(q); - worker_task_unref(task); - ++ret; + ++ret; + defer_sample_restart(); + } while (queue_len(q) > 0); + defer_sample_stop(&defer_prev_sample_state, true); } queue_deinit(q); @@ -1172,22 +1204,34 @@ struct qr_task *session2_waitinglist_pop(struct session2 *session, bool deref) void session2_waitinglist_retry(struct session2 *session, bool increase_timeout_cnt) { - while (!session2_waitinglist_is_empty(session)) { - struct qr_task *task = session2_waitinglist_pop(session, false); - if (increase_timeout_cnt) { - worker_task_timeout_inc(task); - } - worker_task_step(task, session2_get_peer(session), NULL); - worker_task_unref(task); + if (!session2_waitinglist_is_empty(session)) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *task = session2_waitinglist_pop(session, false); + if (increase_timeout_cnt) { + worker_task_timeout_inc(task); + } + worker_task_step(task, session2_get_peer(session), NULL); + worker_task_unref(task); + defer_sample_restart(); + } while (!session2_waitinglist_is_empty(session)); + defer_sample_stop(&defer_prev_sample_state, true); } } void session2_waitinglist_finalize(struct session2 *session, int status) { - while (!session2_waitinglist_is_empty(session)) { - struct qr_task *t = session2_waitinglist_pop(session, false); - worker_task_finalize(t, status); - worker_task_unref(t); + if (!session2_waitinglist_is_empty(session)) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *t = session2_waitinglist_pop(session, false); + worker_task_finalize(t, status); + worker_task_unref(t); + defer_sample_restart(); + } while (!session2_waitinglist_is_empty(session)); + defer_sample_stop(&defer_prev_sample_state, true); } } @@ -1301,7 +1345,19 @@ static void session2_event_unwrap(struct session2 *s, ssize_t start_ix, enum pro void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton) { + /* Events may be sent from inside or outside of already measured code. + * From inside: close by us, statistics, ... + * From outside: timeout, EOF, close by external reasons, ... 
*/ + bool defer_accounting_here = false; + if (!defer_sample_is_accounting() && s->stream && !s->outgoing) { + defer_sample_start(NULL); + defer_accounting_here = true; + } + session2_event_unwrap(s, 0, event, baton); + + if (defer_accounting_here) + defer_sample_stop(NULL, false); } void session2_event_after(struct session2 *s, enum protolayer_type protocol, @@ -1684,6 +1740,12 @@ static int session2_transport_event(struct session2 *s, if (s->closing) return kr_ok(); + if (event == PROTOLAYER_EVENT_EOF) { + // no layer wanted to retain TCP half-closed state + session2_force_close(s); + return kr_ok(); + } + bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE || event == PROTOLAYER_EVENT_FORCE_CLOSE); if (is_close_event) { diff --git a/daemon/session2.h b/daemon/session2.h index 957df6d9..5228ad86 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -334,6 +334,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session, XX(MALFORMED) \ /** Signal that a connection has ended. */\ XX(DISCONNECT) \ + /** Signal EOF from peer (e.g. half-closed TCP connection). */\ + XX(EOF) \ /** Failed task send - update stats. */\ XX(STATS_SEND_ERR) \ /** Outgoing query submission - update stats. */\ @@ -535,6 +537,10 @@ size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue); * queue iterators, as it does not need to iterate through the whole queue. */ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue); +/** Gets layer-specific session data for the specified protocol layer. + * Returns NULL if the layer is not present in the session. */ +void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol); + /** Gets layer-specific session data for the last processed layer. * To be used after returning from its callback for async continuation but before calling protolayer_continue. */ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx); @@ -543,6 +549,13 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx); * To be used after returning from its callback for async continuation but before calling protolayer_continue. */ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx); +/** Gets rough memory footprint estimate of session/iteration for use in defer. + * Different, hopefully minor, allocations are not counted here; + * tasks and subsessions are also not counted; + * read the code before using elsewhere. */ +size_t protolayer_sess_size_est(struct session2 *s); +size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload); + /** Layer-specific data - the generic struct. To be added as the first member of * each specific struct. */ struct protolayer_data { @@ -868,6 +881,9 @@ struct session2 { * (`struct protolayer_iter_ctx`), including layer-specific data. */ size_t iter_ctx_size; + /** The size of this session struct. 
*/ + size_t session_size; + /** The following flexible array has basically this structure: * * struct { diff --git a/daemon/tls.c b/daemon/tls.c index 231bff2d..6b3436f7 100644 --- a/daemon/tls.c +++ b/daemon/tls.c @@ -1325,6 +1325,12 @@ static enum protolayer_event_cb_result pl_tls_event_unwrap( return PROTOLAYER_EVENT_PROPAGATE; } + if (event == PROTOLAYER_EVENT_EOF) { + // TCP half-closed state not allowed + session2_force_close(s); + return PROTOLAYER_EVENT_CONSUME; + } + if (tls->client_side) { if (event == PROTOLAYER_EVENT_CONNECT) return pl_tls_client_connect_start(tls, s); diff --git a/daemon/worker.c b/daemon/worker.c index d517dd6c..ece3fd95 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -103,6 +103,14 @@ struct qr_task qr_task_free((task)); \ } while (0) +struct pl_dns_stream_sess_data { + struct protolayer_data h; + bool single : 1; /**< True: Stream only allows a single packet */ + bool produced : 1; /**< True: At least one packet has been produced */ + bool connected : 1; /**< True: The stream is connected */ + bool half_closed : 1; /**< True: EOF was received, the stream is half-closed */ +}; + /* Forward decls */ static void qr_task_free(struct qr_task *task); static int qr_task_step(struct qr_task *task, @@ -122,7 +130,6 @@ static struct session2* worker_find_tcp_waiting(const struct sockaddr* addr); static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt); - struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */ struct worker_ctx *the_worker = NULL; @@ -696,10 +703,18 @@ static struct kr_query *task_get_last_pending_query(struct qr_task *task) static int send_waiting(struct session2 *session) { + if (session2_waitinglist_is_empty(session)) + return 0; + + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); int ret = 0; - while (!session2_waitinglist_is_empty(session)) { + do { struct qr_task *t = session2_waitinglist_get(session); + if (t->ctx->source.session) + defer_sample_addr(&t->ctx->source.addr, t->ctx->source.session->stream); ret = qr_task_send(t, session, NULL, NULL); + defer_sample_restart(); if (ret != 0) { struct sockaddr *peer = session2_get_peer(session); session2_waitinglist_finalize(session, KR_STATE_FAIL); @@ -709,7 +724,9 @@ static int send_waiting(struct session2 *session) break; } session2_waitinglist_pop(session, true); - } + } while (!session2_waitinglist_is_empty(session)); + defer_sample_stop(&defer_prev_sample_state, true); + return ret; } @@ -874,26 +891,32 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_ kr_assert(ret == KNOT_EOK && val_deleted == task); } /* Notify waiting tasks. 
*/ - struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending); - for (size_t i = task->waiting.len; i > 0; i--) { - struct qr_task *follower = task->waiting.at[i - 1]; - /* Reuse MSGID and 0x20 secret */ - if (follower->ctx->req.rplan.pending.len > 0) { - struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending); - qry->id = leader_qry->id; - qry->secret = leader_qry->secret; - - // Note that this transport may not be present in `leader_qry`'s server selection - follower->transport = task->transport; - if(follower->transport) { - follower->transport->deduplicated = true; + if (task->waiting.len > 0) { + struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending); + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + for (size_t i = task->waiting.len; i > 0; i--) { + struct qr_task *follower = task->waiting.at[i - 1]; + /* Reuse MSGID and 0x20 secret */ + if (follower->ctx->req.rplan.pending.len > 0) { + struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending); + qry->id = leader_qry->id; + qry->secret = leader_qry->secret; + + // Note that this transport may not be present in `leader_qry`'s server selection + follower->transport = task->transport; + if(follower->transport) { + follower->transport->deduplicated = true; + } + leader_qry->secret = 0; /* Next will be already decoded */ } - leader_qry->secret = 0; /* Next will be already decoded */ + qr_task_step(follower, packet_source, pkt); + qr_task_unref(follower); + defer_sample_restart(); } - qr_task_step(follower, packet_source, pkt); - qr_task_unref(follower); + defer_sample_stop(&defer_prev_sample_state, true); + task->waiting.len = 0; } - task->waiting.len = 0; task->leading = false; } @@ -942,6 +965,10 @@ static int qr_task_finalize(struct qr_task *task, int state) if (task->finished) { return kr_ok(); } + + if (task->ctx->source.session) + defer_sample_addr(&task->ctx->source.addr, task->ctx->source.session->stream); + struct request_ctx *ctx = task->ctx; struct session2 *source_session = ctx->source.session; kr_resolve_finish(&ctx->req, state); @@ -997,6 +1024,18 @@ static int qr_task_finalize(struct qr_task *task, int state) session2_close(source_session); } + if (source_session->stream && !source_session->closing) { + struct pl_dns_stream_sess_data *stream = + protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_MULTI_STREAM); + if (!stream) + stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_UNSIZED_STREAM); + if (!stream) + stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_SINGLE_STREAM); + if (stream && stream->half_closed) { + session2_force_close(source_session); + } + } + qr_task_unref(task); if (ret != kr_ok() || state != KR_STATE_DONE) @@ -1806,13 +1845,6 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap( } } -struct pl_dns_stream_sess_data { - struct protolayer_data h; - bool single : 1; /**< True: Stream only allows a single packet */ - bool produced : 1; /**< True: At least one packet has been produced */ - bool connected : 1; /**< True: The stream is connected */ -}; - static int pl_dns_stream_sess_init(struct session2 *session, void *sess_data, void *param) { @@ -1856,13 +1888,19 @@ static enum protolayer_event_cb_result pl_dns_stream_resolution_timeout( } else { /* Normally it should not happen, * but better to check if there anything in this list. 
*/ - while (!session2_waitinglist_is_empty(s)) { - struct qr_task *t = session2_waitinglist_pop(s, false); - worker_task_finalize(t, KR_STATE_FAIL); - worker_task_unref(t); - the_worker->stats.timeout += 1; - if (s->closing) - return PROTOLAYER_EVENT_PROPAGATE; + if (!session2_waitinglist_is_empty(s)) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *t = session2_waitinglist_pop(s, false); + worker_task_finalize(t, KR_STATE_FAIL); + worker_task_unref(t); + the_worker->stats.timeout += 1; + if (s->closing) + return PROTOLAYER_EVENT_PROPAGATE; + defer_sample_restart(); + } while (!session2_waitinglist_is_empty(s)); + defer_sample_stop(&defer_prev_sample_state, true); } uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout; uint64_t idle_time = kr_now() - s->last_activity; @@ -1975,6 +2013,13 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected( stream->connected = false; + if (session2_is_empty(session)) + return PROTOLAYER_EVENT_PROPAGATE; + + defer_sample_state_t defer_prev_sample_state; + if (session->outgoing) + defer_sample_start(&defer_prev_sample_state); + while (!session2_waitinglist_is_empty(session)) { struct qr_task *task = session2_waitinglist_pop(session, false); kr_assert(task->refs > 1); @@ -1991,6 +2036,7 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected( qry->flags.TCP = false; } qr_task_step(task, NULL, NULL); + defer_sample_restart(); } else { kr_assert(task->ctx->source.session == session); task->ctx->source.session = NULL; @@ -2007,6 +2053,7 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected( qry->flags.TCP = false; } qr_task_step(task, NULL, NULL); + defer_sample_restart(); } else { kr_assert(task->ctx->source.session == session); task->ctx->source.session = NULL; @@ -2014,6 +2061,19 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected( worker_task_unref(task); } + if (session->outgoing) + defer_sample_stop(&defer_prev_sample_state, true); + + return PROTOLAYER_EVENT_PROPAGATE; +} + +static enum protolayer_event_cb_result pl_dns_stream_eof( + struct session2 *session, struct pl_dns_stream_sess_data *stream) +{ + if (!session2_is_empty(session)) { + stream->half_closed = true; + return PROTOLAYER_EVENT_CONSUME; + } return PROTOLAYER_EVENT_PROPAGATE; } @@ -2048,6 +2108,9 @@ static enum protolayer_event_cb_result pl_dns_stream_event_unwrap( case PROTOLAYER_EVENT_FORCE_CLOSE: return pl_dns_stream_disconnected(session, stream); + case PROTOLAYER_EVENT_EOF: + return pl_dns_stream_eof(session, stream); + default: return PROTOLAYER_EVENT_PROPAGATE; } diff --git a/doc/_static/config.schema.json b/doc/_static/config.schema.json index 86bc4277..1aa80cf9 100644 --- a/doc/_static/config.schema.json +++ b/doc/_static/config.schema.json @@ -1713,6 +1713,27 @@ }, "default": null }, + "defer": { + "description": "Configuration of request prioritization (defer).", + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "Use request prioritization.", + "default": false + }, + "log-period": { + "type": "string", + "pattern": "^(\\d+)(us|ms|s|m|h|d)$", + "description": "Minimal time between two log messages, or '0s' to disable.", + "default": "0s" + } + }, + "default": { + "enabled": false, + "log_period": "0s" + } + }, "lua": { "description": "Custom Lua configuration.", "type": "object", diff --git a/doc/user/config-defer.rst b/doc/user/config-defer.rst new file mode 100644 index 00000000..0b948a16 --- 
/dev/null
+++ b/doc/user/config-defer.rst
@@ -0,0 +1,65 @@
+.. SPDX-License-Identifier: GPL-3.0-or-later
+
+.. _config-defer:
+
+Request prioritization (defer)
+==============================
+
+Defer mitigates DoS attacks by measuring the CPU time consumed by different hosts and networks
+and deferring future requests from the same origin.
+If there is not enough time to process all requests, the lowest-priority ones are dropped.
+
+The time measurements are taken into account only for TCP-based queries (including DoT and DoH),
+as the source address of plain UDP can be forged.
+We aim to spend half of the time on UDP without prioritization
+and half of the time on non-UDP with prioritization,
+provided there are enough requests of both types.
+
+The detailed configuration is logged by the ``defer`` log group at the ``info`` level on startup (unless disabled).
+
+.. note::
+
+   The data of all deferred queries may occupy 64 MiB of memory per :ref:`worker <config-multiple-workers>`.
+
+.. option:: defer/enabled: true|false
+
+   :default: false
+
+   Enable request prioritization.
+
+   If disabled, requests are processed in the order of their arrival,
+   and under overload they can be dropped
+   only due to an overflow of the kernel queues.
+
+
+.. option:: defer/log-period: <time ms|s|m|h|d>
+
+   :default: 0s
+
+   Minimal time between two log messages, or ``0s`` to disable logging.
+
+   If a request is dropped after being deferred for too long, its source address is logged
+   and logging is disabled for the :option:`log-period <defer/log-period: <time ms|s|m|h|d>>`.
+   As long as dropping continues, one source is logged each period,
+   and sources with more dropped queries are more likely to be chosen.
+
+
+Implementation details
+----------------------
+
+Internally, defer uses a similar approach to :ref:`rate limiting <config-rate-limiting>`,
+except that CPU time is measured instead of counting requests.
+
+There are four main priority levels with assigned rate and instant limits for individual hosts
+and their multiples for networks -- the same prefix lengths and multipliers are used as for rate limiting.
+Within a priority level, requests are ordered by the longest prefix length
+at which they fall into that level,
+so that requests reaching the level only as part of a larger network are processed first,
+followed by requests that fall there also due to a smaller subnetwork,
+which may have caused the deprioritization of the larger network.
+Remaining ties are broken by the time of arrival.
+
+If a request is deferred for too long, it is dropped.
+This can also happen to UDP requests,
+which are stored in a single queue ordered by the time of their arrival.
+
diff --git a/doc/user/config-performance.rst b/doc/user/config-performance.rst
index b09b4a92..43b15c53 100644
--- a/doc/user/config-performance.rst
+++ b/doc/user/config-performance.rst
@@ -33,3 +33,4 @@ impact than cache settings and number of instances.
    config-priming
    config-edns-keepalive
    config-rate-limiting
+   config-defer
diff --git a/lib/kru.h b/lib/kru.h
--- a/lib/kru.h
+++ b/lib/kru.h
@@ -90,6 +90,14 @@ struct kru_api {
 	/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
 	uint16_t (*load_multi_prefix_max)(struct kru *kru, uint32_t time_now,
 			uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out);
+
+
+	/// Multiple queries based on different prefixes of a single key.
+	/// Stores the final values of the involved counters normalized to the limit 2^16 to *loads_out (unless NULL).
+ /// Set prices to NULL to skip updating; otherwise, KRU is always updated, using maximal allowed value on overflow. + /// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above. + void (*load_multi_prefix)(struct kru *kru, uint32_t time_now, + uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out); }; // The functions are stored this way to make it easier to switch diff --git a/lib/kru.inc.c b/lib/kru.inc.c index 2272adab..166e1004 100644 --- a/lib/kru.inc.c +++ b/lib/kru.inc.c @@ -565,6 +565,33 @@ static uint8_t kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, u return 0; } +static void kru_load_multi_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace, + uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out) +{ + struct query_ctx ctx[queries_cnt]; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], (prices ? prices[i] : 0), ctx + i); + } + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_fetch(kru, ctx + i); + } + + if (prices) { + for (int i = queries_cnt - 1; i >= 0; i--) { + kru_limited_update(kru, ctx + i, true); + } + } + + if (loads_out) { + for (size_t i = 0; i < queries_cnt; i++) { + loads_out[i] = ctx[i].final_load_value; + } + } +} + + static uint16_t kru_load_multi_prefix_max(struct kru *kru, uint32_t time_now, uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out) { @@ -612,5 +639,6 @@ static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 1 .limited_multi_or = kru_limited_multi_or, \ .limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \ .limited_multi_prefix_or = kru_limited_multi_prefix_or, \ + .load_multi_prefix = kru_load_multi_prefix, \ .load_multi_prefix_max = kru_load_multi_prefix_max, \ } diff --git a/python/knot_resolver/datamodel/config_schema.py b/python/knot_resolver/datamodel/config_schema.py index 641032dc..6329add3 100644 --- a/python/knot_resolver/datamodel/config_schema.py +++ b/python/knot_resolver/datamodel/config_schema.py @@ -17,6 +17,7 @@ from knot_resolver.datamodel.monitoring_schema import MonitoringSchema from knot_resolver.datamodel.network_schema import NetworkSchema from knot_resolver.datamodel.options_schema import OptionsSchema from knot_resolver.datamodel.rate_limiting_schema import RateLimitingSchema +from knot_resolver.datamodel.defer_schema import DeferSchema from knot_resolver.datamodel.templates import POLICY_CONFIG_TEMPLATE, WORKER_CONFIG_TEMPLATE from knot_resolver.datamodel.types import EscapedStr, IntPositive, WritableDir from knot_resolver.datamodel.view_schema import ViewSchema @@ -108,6 +109,7 @@ class KresConfig(ConfigSchema): monitoring: Metrics exposisition configuration (Prometheus, Graphite) lua: Custom Lua configuration. rate_limiting: Configuration of rate limiting. + defer: Configuration of request prioritization (defer). 
""" version: int = 1 @@ -129,6 +131,7 @@ class KresConfig(ConfigSchema): logging: LoggingSchema = LoggingSchema() monitoring: MonitoringSchema = MonitoringSchema() rate_limiting: Optional[RateLimitingSchema] = None + defer: DeferSchema = DeferSchema() lua: LuaSchema = LuaSchema() _LAYER = Raw @@ -151,6 +154,7 @@ class KresConfig(ConfigSchema): logging: LoggingSchema monitoring: MonitoringSchema rate_limiting: Optional[RateLimitingSchema] + defer: DeferSchema lua: LuaSchema def _hostname(self, obj: Raw) -> Any: diff --git a/python/knot_resolver/datamodel/defer_schema.py b/python/knot_resolver/datamodel/defer_schema.py new file mode 100644 index 00000000..38301e55 --- /dev/null +++ b/python/knot_resolver/datamodel/defer_schema.py @@ -0,0 +1,15 @@ +from knot_resolver.utils.modeling import ConfigSchema +from knot_resolver.datamodel.types import TimeUnit + + +class DeferSchema(ConfigSchema): + """ + Configuration of request prioritization (defer). + + --- + enabled: Use request prioritization. + log_period: Minimal time between two log messages, or '0s' to disable. + """ + + enabled: bool = False + log_period: TimeUnit = TimeUnit("0s") diff --git a/python/knot_resolver/datamodel/templates/defer.lua.j2 b/python/knot_resolver/datamodel/templates/defer.lua.j2 new file mode 100644 index 00000000..131b71c4 --- /dev/null +++ b/python/knot_resolver/datamodel/templates/defer.lua.j2 @@ -0,0 +1,10 @@ +{% from 'macros/common_macros.lua.j2' import boolean %} + +{% if cfg.defer.enabled and not disable_defer -%} +assert(C.defer_init( + '{{ cfg.rundir }}/defer', + {{ cfg.defer.log_period.millis() }}, + {{ cfg.workers }}) == 0) +{% else %} +assert(C.defer_init(nil, 0, 0) == 0) +{%- endif %} diff --git a/python/knot_resolver/datamodel/templates/policy-config.lua.j2 b/python/knot_resolver/datamodel/templates/policy-config.lua.j2 index 4c5c9048..9d88537a 100644 --- a/python/knot_resolver/datamodel/templates/policy-config.lua.j2 +++ b/python/knot_resolver/datamodel/templates/policy-config.lua.j2 @@ -35,6 +35,12 @@ cache.open({{ cfg.cache.size_max.bytes() }}, 'lmdb://{{ cfg.cache.storage }}') -- FORWARD section ---------------------------------- {% include "forward.lua.j2" %} +-- DEFER section ------------------------------------ +-- Force-disable defer to avoid the default defer config. +{% set disable_defer = true %} +{% include "defer.lua.j2" %} + + {% endif %} quit() diff --git a/python/knot_resolver/datamodel/templates/worker-config.lua.j2 b/python/knot_resolver/datamodel/templates/worker-config.lua.j2 index c97f0820..be7fe150 100644 --- a/python/knot_resolver/datamodel/templates/worker-config.lua.j2 +++ b/python/knot_resolver/datamodel/templates/worker-config.lua.j2 @@ -46,6 +46,9 @@ nsid.name('{{ cfg.nsid }}' .. 
worker.id)
 
 -- RATE-LIMITING section ------------------------------------
 {% include "rate_limiting.lua.j2" %}
+-- DEFER section ------------------------------------
+{% include "defer.lua.j2" %}
+
 {% endif %}
 
 -- LUA section --------------------------------------
diff --git a/python/knot_resolver/manager/manager.py b/python/knot_resolver/manager/manager.py
index 952c8b7d..68edb915 100644
--- a/python/knot_resolver/manager/manager.py
+++ b/python/knot_resolver/manager/manager.py
@@ -135,6 +135,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
             config.monitoring,
             config.lua,
             config.rate_limiting,
+            config.defer,
         ]
 
         # register and immediately call a verifier that validates config with 'canary' kresd process
@@ -224,11 +225,17 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
     async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
         async with self._manager_lock:
             if _old.rate_limiting != new.rate_limiting:
-                logger.debug("Unlinking shared RRL memory")
+                logger.debug("Unlinking shared ratelimiting memory")
                 try:
                     os.unlink(str(_old.rundir) + "/ratelimiting")
                 except FileNotFoundError:
                     pass
+            if _old.workers != new.workers or _old.defer != new.defer:
+                logger.debug("Unlinking shared defer memory")
+                try:
+                    os.unlink(str(_old.rundir) + "/defer")
+                except FileNotFoundError:
+                    pass
             logger.debug("Testing the new config with a canary process")
             try:
                 # technically, this has side effects of leaving a new process runnning
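
For reference, the ``defer`` subtree added to config.schema.json and defer_schema.py above maps to a block in the declarative configuration. The following is only a sketch under assumptions: the key names come from the schema in this merge, while the concrete values and the usual config.yaml placement are illustrative, not recommendations.

    defer:
      # enable request prioritization (the schema default is false)
      enabled: true
      # allow at most one "dropped source" log message per minute;
      # the default '0s' keeps this logging disabled
      log-period: 1m

Per the templates above, such a block is rendered by defer.lua.j2 into a call of the form assert(C.defer_init('<rundir>/defer', <log-period in milliseconds>, <number of workers>) == 0) in the worker configuration, whereas policy-config.lua.j2 sets disable_defer, so the policy process always ends up with defer_init(nil, 0, 0).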