author     Vladimír Čunát <vladimir.cunat@nic.cz>  2025-01-14 09:44:28 +0100
committer  Vladimír Čunát <vladimir.cunat@nic.cz>  2025-01-14 09:44:28 +0100
commit     04f8f717fc93a1235f6d61774887b30eac65a1dd (patch)
tree       f23e67cb779ae4f94e2ab7bc41fac2fffe504066
parent     Merge branch 'kresctl-tab-completion' into 'master' (diff)
parent     doc/user: defer nits (diff)
Merge !1641: Request prioritization (defer)
-rw-r--r--  NEWS                                                            |   1
-rw-r--r--  daemon/defer.c                                                  | 565
-rw-r--r--  daemon/defer.h                                                  | 118
-rw-r--r--  daemon/io.c                                                     |  28
-rw-r--r--  daemon/lua/kres-gen-33.lua                                      |   2
-rw-r--r--  daemon/main.c                                                   |   9
-rw-r--r--  daemon/meson.build                                              |   1
-rw-r--r--  daemon/session2.c                                               | 162
-rw-r--r--  daemon/session2.h                                               |  16
-rw-r--r--  daemon/tls.c                                                    |   6
-rw-r--r--  daemon/worker.c                                                 | 131
-rw-r--r--  doc/_static/config.schema.json                                  |  21
-rw-r--r--  doc/user/config-defer.rst                                       |  65
-rw-r--r--  doc/user/config-performance.rst                                 |   1
-rw-r--r--  lib/kru.h                                                       |   8
-rw-r--r--  lib/kru.inc.c                                                   |  28
-rw-r--r--  python/knot_resolver/datamodel/config_schema.py                 |   4
-rw-r--r--  python/knot_resolver/datamodel/defer_schema.py                  |  15
-rw-r--r--  python/knot_resolver/datamodel/templates/defer.lua.j2           |  10
-rw-r--r--  python/knot_resolver/datamodel/templates/policy-config.lua.j2   |   6
-rw-r--r--  python/knot_resolver/datamodel/templates/worker-config.lua.j2   |   3
-rw-r--r--  python/knot_resolver/manager/manager.py                         |   9
22 files changed, 906 insertions, 303 deletions
diff --git a/NEWS b/NEWS
index e1562b6f..a9857e8b 100644
--- a/NEWS
+++ b/NEWS
@@ -7,6 +7,7 @@ Improvements
- manager: fix startup on Linux without libsystemd (!1608)
- auto-reload TLS certificate files (!1626)
- kresctl: bash command-line TAB completion (!1622)
+- add request prioritization (defer) to mitigate DoS attacks (!1641)
Knot Resolver 6.0.9 (2024-11-11)
================================
diff --git a/daemon/defer.c b/daemon/defer.c
index 32b0358c..223a2d61 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -2,6 +2,8 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*/
+#include <math.h>
+#include <stdatomic.h>
#include "daemon/defer.h"
#include "daemon/session2.h"
#include "daemon/udp_queue.h"
@@ -11,64 +13,63 @@
#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
+#define V4_SUBPRIO (uint8_t[]) { 0, 1, 3, 7 }
#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
+#define V6_SUBPRIO (uint8_t[]) { 2, 4, 5, 6, 7 }
+#define SUBPRIO_CNT 8
#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
-#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
-#define PRIORITY_SYNC (-1) // no queue
-#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
-
-#define KRU_CAPACITY (1<<19)
- // same as ratelimiting default
-#define MAX_DECAY (KRU_LIMIT * 0.0006929)
- // halving counters in 1s
- // 5s from max to 2^11 (priority 3) // TODO change 2^11 to 2^12 to make the times equal?
- // 3s from 2^11 to 2^8 (priority 2)
- // 4s from 2^8 to 2^4 (priority 1)
- // 4s from 2^4 to zero (priority 0)
-#define BASE_PRICE(nsec, cpus) ((uint64_t)MAX_DECAY * 10 * nsec / 1000000ll / cpus)
- // max value when the single host uses 1/10 of all cpus' time;
- // needed cpu utilization (rate limit) for other thresholds and prefixes:
- // single v6/48 v4/24 v6/32 v4/20 v4/18
- // max: 10.000 % 40.00 % - - - -
- // 2^11: 0.312 % 1.25 % 10.00 % 20.00 % 80.00 % - (priority 3)
- // 2^8: 0.039 % 0.16 % 1.25 % 2.50 % 10.00 % 30.00 % (priority 2)
- // 2^4: 0.002 % 0.01 % 0.08 % 0.16 % 0.63 % 1.87 % (priority 1)
- // instant limit for single host and 1 cpu: (greater for larger networks and for more cpus)
- // 35 us for 2^4, 0.56 ms for 2^8, 4.5 ms for 2^11, 144 ms max value
- // TODO adjust somehow
- // simple DoT query may cost 1 ms, DoH 2.5 ms; it gets priority 2 during handshake (on laptop);
- // the instant limits can be doubled by:
- // doubling half-life (approx.),
- // doubling percents in the previous table, or
- // doubling number of cpus
- // possible solution:
- // half-life 5s, BASE_PRICE /= 2.5 -> for 4 cpus 1.75 ms fits below 2^4;
- // still not enough for home routers -> TODO make something configurable, maybe the BASE_PRICE multiplier
-
-#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
+struct kru_conf {
+ uint8_t namespace;
+ size_t prefixes_cnt;
+ uint8_t *prefixes;
+ const kru_price_t *rate_mult;
+ const uint8_t *subprio;
+} const
+V4_CONF = {0, V4_PREFIXES_CNT, V4_PREFIXES, V4_RATE_MULT, V4_SUBPRIO},
+V6_CONF = {1, V6_PREFIXES_CNT, V6_PREFIXES, V6_RATE_MULT, V6_SUBPRIO};
+
+#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<12, -1} // the last one should be UINT16_MAX
+#define QUEUES_CNT ((sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1) * SUBPRIO_CNT + 2)
+ // priority 0 has no subpriorities, +1 for unverified
+#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
+
+#define Q0_INSTANT_LIMIT 1000000 // ns
+#define KRU_CAPACITY (1<<19) // same as ratelimiting default
+#define BASE_PRICE(nsec) ((uint64_t)KRU_LIMIT * LOADS_THRESHOLDS[0] / (1<<16) * (nsec) / Q0_INSTANT_LIMIT)
+#define MAX_DECAY (BASE_PRICE(1000000) / 2) // max value at 50% utilization of single cpu
+ // see log written by defer_str_conf for details
+
+#define REQ_TIMEOUT 1000000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
#define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
#define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue
-#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediatelly in poll phase
- // TODO measure memory usage instead
+#define MAX_WAITING_REQS_SIZE (64l * 1024 * 1024) // bytes; if exceeded, some deferred requests are processed in poll phase
+ // single TCP allocates more than 64KiB wire buffer
+ // TODO check whether all important allocations are counted;
+ // different things are not counted: tasks and subsessions (not deferred after creation), uv handles, queues overhead, ...;
+ // payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams)
+
#define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)
struct defer {
size_t capacity;
kru_price_t max_decay;
+ uint32_t log_period;
int cpus;
bool using_avx2;
+ _Atomic uint32_t log_time;
_Alignas(64) uint8_t kru[];
};
struct defer *defer = NULL;
+bool defer_initialized = false;
+uint64_t defer_uvtime_stamp = 0;
struct mmapped defer_mmapped = {0};
defer_sample_state_t defer_sample_state = {
@@ -80,43 +81,60 @@ static void defer_queues_idle(uv_idle_t *handle);
protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
int waiting_requests = 0;
+ptrdiff_t waiting_requests_size = 0; // signed for non-negativeness asserts
int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue )
enum phase {
- PHASE_UDP = 1,
- PHASE_NON_UDP = 2,
- PHASE_ANY = PHASE_UDP | PHASE_NON_UDP
-} phase = PHASE_ANY;
-uint64_t phase_elapsed = 0; // ns
-bool phase_accounting = false; // add accounted time to phase_elapsed on next call of defer_account
-
-static inline void phase_set(enum phase p)
+ PHASE_NONE,
+ PHASE_UDP,
+ PHASE_NON_UDP
+} phase = PHASE_NONE;
+uint64_t phase_elapsed[3] = { 0 }; // ns; [PHASE_NONE] value is being incremented but never used
+const uint64_t phase_limits[3] = {0, PHASE_UDP_TIMEOUT, PHASE_NON_UDP_TIMEOUT};
+uint64_t phase_stamp = 0;
+
+static inline bool phase_over_limit(enum phase p)
{
- if (phase != p) {
- phase_elapsed = 0;
- phase = p;
- }
+ return phase_elapsed[p] >= phase_limits[p];
+}
+
+/// Reset elapsed times of phases and set phase to UDP, NON_UDP, or NONE.
+static inline void phase_reset(enum phase p)
+{
+ phase_elapsed[PHASE_UDP] = 0;
+ phase_elapsed[PHASE_NON_UDP] = 0;
+ phase_stamp = defer_sample_state.stamp;
+ phase = p;
}
-static inline void phase_account(uint64_t nsec)
+
+/// Set phase to UDP or NON_UDP if it is not over limit or both are over limit (reset them).
+static inline bool phase_try_set(enum phase p)
{
- kr_assert(phase != PHASE_ANY);
- phase_elapsed += nsec;
- if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
- phase_set(PHASE_NON_UDP);
- } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
- phase_set(PHASE_UDP);
+ phase_elapsed[phase] += defer_sample_state.stamp - phase_stamp;
+ phase_stamp = defer_sample_state.stamp;
+
+ if (!phase_over_limit(p)) {
+ phase = p;
+ return true;
+ } else if (phase_over_limit(PHASE_UDP) && phase_over_limit(PHASE_NON_UDP)) {
+ phase_reset(p);
+ return true;
}
+
+ return false;
}
struct pl_defer_sess_data {
struct protolayer_data h;
protolayer_iter_ctx_queue_t queue; // properly ordered sequence of deferred packets, for stream only
// the first ctx in the queue is also in a defer queue
+ size_t size;
};
struct pl_defer_iter_data {
struct protolayer_data h;
uint64_t req_stamp; // time when request was received, uses get_stamp()
+ size_t size;
};
/// Return whether we're using optimized variant right now.
@@ -127,92 +145,182 @@ static bool using_avx2(void)
return result;
}
-/// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
+/// Print configuration into desc array.
+void defer_str_conf(char *desc, int desc_len)
{
- if (phase_accounting) {
- phase_account(nsec);
- phase_accounting = false;
+ int len = 0;
+#define append(...) len += snprintf(desc + len, desc_len > len ? desc_len - len : 0, __VA_ARGS__)
+#define append_time(prefix, ms, suffix) { \
+ if ((ms) < 1) append(prefix "%7.1f us" suffix, (ms) * 1000); \
+ else if ((ms) < 1000) append(prefix "%7.1f ms" suffix, (ms)); \
+ else append(prefix "%7.1f s " suffix, (ms) / 1000); }
+ append( " Expected cpus/procs: %5d\n", defer->cpus);
+
+ const size_t thresholds = sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS);
+ append( " Max waiting requests:%7.1f MiB\n", MAX_WAITING_REQS_SIZE / 1024.0 / 1024.0);
+ append_time(" Request timeout: ", REQ_TIMEOUT / 1000000.0, "\n");
+ append_time(" Idle: ", IDLE_TIMEOUT / 1000000.0, "\n");
+ append_time(" UDP phase: ", PHASE_UDP_TIMEOUT / 1000000.0, "\n");
+ append_time(" Non-UDP phase: ", PHASE_NON_UDP_TIMEOUT / 1000000.0, "\n");
+ append( " Priority levels: %5ld (%ld main levels, %d sublevels) + UDP\n", QUEUES_CNT - 1, thresholds, SUBPRIO_CNT);
+
+ size_t capacity_log = 0;
+ for (size_t c = defer->capacity - 1; c > 0; c >>= 1) capacity_log++;
+ size_t size = offsetof(struct defer, kru) + KRU.get_size(capacity_log);
+
+ append( " KRU capacity: %7.1f k (%0.1f MiB)\n", (1 << capacity_log) / 1000.0, size / 1000000.0);
+
+ bool uniform_thresholds = true;
+ for (int i = 1; i < thresholds - 1; i++)
+ uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i - 1] * LOADS_THRESHOLDS[0]);
+ uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[thresholds - 2] * LOADS_THRESHOLDS[0]);
+
+ append( " Decay: %7.3f %% per ms (32-bit max: %d)\n",
+ 100.0 * defer->max_decay / KRU_LIMIT, defer->max_decay);
+ float half_life = -1.0 / log2f(1.0 - (float)defer->max_decay / KRU_LIMIT);
+ append_time(" Half-life: ", half_life, "\n");
+ if (uniform_thresholds)
+ append_time(" Priority rise in: ", half_life * 16 / thresholds, "\n");
+ append_time(" Counter reset in: ", half_life * 16, "\n");
+
+ append(" Rate limits for crossing priority levels as single CPU utilization:\n");
+
+ const struct kru_conf *kru_confs[] = {&V4_CONF, &V6_CONF};
+ const int version[] = {4, 6};
+ const kru_price_t base_price_ms = BASE_PRICE(1000000);
+
+ append("%15s", "");
+ for (int j = 0; j < 3; j++)
+ append("%14d", j+1);
+ append("%14s\n", "max");
+
+ for (int v = 0; v < 2; v++) {
+ for (int i = kru_confs[v]->prefixes_cnt - 1; i >= 0; i--) {
+ append("%9sv%d/%-3d: ", "", version[v], kru_confs[v]->prefixes[i]);
+ for (int j = 0; j < thresholds; j++) {
+ float needed_util = (float)defer->max_decay / (1<<16) * LOADS_THRESHOLDS[j] / base_price_ms * kru_confs[v]->rate_mult[i];
+ append("%12.3f %%", needed_util * 100);
+ }
+ append("\n");
+ }
}
- if (!stream) return; // UDP is not accounted in KRU
+ append(" Instant limits for crossing priority levels as CPU time:\n");
+
+ append("%15s", "");
+ for (int j = 0; j < 3; j++)
+ append("%14d", j+1);
+ append("%14s\n", "max");
+
+ for (int v = 0; v < 2; v++) {
+ for (int i = kru_confs[v]->prefixes_cnt - 1; i >= 0; i--) {
+ append("%9sv%d/%-3d: ", "", version[v], kru_confs[v]->prefixes[i]);
+ for (int j = 0; j < thresholds; j++) {
+ float needed_time = (float)KRU_LIMIT / (1<<16) * LOADS_THRESHOLDS[j] / base_price_ms * kru_confs[v]->rate_mult[i];
+ if (needed_time < 1) {
+ append("%11.1f us", needed_time * 1000);
+ } else if (needed_time < 1000) {
+ append("%11.1f ms", needed_time);
+ } else {
+ append("%11.1f s ", needed_time / 1000);
+ }
+ }
+ append("\n");
+ }
+ }
+ append(" (values above max are indistinguishable)\n");
- _Alignas(16) uint8_t key[16] = {0, };
- uint16_t max_load = 0;
- uint8_t prefix = 0;
- kru_price_t base_price = BASE_PRICE(nsec, defer->cpus);
+#undef append_time
+#undef append
+}
- if (addr->ip.sa_family == AF_INET6) {
- memcpy(key, &addr->ip6.sin6_addr, 16);
- kru_price_t prices[V6_PREFIXES_CNT];
- for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
- prices[i] = base_price / V6_RATE_MULT[i];
+/// Call KRU, return priority and as params load and prefix.
+static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *key, kru_price_t *prices,
+ uint16_t *out_load, uint8_t *out_prefix)
+{
+ uint16_t loads[kru_conf->prefixes_cnt];
+ KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(),
+ kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads);
+
+ int priority = 0;
+ int prefix_index = kru_conf->prefixes_cnt - 1;
+ for (int i = kru_conf->prefixes_cnt - 1, j = 0; i >= 0; i--) {
+ for (; LOADS_THRESHOLDS[j] < loads[i]; j++) {
+ prefix_index = i;
+ priority = 1 + j * SUBPRIO_CNT + kru_conf->subprio[i];
}
+ }
+ *out_load = loads[prefix_index];
+ *out_prefix = kru_conf->prefixes[prefix_index];
+ return priority;
+}
+
+/// Increment KRU counters by given time.
+void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
+{
+ if (!stream) return; // UDP is not accounted in KRU; TODO remove !stream invocations?
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
+ _Alignas(16) uint8_t key[16] = {0, };
+ const struct kru_conf *kru_conf;
+ if (addr->ip.sa_family == AF_INET6) {
+ memcpy(key, &addr->ip6.sin6_addr, 16);
+ kru_conf = &V6_CONF;
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
-
- kru_price_t prices[V4_PREFIXES_CNT];
- for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
- prices[i] = base_price / V4_RATE_MULT[i];
- }
-
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
+ kru_conf = &V4_CONF;
} else {
return;
}
+ uint64_t base_price = BASE_PRICE(nsec);
+ kru_price_t prices[kru_conf->prefixes_cnt];
+ for (size_t i = 0; i < kru_conf->prefixes_cnt; i++) {
+ uint64_t price = base_price / kru_conf->rate_mult[i];
+ prices[i] = price > (kru_price_t)-1 ? -1 : price;
+ }
+
+ uint16_t load;
+ uint8_t prefix;
+ kru_charge_classify(kru_conf, key, prices, &load, &prefix);
+
VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
- kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
+ kr_straddr(&addr->ip), nsec / 1000000.0, load, prefix);
}
-/// Determine priority of the request in [-1, QUEUES_CNT - 1].
-/// Lower value has higher priority, -1 should be synchronous.
-/// Both UDP and non-UDP may end up with synchronous priority
-/// if the phase is active and no requests can be scheduled before them.
+/// Determine priority of the request in [0, QUEUES_CNT - 1];
+/// lower value has higher priority; plain UDP always gets PRIORITY_UDP.
static inline int classify(const union kr_sockaddr *addr, bool stream)
{
if (!stream) { // UDP
VERBOSE_LOG(" unverified address\n");
- if ((phase & PHASE_UDP) && (queue_len(queues[PRIORITY_UDP]) == 0)) {
- phase_set(PHASE_UDP);
- return PRIORITY_SYNC;
- }
return PRIORITY_UDP;
}
_Alignas(16) uint8_t key[16] = {0, };
- uint16_t max_load = 0;
- uint8_t prefix = 0;
+ const struct kru_conf *kru_conf = NULL;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
+ kru_conf = &V6_CONF;
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
+ kru_conf = &V4_CONF;
+ } else {
+ kr_assert(false);
+ return 0; // shouldn't happen anyway
}
- int priority = 0;
- for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
+ uint16_t load;
+ uint8_t prefix;
+ int priority = kru_charge_classify(kru_conf, key, NULL, &load, &prefix);
- VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
+ VERBOSE_LOG(" load %d on /%d\n", load, prefix);
- if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
- phase_set(PHASE_NON_UDP);
- return PRIORITY_SYNC;
- }
return priority;
}
-
-/// Push query to a queue according to its priority and activate idle.
+/// Push query to a queue according to its priority.
static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
{
if (to_head_end) {
@@ -221,51 +329,36 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo
queue_push(queues[priority], ctx);
}
queue_ix = MIN(queue_ix, priority);
- if (waiting_requests++ <= 0) {
- kr_assert(waiting_requests == 1);
- uv_idle_start(&idle_handle, defer_queues_idle);
- VERBOSE_LOG(" activating idle\n");
- }
+ waiting_requests++;
}
-/// Pop and return query from the specified queue, deactivate idle if not needed.
+/// Pop and return query from the specified queue.
static inline struct protolayer_iter_ctx *pop_query_queue(int priority)
{
kr_assert(queue_len(queues[priority]) > 0);
struct protolayer_iter_ctx *ctx = queue_head(queues[priority]);
queue_pop(queues[priority]);
- if (--waiting_requests <= 0) {
- kr_assert(waiting_requests == 0);
- uv_idle_stop(&idle_handle);
- VERBOSE_LOG(" deactivating idle\n");
- }
+ waiting_requests--;
+ kr_assert(waiting_requests >= 0);
return ctx;
}
-/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
-/// deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on the current phase.
static inline struct protolayer_iter_ctx *pop_query(void)
{
const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
const int waiting_non_udp = waiting_requests - waiting_udp;
- enum phase new_phase;
- if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
- new_phase = PHASE_NON_UDP; // maybe changing from PHASE_ANY
- } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
- new_phase = PHASE_UDP; // maybe changing from PHASE_ANY
- } else if (waiting_non_udp > 0) {
- new_phase = PHASE_NON_UDP; // change from PHASE_UDP, no UDP queries
- } else {
- new_phase = PHASE_UDP; // change from PHASE_NON_UDP, no non-UDP queries
- }
- phase_set(new_phase);
+ if (!((waiting_non_udp > 0) && phase_try_set(PHASE_NON_UDP)) &&
+ !((waiting_udp > 0) && phase_try_set(PHASE_UDP)))
+ phase_reset(waiting_non_udp > 0 ? PHASE_NON_UDP : PHASE_UDP);
int i;
if (phase == PHASE_NON_UDP) {
for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
- if (queue_ix >= PRIORITY_UDP) kr_assert(false);
+ if (kr_fails_assert(queue_ix < PRIORITY_UDP))
+ return NULL;
i = queue_ix;
} else {
i = PRIORITY_UDP;
@@ -279,18 +372,31 @@ static inline struct protolayer_iter_ctx *pop_query(void)
static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
{
if (ctx->session->stream) {
+ struct session2 *s = ctx->session;
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ s->ref_count++; // keep session and sdata alive for a while
+ waiting_requests_size -= sdata->size;
if (!ctx->session->closing) {
- session2_force_close(ctx->session); // session is not freed here as iter contexts exist
+ session2_force_close(ctx->session);
}
- queue_pop(sdata->queue);
- while (queue_len(sdata->queue) > 0) {
- protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
- ctx = queue_head(sdata->queue);
+ kr_assert(ctx == queue_head(sdata->queue));
+ while (true) {
queue_pop(sdata->queue);
+ if (ctx) {
+ struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+ waiting_requests_size -= idata->size;
+ protolayer_break(ctx, kr_error(err));
+ }
+ if (queue_len(sdata->queue) == 0) break;
+ ctx = queue_head(sdata->queue);
}
+ session2_unhandle(s); // decrease ref_count
+ } else {
+ struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+ waiting_requests_size -= idata->size;
+ protolayer_break(ctx, kr_error(err));
}
- protolayer_break(ctx, kr_error(err));
+ kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);
}
/// Process a single deferred query (or defer again) if there is any.
@@ -298,17 +404,17 @@ static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
static inline void process_single_deferred(void)
{
struct protolayer_iter_ctx *ctx = pop_query();
- if (ctx == NULL) return;
+ if (kr_fails_assert(ctx)) return;
- defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
- phase_accounting = true;
+ defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ struct session2 *session = ctx->session;
uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
VERBOSE_LOG(" %s POP from %d after %4.3f ms\n",
- kr_straddr(ctx->comm->comm_addr),
+ kr_straddr(ctx->comm->src_addr),
queue_ix,
age_ns / 1000000.0);
@@ -320,28 +426,80 @@ static inline void process_single_deferred(void)
if (age_ns >= REQ_TIMEOUT) {
VERBOSE_LOG(" BREAK (timeout)\n");
- break_query(ctx, ETIME);
- return;
- }
- int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
- if (priority > queue_ix) { // priority dropped (got higher value)
- VERBOSE_LOG(" PUSH to %d\n", priority);
- push_query(ctx, priority, false);
+ // notice logging according to log-period
+ const uint32_t time_now = kr_now();
+ uint32_t log_time_orig = atomic_load_explicit(&defer->log_time, memory_order_relaxed);
+ if (defer->log_period) {
+ while (time_now - log_time_orig + 1024 >= defer->log_period + 1024) {
+ if (atomic_compare_exchange_weak_explicit(&defer->log_time, &log_time_orig, time_now,
+ memory_order_relaxed, memory_order_relaxed)) {
+ kr_log_notice(DEFER, "Data from %s too long in queue, dropping. (%0.3f MiB in queues)\n",
+ kr_straddr(ctx->comm->src_addr), waiting_requests_size / 1024.0 / 1024.0);
+ break;
+ }
+ }
+ }
+
+ break_query(ctx, ETIME);
return;
}
+ bool eof = false;
if (ctx->session->stream) {
+ int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
+ if (priority > queue_ix) { // priority dropped (got higher value)
+ VERBOSE_LOG(" PUSH to %d\n", priority);
+ push_query(ctx, priority, false);
+ return;
+ }
+
kr_assert(queue_head(sdata->queue) == ctx);
queue_pop(sdata->queue);
+ while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event
+ eof = true;
+ queue_pop(sdata->queue);
+ }
if (queue_len(sdata->queue) > 0) {
VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority);
push_query(queue_head(sdata->queue), priority, true);
+ } else {
+ waiting_requests_size -= sdata->size;
}
}
+ waiting_requests_size -= idata->size;
+ kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);
+
+ if (eof) {
+ // Keep session alive even if it is somehow force-closed during continuation.
+ // TODO Is it possible?
+ session->ref_count++;
+ }
+
VERBOSE_LOG(" CONTINUE\n");
protolayer_continue(ctx);
+
+ if (eof) {
+ VERBOSE_LOG(" CONTINUE EOF event\n");
+ session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL);
+ session2_unhandle(session); // decrease ref_count
+ }
+}
+
+/// Process as many deferred requests as needed to get memory consumption under limit.
+static inline void process_deferred_over_size_limit(void) {
+ if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
+ defer_sample_state_t prev_sample_state;
+ defer_sample_start(&prev_sample_state);
+ do {
+ process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
+ // If the unwrapped query is to be processed here,
+ // it is the last iteration and the query is processed after returning.
+ defer_sample_restart();
+ } while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
+ defer_sample_stop(&prev_sample_state, true);
+ }
}
/// Break expired requests at the beginning of queues, uses current stamp.
@@ -370,76 +528,115 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
- if (!defer)
+ if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);
- if (ctx->session->outgoing)
- return protolayer_continue(ctx);
-
- defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
- struct pl_defer_iter_data *data = iter_data;
+ defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
+ struct pl_defer_iter_data *idata = iter_data;
struct pl_defer_sess_data *sdata = sess_data;
- data->req_stamp = defer_sample_state.stamp;
+ idata->req_stamp = defer_sample_state.stamp;
VERBOSE_LOG(" %s UNWRAP\n",
- kr_straddr(ctx->comm->comm_addr));
+ kr_straddr(ctx->comm->src_addr));
+
+ uv_idle_start(&idle_handle, defer_queues_idle);
if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred
queue_push(sdata->queue, ctx);
+ waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false);
+ // payload counted in session wire buffer
VERBOSE_LOG(" PUSH as follow-up\n");
+ process_deferred_over_size_limit();
return protolayer_async();
}
- int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
+ int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
- if (priority == -1) {
+	// Process synchronously unless there may exist requests that have to be processed first
+ if (((priority == 0) || (priority == PRIORITY_UDP)) && (queue_len(queues[priority]) == 0) &&
+ phase_try_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP)) {
VERBOSE_LOG(" CONTINUE\n");
- phase_accounting = true;
return protolayer_continue(ctx);
}
VERBOSE_LOG(" PUSH to %d\n", priority);
if (ctx->session->stream) {
queue_push(sdata->queue, ctx);
+ waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session);
}
push_query(ctx, priority, false);
- while (waiting_requests > MAX_WAITING_REQS) { // TODO follow-up stream packets are not counted here
- defer_sample_restart();
- process_single_deferred(); // possibly defers again without decreasing waiting_requests
- // defer_sample_stop should be called soon outside
- }
+ waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
+ // for stream, payload is counted in session wire buffer
+ process_deferred_over_size_limit();
return protolayer_async();
}
+/// Unwrap event: EOF event may be deferred here, other events pass synchronously.
+static enum protolayer_event_cb_result pl_defer_event_unwrap(
+ enum protolayer_event_type event, void **baton,
+ struct session2 *session, void *sess_data)
+{
+ if (!defer || !session->stream || session->outgoing)
+ return PROTOLAYER_EVENT_PROPAGATE;
+
+ defer_sample_addr((const union kr_sockaddr *)session->comm_storage.src_addr, session->stream);
+
+ struct pl_defer_sess_data *sdata = sess_data;
+ if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0)) {
+ // defer EOF event if unprocessed data remain, baton is dropped if any
+ queue_push(sdata->queue, NULL);
+ VERBOSE_LOG(" %s event %s deferred\n",
+ session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+ protolayer_event_name(event));
+ return PROTOLAYER_EVENT_CONSUME;
+ }
+
+ VERBOSE_LOG(" %s event %s passes through synchronously%s%s\n",
+ session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+ protolayer_event_name(event),
+ queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "",
+ *baton ? " (with baton)" : "");
+ return PROTOLAYER_EVENT_PROPAGATE;
+}
+
/// Idle: continue processing deferred requests.
static void defer_queues_idle(uv_idle_t *handle)
{
- kr_assert(waiting_requests > 0);
VERBOSE_LOG("IDLE\n");
- VERBOSE_LOG(" %d waiting\n", waiting_requests);
- defer_sample_start();
- uint64_t idle_stamp = defer_sample_state.stamp;
- while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
- process_single_deferred();
- defer_sample_restart();
+ if (waiting_requests > 0) {
+ VERBOSE_LOG(" %d waiting\n", waiting_requests);
+ defer_sample_start(NULL);
+ uint64_t idle_stamp = defer_sample_state.stamp;
+ do {
+ process_single_deferred();
+ defer_sample_restart();
+ } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+ defer_sample_stop(NULL, true);
+ cleanup_queues();
+ udp_queue_send_all();
}
- cleanup_queues();
- defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
- udp_queue_send_all();
if (waiting_requests > 0) {
VERBOSE_LOG(" %d waiting\n", waiting_requests);
} else {
- phase_set(PHASE_ANY);
+ phase_reset(PHASE_NONE);
+ VERBOSE_LOG(" deactivate idle\n");
+ uv_idle_stop(&idle_handle);
}
VERBOSE_LOG("POLL\n");
}
/// Initialize shared memory, queues. To be called from Lua.
-int defer_init(const char *mmap_file, int cpus)
+int defer_init(const char *mmap_file, uint32_t log_period, int cpus) // TODO possibly remove cpus; not needed
{
+ defer_initialized = true;
+ if (mmap_file == NULL) {
+ // defer explicitly disabled
+ return 0;
+ }
+
int ret = 0;
if (cpus < 1) {
ret = EINVAL;
@@ -449,6 +646,7 @@ int defer_init(const char *mmap_file, int cpus)
struct defer header = {
.capacity = KRU_CAPACITY,
.max_decay = MAX_DECAY,
+ .log_period = log_period,
.cpus = cpus,
.using_avx2 = using_avx2(),
};
@@ -462,12 +660,13 @@ int defer_init(const char *mmap_file, int cpus)
offsetof(struct defer, using_avx2) ==
sizeof(header.capacity) +
sizeof(header.max_decay) +
+ sizeof(header.log_period) +
sizeof(header.cpus),
"detected padding with undefined data inside mmapped header");
ret = mmapped_init(&defer_mmapped, mmap_file, size, &header, header_size);
if (ret == MMAPPED_WAS_FIRST) {
- kr_log_info(SYSTEM, "Initializing prioritization...\n");
+ kr_log_info(DEFER, "Initializing defer...\n");
defer = defer_mmapped.mem;
@@ -478,13 +677,22 @@ int defer_init(const char *mmap_file, int cpus)
goto fail;
}
+ defer->log_time = kr_now() - log_period;
+
ret = mmapped_init_continue(&defer_mmapped);
if (ret != 0) goto fail;
- kr_log_info(SYSTEM, "Prioritization initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+ kr_log_info(DEFER, "Defer initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+
+ // log current configuration
+ if (KR_LOG_LEVEL_IS(LOG_INFO) || kr_log_group_is_set(LOG_GRP_DEFER)) {
+ char desc[8000];
+ defer_str_conf(desc, sizeof(desc));
+ kr_log_info(DEFER, "Defer configuration:\n%s", desc);
+ }
} else if (ret == 0) {
defer = defer_mmapped.mem;
- kr_log_info(SYSTEM, "Using existing prioritization data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
+ kr_log_info(DEFER, "Using existing defer data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
} else goto fail;
for (size_t i = 0; i < QUEUES_CNT; i++)
@@ -494,7 +702,7 @@ int defer_init(const char *mmap_file, int cpus)
fail:
- kr_log_crit(SYSTEM, "Initialization of shared prioritization data failed.\n");
+ kr_log_crit(DEFER, "Initialization of shared defer data failed.\n");
return ret;
}
@@ -528,5 +736,6 @@ static void defer_protolayers_init(void)
.sess_size = sizeof(struct pl_defer_sess_data),
.sess_init = pl_defer_sess_init,
.unwrap = pl_defer_unwrap,
+ .event_unwrap = pl_defer_event_unwrap,
};
}
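
Editor's sketch (not part of the patch): the queue index computed by kru_charge_classify() above combines the highest crossed load threshold with a per-prefix sub-priority; ties within one threshold band are ordered by that sub-priority. A minimal self-contained model of the mapping, reusing the constants introduced in defer.c but taking the loads[] directly instead of from KRU.load_multi_prefix():

#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

#define SUBPRIO_CNT 8
static const uint16_t LOADS_THRESHOLDS[] = { 1<<4, 1<<8, 1<<12, (uint16_t)-1 };
static const uint8_t  V4_SUBPRIO[]       = { 0, 1, 3, 7 };  // for prefixes /18, /20, /24, /32

// Mirrors the loop in kru_charge_classify(): the result grows with the highest
// threshold crossed by any prefix; the sub-priority of the prefix that crossed it
// decides the exact queue within that band.
static int classify_loads(const uint16_t *loads, size_t cnt, const uint8_t *subprio)
{
	int priority = 0;
	for (int i = (int)cnt - 1, j = 0; i >= 0; i--)
		for (; LOADS_THRESHOLDS[j] < loads[i]; j++)
			priority = 1 + j * SUBPRIO_CNT + subprio[i];
	return priority;
}

int main(void)
{
	// Example: the /32 counter sits between 2^4 and 2^8 while wider prefixes stay low,
	// so the query lands in queue 1 + 0*8 + 7 = 8 (main level 1, sub-level 7);
	// plain UDP always gets PRIORITY_UDP = QUEUES_CNT - 1 = 25 instead.
	uint16_t loads[] = { 0, 0, 0, 100 };
	printf("queue %d\n", classify_loads(loads, 4, V4_SUBPRIO));
	return 0;
}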
diff --git a/daemon/defer.h b/daemon/defer.h
index db1b083b..18adf91b 100644
--- a/daemon/defer.h
+++ b/daemon/defer.h
@@ -8,9 +8,8 @@
#include "lib/kru.h"
/// Initialize defer, incl. shared memory with KRU, excl. idle.
-/// To be called from Lua; defer is disabled by default otherwise.
KR_EXPORT
-int defer_init(const char *mmap_file, int cpus);
+int defer_init(const char *mmap_file, uint32_t log_period, int cpus);
/// Initialize idle.
int defer_init_idle(uv_loop_t *loop);
@@ -19,37 +18,33 @@ int defer_init_idle(uv_loop_t *loop);
void defer_deinit(void);
/// Increment KRU counters by the given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream);
+void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream);
typedef struct {
- int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
- union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
+ bool is_accounting; /// whether currently accounting the time to someone
bool stream;
+ union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
} defer_sample_state_t;
extern defer_sample_state_t defer_sample_state;
extern struct defer *defer; /// skip sampling/deferring if NULL
-
+extern bool defer_initialized; /// defer_init was called, possibly keeping defer disabled
+extern uint64_t defer_uvtime_stamp; /// stamp of the last uv time update
// TODO: reconsider `static inline` cases below
#include <time.h>
-static inline uint64_t get_stamp(void)
+static inline uint64_t defer_get_stamp(void)
{
struct timespec now_ts = {0};
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
- return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
-}
-
-/// Start accounting work, if not doing it already.
-static inline void defer_sample_start(void)
-{
- if (!defer) return;
- kr_assert(!defer_sample_state.is_accounting);
- ++defer_sample_state.is_accounting;
- defer_sample_state.stamp = get_stamp();
- defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+ uint64_t stamp = now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
+ if (defer_uvtime_stamp + 1000*1000 < stamp) {
+ defer_uvtime_stamp = stamp;
+ uv_update_time(uv_default_loop());
+ }
+ return stamp;
}
/// Annotate the work currently being accounted by an IP address.
@@ -60,8 +55,14 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
// TODO: this costs performance, so only in some debug mode?
- kr_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok());
- return;
+ if (kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) != kr_ok()) {
+ char defer_addr[KR_STRADDR_MAXLEN + 1] = { 0 };
+ strncpy(defer_addr, kr_straddr(&defer_sample_state.addr.ip), sizeof(defer_addr) - 1);
+ kr_log_warning(DEFER, "Sampling address mismatch: %s != %s\n",
+ kr_straddr(&addr->ip),
+ defer_addr);
+ return;
+ }
}
switch (addr->ip.sa_family) {
@@ -78,38 +79,79 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
defer_sample_state.stream = stream;
}
-/// Stop accounting work - and change the source if applicable.
-static inline void defer_sample_stop(void)
+/// Internal; start accounting work at specified timestamp.
+static inline void defer_sample_start_stamp(uint64_t stamp)
{
if (!defer) return;
+ kr_assert(!defer_sample_state.is_accounting);
+ defer_sample_state.is_accounting = true;
+ defer_sample_state.stamp = stamp;
+ defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+}
- if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
- if (--defer_sample_state.is_accounting) return;
- if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
+/// Internal; stop accounting work at specified timestamp and charge the source if applicable.
+static inline void defer_sample_stop_stamp(uint64_t stamp)
+{
+ if (!defer) return;
+ kr_assert(defer_sample_state.is_accounting);
+ defer_sample_state.is_accounting = false;
- const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
+ if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
- // we accounted something
+ const uint64_t elapsed = stamp - defer_sample_state.stamp;
+ if (elapsed == 0) return;
// TODO: some queries of internal origin have suspicioiusly high numbers.
// We won't be really accounting those, but it might suggest some other issue.
- defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
+ defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
}
-/// Stop accounting if active, then start again. Uses just one stamp.
-static inline void defer_sample_restart(void)
+static inline bool defer_sample_is_accounting(void)
{
- if (!defer) return;
+ return defer_sample_state.is_accounting;
+}
- uint64_t stamp = get_stamp();
+/// Start accounting work; optionally save state of current accounting.
+/// Current state can be saved only after having an address assigned.
+static inline void defer_sample_start(defer_sample_state_t *prev_state_out) {
+ if (!defer) return;
+ uint64_t stamp = defer_get_stamp();
- if (defer_sample_state.is_accounting > 0) {
- const uint64_t elapsed = stamp - defer_sample_state.stamp;
- defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
+ // suspend
+ if (prev_state_out) {
+ *prev_state_out = defer_sample_state; // TODO stamp is not needed
+ if (defer_sample_state.is_accounting)
+ defer_sample_stop_stamp(stamp);
}
- defer_sample_state.stamp = stamp;
- defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
- defer_sample_state.is_accounting = 1;
+ // start
+ defer_sample_start_stamp(stamp);
+}
+
+/// Stop accounting and start it again.
+static inline void defer_sample_restart(void) {
+ if (!defer) return;
+ uint64_t stamp = defer_get_stamp();
+
+ // stop
+ defer_sample_stop_stamp(stamp);
+
+ // start
+ defer_sample_start_stamp(stamp);
+}
+
+/// Stop accounting and charge the source if applicable; optionally resume previous accounting.
+static inline void defer_sample_stop(defer_sample_state_t *prev_state, bool reuse_last_stamp) {
+ if (!defer) return;
+ uint64_t stamp = reuse_last_stamp ? defer_sample_state.stamp : defer_get_stamp();
+
+ // stop
+ defer_sample_stop_stamp(stamp);
+
+ // resume
+ if (prev_state) {
+ defer_sample_state = *prev_state;
+ defer_sample_state.stamp = stamp;
+ }
}
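
Editor's sketch (not part of the patch): the reworked sampling API above is what the session2.c and worker.c hunks below build on — a batch loop saves the caller's measurement, charges each processed item separately via defer_sample_restart(), and finally resumes the suspended measurement. A minimal illustration of that calling pattern with no-op stand-ins for the real daemon functions and a made-up finalize_batch() driver:

#include <stdbool.h>

typedef struct { bool is_accounting; } defer_sample_state_t;   // simplified stand-in

static void defer_sample_start(defer_sample_state_t *prev) { (void)prev; }
static void defer_sample_restart(void) {}
static void defer_sample_stop(defer_sample_state_t *prev, bool reuse_last_stamp)
{
	(void)prev; (void)reuse_last_stamp;
}

static void finalize_one_task(void) { /* work attributed to the current sample */ }

// Same shape as session2_tasklist_finalize() after this patch.
static void finalize_batch(int tasks)
{
	if (tasks <= 0)
		return;
	defer_sample_state_t prev;       // whoever was being accounted before the batch
	defer_sample_start(&prev);
	do {
		finalize_one_task();
		tasks--;
		defer_sample_restart();  // charge this task's source, start a fresh sample
	} while (tasks > 0);
	defer_sample_stop(&prev, true);  // reuse the last stamp, resume the outer measurement
}

int main(void) { finalize_batch(3); return 0; }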
diff --git a/daemon/io.c b/daemon/io.c
index 36648907..8093a4b0 100644
--- a/daemon/io.c
+++ b/daemon/io.c
@@ -324,6 +324,32 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
return;
}
+ if (nread == UV_ENOBUFS) {
+ /* No space available in session buffer.
+ * The connection may be just waiting in defer.
+ * Ignore the error and keep the data in system queue for later reading or timeout. */
+ if (kr_log_is_debug(IO, NULL)) {
+ struct sockaddr *peer = session2_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_debug(IO, "=> incoming data from '%s' waiting (%s)\n",
+ peer_str ? peer_str : "",
+ uv_strerror(nread));
+ }
+ return;
+ }
+
+ // allow deferring EOF for incoming connections to send answer even if half-closed
+ if (!s->outgoing && (nread == UV_EOF)) {
+ if (kr_log_is_debug(IO, NULL)) {
+ struct sockaddr *peer = session2_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_debug(IO, "=> connection to '%s' half-closed by peer (EOF)\n",
+ peer_str ? peer_str : "");
+ }
+ session2_event(s, PROTOLAYER_EVENT_EOF, NULL);
+ return;
+ }
+
if (nread < 0 || !buf->base) {
if (kr_log_is_debug(IO, NULL)) {
struct sockaddr *peer = session2_get_peer(s);
@@ -353,7 +379,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
static void tcp_accept_internal(uv_stream_t *master, int status, enum kr_proto grp)
{
- if (status != 0) {
+ if (status != 0) {
return;
}
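
Editor's sketch (not part of the patch): for context on the UV_ENOBUFS branch above — libuv reports UV_ENOBUFS to the read callback when the allocation callback hands back an empty buffer, which is the situation when the session's wire buffer has no free space because earlier data is still waiting in defer queues. A minimal illustration of that allocation side (hypothetical names, not kresd's actual callback):

#include <uv.h>
#include <stddef.h>

// If base is NULL or len is 0, libuv delivers UV_ENOBUFS to the read callback
// instead of reading, leaving the data queued in the kernel for a later attempt.
static void alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
	(void)handle; (void)suggested_size;
	size_t free_space = 0;          // pretend the session wire buffer is full
	buf->base = NULL;
	buf->len = free_space;
}

int main(void) { (void)alloc_cb; return 0; }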
diff --git a/daemon/lua/kres-gen-33.lua b/daemon/lua/kres-gen-33.lua
index 4312d218..3fc16df3 100644
--- a/daemon/lua/kres-gen-33.lua
+++ b/daemon/lua/kres-gen-33.lua
@@ -617,7 +617,7 @@ struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
int zi_zone_import(const zi_config_t);
_Bool ratelimiting_request_begin(struct kr_request *);
int ratelimiting_init(const char *, size_t, uint32_t, uint32_t, uint16_t, uint32_t, _Bool);
-int defer_init(const char *, int);
+int defer_init(const char *, uint32_t, int);
struct engine {
char _stub[];
};
diff --git a/daemon/main.c b/daemon/main.c
index a808ba20..a7b9c92b 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -616,6 +616,15 @@ int main(int argc, char **argv)
lua_settop(the_engine->L, 0);
}
+ if (!defer_initialized) {
+ kr_log_warning(SYSTEM, "Prioritization not initialized from Lua, using hardcoded default.\n");
+ ret = defer_init("defer", 1, 1);
+ if (ret) {
+ ret = EXIT_FAILURE;
+ goto cleanup;
+ }
+ }
+
if (defer_init_idle(loop) != 0) {
ret = EXIT_FAILURE;
goto cleanup;
diff --git a/daemon/meson.build b/daemon/meson.build
index 62ac983b..9fde08f8 100644
--- a/daemon/meson.build
+++ b/daemon/meson.build
@@ -62,6 +62,7 @@ kresd_deps = [
capng,
nghttp2,
malloc,
+ libm
]
diff --git a/daemon/session2.c b/daemon/session2.c
index 40328e13..faca57bf 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -307,6 +307,18 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue)
return false;
}
+static inline ssize_t session2_get_protocol(
+ struct session2 *s, enum protolayer_type protocol)
+{
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
+ for (ssize_t i = 0; i < grp->num_layers; i++) {
+ enum protolayer_type found = grp->layers[i];
+ if (protocol == found)
+ return i;
+ }
+
+ return -1;
+}
/** Gets layer-specific session data for the layer with the specified index
* from the manager. */
@@ -333,6 +345,14 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_sess_data_get(ctx->session, ctx->layer_ix);
}
+void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) {
+ ssize_t layer_ix = session2_get_protocol(s, protocol);
+ if (layer_ix < 0)
+ return NULL;
+
+ return protolayer_sess_data_get(s, layer_ix);
+}
+
/** Gets layer-specific iteration data for the layer with the specified index
* from the context. */
static inline struct protolayer_data *protolayer_iter_data_get(
@@ -358,17 +378,17 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_iter_data_get(ctx, ctx->layer_ix);
}
-static inline ssize_t session2_get_protocol(
- struct session2 *s, enum protolayer_type protocol)
+size_t protolayer_sess_size_est(struct session2 *s)
{
- const struct protolayer_grp *grp = &protolayer_grps[s->proto];
- for (ssize_t i = 0; i < grp->num_layers; i++) {
- enum protolayer_type found = grp->layers[i];
- if (protocol == found)
- return i;
- }
+ return s->session_size + s->wire_buf.size;
+}
- return -1;
+size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload)
+{
+ size_t size = ctx->session->iter_ctx_size;
+ if (incl_payload)
+ size += protolayer_payload_size(&ctx->payload);
+ return size;
}
static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
@@ -596,7 +616,7 @@ static int session2_submit(
{
if (session->closing)
return kr_error(ECANCELED);
- if (session->ref_count >= INT_MAX)
+ if (session->ref_count >= INT_MAX - 1)
return kr_error(ETOOMANYREFS);
if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);
@@ -610,7 +630,7 @@ static int session2_submit(
// Note two cases: incoming session (new request)
// vs. outgoing session (resuming work on some request)
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
- defer_sample_start();
+ defer_sample_start(NULL);
struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
kr_require(ctx);
@@ -672,7 +692,7 @@ static int session2_submit(
int ret = protolayer_step(ctx);
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
- defer_sample_stop();
+ defer_sample_stop(NULL, false);
return ret;
}
@@ -845,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type,
.proto = proto,
.iter_ctx_size = iter_ctx_size,
+ .session_size = session_size,
};
memcpy(&s->layer_data, offsets, sizeof(offsets));
@@ -959,10 +980,8 @@ uv_handle_t *session2_get_handle(struct session2 *s)
static void session2_on_timeout(uv_timer_t *timer)
{
- defer_sample_start();
struct session2 *s = timer->data;
session2_event(s, s->timer_event, NULL);
- defer_sample_stop();
}
int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
@@ -1084,11 +1103,17 @@ struct qr_task *session2_tasklist_del_msgid(const struct session2 *session, uint
void session2_tasklist_finalize(struct session2 *session, int status)
{
- while (session2_tasklist_get_len(session) > 0) {
- struct qr_task *t = session2_tasklist_del_first(session, false);
- kr_require(worker_task_numrefs(t) > 0);
- worker_task_finalize(t, status);
- worker_task_unref(t);
+ if (session2_tasklist_get_len(session) > 0) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *t = session2_tasklist_del_first(session, false);
+ kr_require(worker_task_numrefs(t) > 0);
+ worker_task_finalize(t, status);
+ worker_task_unref(t);
+ defer_sample_restart();
+ } while (session2_tasklist_get_len(session) > 0);
+ defer_sample_stop(&defer_prev_sample_state, true);
}
}
@@ -1121,27 +1146,34 @@ int session2_tasklist_finalize_expired(struct session2 *session)
key = (char *)&msg_id;
keylen = sizeof(msg_id);
}
- while (queue_len(q) > 0) {
- task = queue_head(q);
- if (session->outgoing) {
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
- msg_id = knot_wire_get_id(pktbuf->wire);
- }
- int res = trie_del(t, key, keylen, NULL);
- if (!worker_task_finished(task)) {
- /* task->pending_count must be zero,
- * but there are can be followers,
- * so run worker_task_subreq_finalize() to ensure retrying
- * for all the followers. */
- worker_task_subreq_finalize(task);
- worker_task_finalize(task, KR_STATE_FAIL);
- }
- if (res == KNOT_EOK) {
+
+ if (queue_len(q) > 0) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ task = queue_head(q);
+ if (session->outgoing) {
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ msg_id = knot_wire_get_id(pktbuf->wire);
+ }
+ int res = trie_del(t, key, keylen, NULL);
+ if (!worker_task_finished(task)) {
+ /* task->pending_count must be zero,
+ * but there are can be followers,
+			 * but there can be followers,
+ * for all the followers. */
+ worker_task_subreq_finalize(task);
+ worker_task_finalize(task, KR_STATE_FAIL);
+ }
+ if (res == KNOT_EOK) {
+ worker_task_unref(task);
+ }
+ queue_pop(q);
worker_task_unref(task);
- }
- queue_pop(q);
- worker_task_unref(task);
- ++ret;
+ ++ret;
+ defer_sample_restart();
+ } while (queue_len(q) > 0);
+ defer_sample_stop(&defer_prev_sample_state, true);
}
queue_deinit(q);
@@ -1172,22 +1204,34 @@ struct qr_task *session2_waitinglist_pop(struct session2 *session, bool deref)
void session2_waitinglist_retry(struct session2 *session, bool increase_timeout_cnt)
{
- while (!session2_waitinglist_is_empty(session)) {
- struct qr_task *task = session2_waitinglist_pop(session, false);
- if (increase_timeout_cnt) {
- worker_task_timeout_inc(task);
- }
- worker_task_step(task, session2_get_peer(session), NULL);
- worker_task_unref(task);
+ if (!session2_waitinglist_is_empty(session)) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *task = session2_waitinglist_pop(session, false);
+ if (increase_timeout_cnt) {
+ worker_task_timeout_inc(task);
+ }
+ worker_task_step(task, session2_get_peer(session), NULL);
+ worker_task_unref(task);
+ defer_sample_restart();
+ } while (!session2_waitinglist_is_empty(session));
+ defer_sample_stop(&defer_prev_sample_state, true);
}
}
void session2_waitinglist_finalize(struct session2 *session, int status)
{
- while (!session2_waitinglist_is_empty(session)) {
- struct qr_task *t = session2_waitinglist_pop(session, false);
- worker_task_finalize(t, status);
- worker_task_unref(t);
+ if (!session2_waitinglist_is_empty(session)) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *t = session2_waitinglist_pop(session, false);
+ worker_task_finalize(t, status);
+ worker_task_unref(t);
+ defer_sample_restart();
+ } while (!session2_waitinglist_is_empty(session));
+ defer_sample_stop(&defer_prev_sample_state, true);
}
}
@@ -1301,7 +1345,19 @@ static void session2_event_unwrap(struct session2 *s, ssize_t start_ix, enum pro
void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton)
{
+ /* Events may be sent from inside or outside of already measured code.
+ * From inside: close by us, statistics, ...
+ * From outside: timeout, EOF, close by external reasons, ... */
+ bool defer_accounting_here = false;
+ if (!defer_sample_is_accounting() && s->stream && !s->outgoing) {
+ defer_sample_start(NULL);
+ defer_accounting_here = true;
+ }
+
session2_event_unwrap(s, 0, event, baton);
+
+ if (defer_accounting_here)
+ defer_sample_stop(NULL, false);
}
void session2_event_after(struct session2 *s, enum protolayer_type protocol,
@@ -1684,6 +1740,12 @@ static int session2_transport_event(struct session2 *s,
if (s->closing)
return kr_ok();
+ if (event == PROTOLAYER_EVENT_EOF) {
+ // no layer wanted to retain TCP half-closed state
+ session2_force_close(s);
+ return kr_ok();
+ }
+
bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE ||
event == PROTOLAYER_EVENT_FORCE_CLOSE);
if (is_close_event) {
diff --git a/daemon/session2.h b/daemon/session2.h
index 957df6d9..5228ad86 100644
--- a/daemon/session2.h
+++ b/daemon/session2.h
@@ -334,6 +334,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
XX(MALFORMED) \
/** Signal that a connection has ended. */\
XX(DISCONNECT) \
+ /** Signal EOF from peer (e.g. half-closed TCP connection). */\
+ XX(EOF) \
/** Failed task send - update stats. */\
XX(STATS_SEND_ERR) \
/** Outgoing query submission - update stats. */\
@@ -535,6 +537,10 @@ size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue);
* queue iterators, as it does not need to iterate through the whole queue. */
bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue);
+/** Gets layer-specific session data for the specified protocol layer.
+ * Returns NULL if the layer is not present in the session. */
+void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol);
+
/** Gets layer-specific session data for the last processed layer.
* To be used after returning from its callback for async continuation but before calling protolayer_continue. */
void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
@@ -543,6 +549,13 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
* To be used after returning from its callback for async continuation but before calling protolayer_continue. */
void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx);
+/** Gets rough memory footprint estimate of session/iteration for use in defer.
+ * Different, hopefully minor, allocations are not counted here;
+ * tasks and subsessions are also not counted;
+ * read the code before using elsewhere. */
+size_t protolayer_sess_size_est(struct session2 *s);
+size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload);
+
/** Layer-specific data - the generic struct. To be added as the first member of
* each specific struct. */
struct protolayer_data {
@@ -868,6 +881,9 @@ struct session2 {
* (`struct protolayer_iter_ctx`), including layer-specific data. */
size_t iter_ctx_size;
+ /** The size of this session struct. */
+ size_t session_size;
+
/** The following flexible array has basically this structure:
*
* struct {
diff --git a/daemon/tls.c b/daemon/tls.c
index 231bff2d..6b3436f7 100644
--- a/daemon/tls.c
+++ b/daemon/tls.c
@@ -1325,6 +1325,12 @@ static enum protolayer_event_cb_result pl_tls_event_unwrap(
return PROTOLAYER_EVENT_PROPAGATE;
}
+ if (event == PROTOLAYER_EVENT_EOF) {
+ // TCP half-closed state not allowed
+ session2_force_close(s);
+ return PROTOLAYER_EVENT_CONSUME;
+ }
+
if (tls->client_side) {
if (event == PROTOLAYER_EVENT_CONNECT)
return pl_tls_client_connect_start(tls, s);
diff --git a/daemon/worker.c b/daemon/worker.c
index d517dd6c..ece3fd95 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -103,6 +103,14 @@ struct qr_task
qr_task_free((task)); \
} while (0)
+struct pl_dns_stream_sess_data {
+ struct protolayer_data h;
+ bool single : 1; /**< True: Stream only allows a single packet */
+ bool produced : 1; /**< True: At least one packet has been produced */
+ bool connected : 1; /**< True: The stream is connected */
+ bool half_closed : 1; /**< True: EOF was received, the stream is half-closed */
+};
+
/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
@@ -122,7 +130,6 @@ static struct session2* worker_find_tcp_waiting(const struct sockaddr* addr);
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);
-
struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
struct worker_ctx *the_worker = NULL;
@@ -696,10 +703,18 @@ static struct kr_query *task_get_last_pending_query(struct qr_task *task)
static int send_waiting(struct session2 *session)
{
+ if (session2_waitinglist_is_empty(session))
+ return 0;
+
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
int ret = 0;
- while (!session2_waitinglist_is_empty(session)) {
+ do {
struct qr_task *t = session2_waitinglist_get(session);
+ if (t->ctx->source.session)
+ defer_sample_addr(&t->ctx->source.addr, t->ctx->source.session->stream);
ret = qr_task_send(t, session, NULL, NULL);
+ defer_sample_restart();
if (ret != 0) {
struct sockaddr *peer = session2_get_peer(session);
session2_waitinglist_finalize(session, KR_STATE_FAIL);
@@ -709,7 +724,9 @@ static int send_waiting(struct session2 *session)
break;
}
session2_waitinglist_pop(session, true);
- }
+ } while (!session2_waitinglist_is_empty(session));
+ defer_sample_stop(&defer_prev_sample_state, true);
+
return ret;
}
@@ -874,26 +891,32 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
kr_assert(ret == KNOT_EOK && val_deleted == task);
}
/* Notify waiting tasks. */
- struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
- for (size_t i = task->waiting.len; i > 0; i--) {
- struct qr_task *follower = task->waiting.at[i - 1];
- /* Reuse MSGID and 0x20 secret */
- if (follower->ctx->req.rplan.pending.len > 0) {
- struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
- qry->id = leader_qry->id;
- qry->secret = leader_qry->secret;
-
- // Note that this transport may not be present in `leader_qry`'s server selection
- follower->transport = task->transport;
- if(follower->transport) {
- follower->transport->deduplicated = true;
+ if (task->waiting.len > 0) {
+ struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ for (size_t i = task->waiting.len; i > 0; i--) {
+ struct qr_task *follower = task->waiting.at[i - 1];
+ /* Reuse MSGID and 0x20 secret */
+ if (follower->ctx->req.rplan.pending.len > 0) {
+ struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
+ qry->id = leader_qry->id;
+ qry->secret = leader_qry->secret;
+
+ // Note that this transport may not be present in `leader_qry`'s server selection
+ follower->transport = task->transport;
+ if(follower->transport) {
+ follower->transport->deduplicated = true;
+ }
+ leader_qry->secret = 0; /* Next will be already decoded */
}
- leader_qry->secret = 0; /* Next will be already decoded */
+ qr_task_step(follower, packet_source, pkt);
+ qr_task_unref(follower);
+ defer_sample_restart();
}
- qr_task_step(follower, packet_source, pkt);
- qr_task_unref(follower);
+ defer_sample_stop(&defer_prev_sample_state, true);
+ task->waiting.len = 0;
}
- task->waiting.len = 0;
task->leading = false;
}
@@ -942,6 +965,10 @@ static int qr_task_finalize(struct qr_task *task, int state)
if (task->finished) {
return kr_ok();
}
+
+ if (task->ctx->source.session)
+ defer_sample_addr(&task->ctx->source.addr, task->ctx->source.session->stream);
+
struct request_ctx *ctx = task->ctx;
struct session2 *source_session = ctx->source.session;
kr_resolve_finish(&ctx->req, state);
@@ -997,6 +1024,18 @@ static int qr_task_finalize(struct qr_task *task, int state)
session2_close(source_session);
}
+ if (source_session->stream && !source_session->closing) {
+ struct pl_dns_stream_sess_data *stream =
+ protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_MULTI_STREAM);
+ if (!stream)
+ stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_UNSIZED_STREAM);
+ if (!stream)
+ stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_SINGLE_STREAM);
+ if (stream && stream->half_closed) {
+ session2_force_close(source_session);
+ }
+ }
+
qr_task_unref(task);
if (ret != kr_ok() || state != KR_STATE_DONE)
@@ -1806,13 +1845,6 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
}
}
-struct pl_dns_stream_sess_data {
- struct protolayer_data h;
- bool single : 1; /**< True: Stream only allows a single packet */
- bool produced : 1; /**< True: At least one packet has been produced */
- bool connected : 1; /**< True: The stream is connected */
-};
-
static int pl_dns_stream_sess_init(struct session2 *session,
void *sess_data, void *param)
{
@@ -1856,13 +1888,19 @@ static enum protolayer_event_cb_result pl_dns_stream_resolution_timeout(
} else {
/* Normally it should not happen,
* but better to check if there is anything in this list. */
- while (!session2_waitinglist_is_empty(s)) {
- struct qr_task *t = session2_waitinglist_pop(s, false);
- worker_task_finalize(t, KR_STATE_FAIL);
- worker_task_unref(t);
- the_worker->stats.timeout += 1;
- if (s->closing)
- return PROTOLAYER_EVENT_PROPAGATE;
+ if (!session2_waitinglist_is_empty(s)) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *t = session2_waitinglist_pop(s, false);
+ worker_task_finalize(t, KR_STATE_FAIL);
+ worker_task_unref(t);
+ the_worker->stats.timeout += 1;
+ if (s->closing)
+ return PROTOLAYER_EVENT_PROPAGATE;
+ defer_sample_restart();
+ } while (!session2_waitinglist_is_empty(s));
+ defer_sample_stop(&defer_prev_sample_state, true);
}
uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
uint64_t idle_time = kr_now() - s->last_activity;
@@ -1975,6 +2013,13 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected(
stream->connected = false;
+ if (session2_is_empty(session))
+ return PROTOLAYER_EVENT_PROPAGATE;
+
+ defer_sample_state_t defer_prev_sample_state;
+ if (session->outgoing)
+ defer_sample_start(&defer_prev_sample_state);
+
while (!session2_waitinglist_is_empty(session)) {
struct qr_task *task = session2_waitinglist_pop(session, false);
kr_assert(task->refs > 1);
@@ -1991,6 +2036,7 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected(
qry->flags.TCP = false;
}
qr_task_step(task, NULL, NULL);
+ defer_sample_restart();
} else {
kr_assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
@@ -2007,6 +2053,7 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected(
qry->flags.TCP = false;
}
qr_task_step(task, NULL, NULL);
+ defer_sample_restart();
} else {
kr_assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
@@ -2014,6 +2061,19 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected(
worker_task_unref(task);
}
+ if (session->outgoing)
+ defer_sample_stop(&defer_prev_sample_state, true);
+
+ return PROTOLAYER_EVENT_PROPAGATE;
+}
+
+static enum protolayer_event_cb_result pl_dns_stream_eof(
+ struct session2 *session, struct pl_dns_stream_sess_data *stream)
+{
+ if (!session2_is_empty(session)) {
+ stream->half_closed = true;
+ return PROTOLAYER_EVENT_CONSUME;
+ }
return PROTOLAYER_EVENT_PROPAGATE;
}
@@ -2048,6 +2108,9 @@ static enum protolayer_event_cb_result pl_dns_stream_event_unwrap(
case PROTOLAYER_EVENT_FORCE_CLOSE:
return pl_dns_stream_disconnected(session, stream);
+ case PROTOLAYER_EVENT_EOF:
+ return pl_dns_stream_eof(session, stream);
+
default:
return PROTOLAYER_EVENT_PROPAGATE;
}
diff --git a/doc/_static/config.schema.json b/doc/_static/config.schema.json
index 86bc4277..1aa80cf9 100644
--- a/doc/_static/config.schema.json
+++ b/doc/_static/config.schema.json
@@ -1713,6 +1713,27 @@
},
"default": null
},
+ "defer": {
+ "description": "Configuration of request prioritization (defer).",
+ "type": "object",
+ "properties": {
+ "enabled": {
+ "type": "boolean",
+ "description": "Use request prioritization.",
+ "default": false
+ },
+ "log-period": {
+ "type": "string",
+ "pattern": "^(\\d+)(us|ms|s|m|h|d)$",
+ "description": "Minimal time between two log messages, or '0s' to disable.",
+ "default": "0s"
+ }
+ },
+ "default": {
+ "enabled": false,
+ "log_period": "0s"
+ }
+ },
"lua": {
"description": "Custom Lua configuration.",
"type": "object",
diff --git a/doc/user/config-defer.rst b/doc/user/config-defer.rst
new file mode 100644
index 00000000..0b948a16
--- /dev/null
+++ b/doc/user/config-defer.rst
@@ -0,0 +1,65 @@
+.. SPDX-License-Identifier: GPL-3.0-or-later
+
+.. _config-defer:
+
+Request prioritization (defer)
+==============================
+
+Defer tries to mitigate DoS attacks by measuring the CPU time consumed by different hosts and networks
+and deferring future requests from the same origin.
+If there is not enough time to process all the requests, the lowest-priority ones are dropped.
+
+Time measurements are taken into account only for TCP-based queries (including DoT and DoH),
+because the source address of plain UDP queries can be forged.
+If there are enough requests of both kinds, we aim to spend half of the time on UDP
+without prioritization and half on non-UDP with prioritization.
+
+Detailed configuration is printed by the ``defer`` log group at the ``info`` level on startup (unless disabled).
+
+.. note::
+
+ The data of all deferred queries may occupy 64 MiB of memory per :ref:`worker <config-multiple-workers>`.
+
+.. option:: defer/enabled: true|false
+
+ :default: false
+
+ Enable request prioritization.
+
+ If disabled, requests are processed in the order of their arrival,
+ and under overload they are dropped only when kernel queues overflow.
+
+
+.. option:: defer/log-period: <time ms|s|m|h|d>
+
+ :default: 0s
+
+ Minimal time between two log messages, or ``0s`` to disable logging.
+
+ If a response is dropped after being deferred for too long, the source address is logged
+ and logging is disabled for the :option:`log-period <defer/log-period: <time ms|s|m|h|d>>`.
+ For as long as dropping continues, one source is logged each period,
+ and sources with more dropped queries are more likely to be chosen.
+
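+For example, a minimal configuration that enables defer and logs at most one dropped
+source per minute could look like this (the values are illustrative):
+
+.. code-block:: yaml
+
+   defer:
+     enabled: true
+     log-period: 1m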
+
+Implementation details
+----------------------
+
+Internally, defer uses a similar approach to :ref:`rate limiting <config-rate-limiting>`,
+except that CPU time is measured instead of counting requests.
+
+There are four main priority levels with assigned rate and instant limits for individual hosts
+and their multiples for networks -- the same prefix lengths and multipliers are used as for rate limiting.
+Within a priority level, requests are ordered by the longest prefix length at which they fall into that level,
+so that requests which reach the level only as part of a larger network are processed first,
+followed by requests which also fall into it due to a smaller subnetwork,
+which may itself have caused the deprioritization of the larger network.
+Further ordering follows the time of arrival.
+
+If a request is deferred for too long, it gets dropped.
+This can also happen to UDP requests,
+which are stored in a single queue ordered by their time of arrival.
+
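+A simplified sketch of how a measured load could map to one of the four main priority
+levels (illustrative only; the thresholds are left as parameters here and are not the
+values used by the implementation):
+
+.. code-block:: c
+
+   /* Map a normalized load (0..UINT16_MAX) to one of the four main priority
+    * levels given three ascending thresholds; a higher load means a lower
+    * priority, i.e. a later queue. */
+   static int load_to_level(uint16_t load, const uint16_t thresholds[3])
+   {
+       int level = 0;
+       while (level < 3 && load >= thresholds[level])
+           level++;
+       return level;  /* 0 is processed first, 3 last among the main levels */
+   }
+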
diff --git a/doc/user/config-performance.rst b/doc/user/config-performance.rst
index b09b4a92..43b15c53 100644
--- a/doc/user/config-performance.rst
+++ b/doc/user/config-performance.rst
@@ -33,3 +33,4 @@ impact than cache settings and number of instances.
config-priming
config-edns-keepalive
config-rate-limiting
+ config-defer
diff --git a/lib/kru.h b/lib/kru.h
index ef177ef8..b3690703 100644
--- a/lib/kru.h
+++ b/lib/kru.h
@@ -90,6 +90,14 @@ struct kru_api {
/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
uint16_t (*load_multi_prefix_max)(struct kru *kru, uint32_t time_now,
uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out);
+
+
+ /// Multiple queries based on different prefixes of a single key.
+ /// Stores the final values of the involved counters, normalized to the limit 2^16, into *loads_out (unless NULL).
+ /// Set prices to NULL to skip updating; otherwise, KRU is always updated, using the maximal allowed value on overflow.
+ /// The key of the i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
+ void (*load_multi_prefix)(struct kru *kru, uint32_t time_now,
+ uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out);
};
// The functions are stored this way to make it easier to switch
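
An illustrative usage sketch of the new load_multi_prefix() call, based only on the declaration above; the kru instance, the struct kru_api pointer and the time value are assumed context, not part of this patch:

    static void example_load(const struct kru_api *api, struct kru *kru, uint32_t time_now)
    {
        uint8_t key[16] = {0};               /* client address, padded/encoded to 16 bytes */
        uint8_t prefixes[] = {32, 48, 64};   /* prefix lengths of the same key */
        kru_price_t prices[] = {8, 2, 1};    /* pass NULL instead to skip updating the counters */
        uint16_t loads[3];                   /* final loads, normalized to the 2^16 limit */
        api->load_multi_prefix(kru, time_now, 0 /* namespace */, key,
                               prefixes, prices, 3, loads);
    }
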
diff --git a/lib/kru.inc.c b/lib/kru.inc.c
index 2272adab..166e1004 100644
--- a/lib/kru.inc.c
+++ b/lib/kru.inc.c
@@ -565,6 +565,33 @@ static uint8_t kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, u
return 0;
}
+static void kru_load_multi_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace,
+ uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out)
+{
+ struct query_ctx ctx[queries_cnt];
+
+ for (size_t i = 0; i < queries_cnt; i++) {
+ kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], (prices ? prices[i] : 0), ctx + i);
+ }
+
+ for (size_t i = 0; i < queries_cnt; i++) {
+ kru_limited_fetch(kru, ctx + i);
+ }
+
+ if (prices) {
+ for (int i = queries_cnt - 1; i >= 0; i--) {
+ kru_limited_update(kru, ctx + i, true);
+ }
+ }
+
+ if (loads_out) {
+ for (size_t i = 0; i < queries_cnt; i++) {
+ loads_out[i] = ctx[i].final_load_value;
+ }
+ }
+}
+
+
static uint16_t kru_load_multi_prefix_max(struct kru *kru, uint32_t time_now, uint8_t namespace,
uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out)
{
@@ -612,5 +639,6 @@ static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 1
.limited_multi_or = kru_limited_multi_or, \
.limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \
.limited_multi_prefix_or = kru_limited_multi_prefix_or, \
+ .load_multi_prefix = kru_load_multi_prefix, \
.load_multi_prefix_max = kru_load_multi_prefix_max, \
}
diff --git a/python/knot_resolver/datamodel/config_schema.py b/python/knot_resolver/datamodel/config_schema.py
index 641032dc..6329add3 100644
--- a/python/knot_resolver/datamodel/config_schema.py
+++ b/python/knot_resolver/datamodel/config_schema.py
@@ -17,6 +17,7 @@ from knot_resolver.datamodel.monitoring_schema import MonitoringSchema
from knot_resolver.datamodel.network_schema import NetworkSchema
from knot_resolver.datamodel.options_schema import OptionsSchema
from knot_resolver.datamodel.rate_limiting_schema import RateLimitingSchema
+from knot_resolver.datamodel.defer_schema import DeferSchema
from knot_resolver.datamodel.templates import POLICY_CONFIG_TEMPLATE, WORKER_CONFIG_TEMPLATE
from knot_resolver.datamodel.types import EscapedStr, IntPositive, WritableDir
from knot_resolver.datamodel.view_schema import ViewSchema
@@ -108,6 +109,7 @@ class KresConfig(ConfigSchema):
monitoring: Metrics exposition configuration (Prometheus, Graphite)
lua: Custom Lua configuration.
rate_limiting: Configuration of rate limiting.
+ defer: Configuration of request prioritization (defer).
"""
version: int = 1
@@ -129,6 +131,7 @@ class KresConfig(ConfigSchema):
logging: LoggingSchema = LoggingSchema()
monitoring: MonitoringSchema = MonitoringSchema()
rate_limiting: Optional[RateLimitingSchema] = None
+ defer: DeferSchema = DeferSchema()
lua: LuaSchema = LuaSchema()
_LAYER = Raw
@@ -151,6 +154,7 @@ class KresConfig(ConfigSchema):
logging: LoggingSchema
monitoring: MonitoringSchema
rate_limiting: Optional[RateLimitingSchema]
+ defer: DeferSchema
lua: LuaSchema
def _hostname(self, obj: Raw) -> Any:
diff --git a/python/knot_resolver/datamodel/defer_schema.py b/python/knot_resolver/datamodel/defer_schema.py
new file mode 100644
index 00000000..38301e55
--- /dev/null
+++ b/python/knot_resolver/datamodel/defer_schema.py
@@ -0,0 +1,15 @@
+from knot_resolver.utils.modeling import ConfigSchema
+from knot_resolver.datamodel.types import TimeUnit
+
+
+class DeferSchema(ConfigSchema):
+ """
+ Configuration of request prioritization (defer).
+
+ ---
+ enabled: Use request prioritization.
+ log_period: Minimal time between two log messages, or '0s' to disable.
+ """
+
+ enabled: bool = False
+ log_period: TimeUnit = TimeUnit("0s")
diff --git a/python/knot_resolver/datamodel/templates/defer.lua.j2 b/python/knot_resolver/datamodel/templates/defer.lua.j2
new file mode 100644
index 00000000..131b71c4
--- /dev/null
+++ b/python/knot_resolver/datamodel/templates/defer.lua.j2
@@ -0,0 +1,10 @@
+{% from 'macros/common_macros.lua.j2' import boolean %}
+
+{% if cfg.defer.enabled and not disable_defer -%}
+assert(C.defer_init(
+ '{{ cfg.rundir }}/defer',
+ {{ cfg.defer.log_period.millis() }},
+ {{ cfg.workers }}) == 0)
+{% else %}
+assert(C.defer_init(nil, 0, 0) == 0)
+{%- endif %}
diff --git a/python/knot_resolver/datamodel/templates/policy-config.lua.j2 b/python/knot_resolver/datamodel/templates/policy-config.lua.j2
index 4c5c9048..9d88537a 100644
--- a/python/knot_resolver/datamodel/templates/policy-config.lua.j2
+++ b/python/knot_resolver/datamodel/templates/policy-config.lua.j2
@@ -35,6 +35,12 @@ cache.open({{ cfg.cache.size_max.bytes() }}, 'lmdb://{{ cfg.cache.storage }}')
-- FORWARD section ----------------------------------
{% include "forward.lua.j2" %}
+-- DEFER section ------------------------------------
+-- Force-disable defer to avoid the default defer config.
+{% set disable_defer = true %}
+{% include "defer.lua.j2" %}
+
+
{% endif %}
quit()
diff --git a/python/knot_resolver/datamodel/templates/worker-config.lua.j2 b/python/knot_resolver/datamodel/templates/worker-config.lua.j2
index c97f0820..be7fe150 100644
--- a/python/knot_resolver/datamodel/templates/worker-config.lua.j2
+++ b/python/knot_resolver/datamodel/templates/worker-config.lua.j2
@@ -46,6 +46,9 @@ nsid.name('{{ cfg.nsid }}' .. worker.id)
-- RATE-LIMITING section ------------------------------------
{% include "rate_limiting.lua.j2" %}
+-- DEFER section ------------------------------------
+{% include "defer.lua.j2" %}
+
{% endif %}
-- LUA section --------------------------------------
diff --git a/python/knot_resolver/manager/manager.py b/python/knot_resolver/manager/manager.py
index 952c8b7d..68edb915 100644
--- a/python/knot_resolver/manager/manager.py
+++ b/python/knot_resolver/manager/manager.py
@@ -135,6 +135,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
config.monitoring,
config.lua,
config.rate_limiting,
+ config.defer,
]
# register and immediately call a verifier that validates config with 'canary' kresd process
@@ -224,11 +225,17 @@ class KresManager: # pylint: disable=too-many-instance-attributes
async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
async with self._manager_lock:
if _old.rate_limiting != new.rate_limiting:
- logger.debug("Unlinking shared RRL memory")
+ logger.debug("Unlinking shared ratelimiting memory")
try:
os.unlink(str(_old.rundir) + "/ratelimiting")
except FileNotFoundError:
pass
+ if _old.workers != new.workers or _old.defer != new.defer:
+ logger.debug("Unlinking shared defer memory")
+ try:
+ os.unlink(str(_old.rundir) + "/defer")
+ except FileNotFoundError:
+ pass
logger.debug("Testing the new config with a canary process")
try:
# technically, this has side effects of leaving a new process running