summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLukáš Ondráček <lukas.ondracek@nic.cz>2025-01-10 18:04:41 +0100
committerLukáš Ondráček <lukas.ondracek@nic.cz>2025-01-10 18:04:41 +0100
commitef655fc4f4c5a916a74e9cfd4968578cb9bb85a8 (patch)
tree3d4a9c2891f8924bbb9acf9a2625b201fcee58cc
parentdaemon/defer: update uv time after longer operations (diff)
downloadknot-resolver-ef655fc4f4c5a916a74e9cfd4968578cb9bb85a8.tar.xz
knot-resolver-ef655fc4f4c5a916a74e9cfd4968578cb9bb85a8.zip
daemon/defer: redesign UDP and non-UDP phase transition
-rw-r--r--daemon/defer.c142
1 files changed, 63 insertions, 79 deletions
diff --git a/daemon/defer.c b/daemon/defer.c
index b1abcacd..223a2d61 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -85,31 +85,43 @@ ptrdiff_t waiting_requests_size = 0; // signed for non-negativeness asserts
int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue )
enum phase {
- PHASE_UDP = 1,
- PHASE_NON_UDP = 2,
- PHASE_ANY = PHASE_UDP | PHASE_NON_UDP
-} phase = PHASE_ANY;
-uint64_t phase_elapsed = 0; // ns
-bool phase_accounting = false; // add accounted time to phase_elapsed in defer_account
-
-static inline void phase_set(enum phase p)
+ PHASE_NONE,
+ PHASE_UDP,
+ PHASE_NON_UDP
+} phase = PHASE_NONE;
+uint64_t phase_elapsed[3] = { 0 }; // ns; [PHASE_NONE] value is being incremented but never used
+const uint64_t phase_limits[3] = {0, PHASE_UDP_TIMEOUT, PHASE_NON_UDP_TIMEOUT};
+uint64_t phase_stamp = 0;
+
+static inline bool phase_over_limit(enum phase p)
{
- if (phase != p) {
- phase_elapsed = 0;
- phase = p;
- }
+ return phase_elapsed[p] >= phase_limits[p];
+}
+
+/// Reset elapsed times of phases and set phase to UDP, NON_UDP, or NONE.
+static inline void phase_reset(enum phase p)
+{
+ phase_elapsed[PHASE_UDP] = 0;
+ phase_elapsed[PHASE_NON_UDP] = 0;
+ phase_stamp = defer_sample_state.stamp;
+ phase = p;
}
-static inline void phase_charge(uint64_t nsec)
+
+/// Set phase to UDP or NON_UDP if it is not over limit or both are over limit (reset them).
+static inline bool phase_try_set(enum phase p)
{
- kr_assert(phase != PHASE_ANY);
- phase_elapsed += nsec;
- if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
- phase_set(PHASE_NON_UDP);
- phase_accounting = false;
- } else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
- phase_set(PHASE_UDP);
- phase_accounting = false;
+ phase_elapsed[phase] += defer_sample_state.stamp - phase_stamp;
+ phase_stamp = defer_sample_state.stamp;
+
+ if (!phase_over_limit(p)) {
+ phase = p;
+ return true;
+ } else if (phase_over_limit(PHASE_UDP) && phase_over_limit(PHASE_NON_UDP)) {
+ phase_reset(p);
+ return true;
}
+
+ return false;
}
struct pl_defer_sess_data {
@@ -247,11 +259,7 @@ static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *
/// Increment KRU counters by given time.
void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
{
- if (phase_accounting) {
- phase_charge(nsec);
- }
-
- if (!stream) return; // UDP is not accounted in KRU
+ if (!stream) return; // UDP is not accounted in KRU; TODO remove !stream invocations?
_Alignas(16) uint8_t key[16] = {0, };
const struct kru_conf *kru_conf;
@@ -312,7 +320,7 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
}
-/// Push query to a queue according to its priority and activate idle.
+/// Push query to a queue according to its priority.
static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
{
if (to_head_end) {
@@ -321,46 +329,30 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo
queue_push(queues[priority], ctx);
}
queue_ix = MIN(queue_ix, priority);
- if (waiting_requests++ <= 0) {
- kr_assert(waiting_requests == 1);
- uv_idle_start(&idle_handle, defer_queues_idle);
- VERBOSE_LOG(" activating idle\n");
- }
+ waiting_requests++;
}
-/// Pop and return query from the specified queue, deactivate idle if not needed.
+/// Pop and return query from the specified queue..
static inline struct protolayer_iter_ctx *pop_query_queue(int priority)
{
kr_assert(queue_len(queues[priority]) > 0);
struct protolayer_iter_ctx *ctx = queue_head(queues[priority]);
queue_pop(queues[priority]);
- if (--waiting_requests <= 0) {
- kr_assert(waiting_requests == 0);
- uv_idle_stop(&idle_handle);
- VERBOSE_LOG(" deactivating idle\n");
- }
+ waiting_requests--;
+ kr_assert(waiting_requests >= 0);
return ctx;
}
-/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
-/// deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on the current phase.
static inline struct protolayer_iter_ctx *pop_query(void)
{
const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
const int waiting_non_udp = waiting_requests - waiting_udp;
- enum phase new_phase;
- if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
- new_phase = PHASE_NON_UDP; // maybe changing from PHASE_ANY
- } else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
- new_phase = PHASE_UDP; // maybe changing from PHASE_ANY
- } else if (waiting_non_udp > 0) {
- new_phase = PHASE_NON_UDP; // change from PHASE_UDP, no UDP queries
- } else {
- new_phase = PHASE_UDP; // change from PHASE_NON_UDP, no non-UDP queries
- }
- phase_set(new_phase);
+ if (!((waiting_non_udp > 0) && phase_try_set(PHASE_NON_UDP)) &&
+ !((waiting_udp > 0) && phase_try_set(PHASE_UDP)))
+ phase_reset(waiting_non_udp > 0 ? PHASE_NON_UDP : PHASE_UDP);
int i;
if (phase == PHASE_NON_UDP) {
@@ -501,13 +493,11 @@ static inline void process_deferred_over_size_limit(void) {
defer_sample_state_t prev_sample_state;
defer_sample_start(&prev_sample_state);
do {
- phase_accounting = true;
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
// If the unwrapped query is to be processed here,
// it is the last iteration and the query is processed after returning.
defer_sample_restart();
} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
- phase_accounting = false;
defer_sample_stop(&prev_sample_state, true);
}
}
@@ -538,7 +528,6 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
- phase_accounting = false;
if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);
@@ -550,6 +539,8 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
VERBOSE_LOG(" %s UNWRAP\n",
kr_straddr(ctx->comm->src_addr));
+ uv_idle_start(&idle_handle, defer_queues_idle);
+
if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred
queue_push(sdata->queue, ctx);
waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false);
@@ -561,14 +552,10 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
- // Process synchronously if the phase is active and no requests can be scheduled before.
- if ((
- ((priority == PRIORITY_UDP) && (phase & PHASE_UDP)) ||
- ((priority == 0) && (phase & PHASE_NON_UDP))
- ) && (queue_len(queues[priority]) == 0)) {
+ // Process synchronously unless there may exist requests that has to be processed first
+ if (((priority == 0) || (priority == PRIORITY_UDP)) && (queue_len(queues[priority]) == 0) &&
+ phase_try_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP)) {
VERBOSE_LOG(" CONTINUE\n");
- phase_set(priority == PRIORITY_UDP ? PHASE_UDP : PHASE_NON_UDP);
- phase_accounting = true;
return protolayer_continue(ctx);
}
@@ -590,10 +577,6 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap(
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
{
- if ((event == PROTOLAYER_EVENT_EOF) || (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT)) {
- // disable accounting only for events that cannot occur during incoming data processing
- phase_accounting = false;
- }
if (!defer || !session->stream || session->outgoing)
return PROTOLAYER_EVENT_PROPAGATE;
@@ -620,25 +603,26 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap(
/// Idle: continue processing deferred requests.
static void defer_queues_idle(uv_idle_t *handle)
{
- kr_assert(waiting_requests > 0);
VERBOSE_LOG("IDLE\n");
- VERBOSE_LOG(" %d waiting\n", waiting_requests);
- defer_sample_start(NULL);
- uint64_t idle_stamp = defer_sample_state.stamp;
- do {
- phase_accounting = true;
- process_single_deferred();
- defer_sample_restart();
- } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
- phase_accounting = false;
- defer_sample_stop(NULL, true);
- cleanup_queues();
- udp_queue_send_all();
+ if (waiting_requests > 0) {
+ VERBOSE_LOG(" %d waiting\n", waiting_requests);
+ defer_sample_start(NULL);
+ uint64_t idle_stamp = defer_sample_state.stamp;
+ do {
+ process_single_deferred();
+ defer_sample_restart();
+ } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+ defer_sample_stop(NULL, true);
+ cleanup_queues();
+ udp_queue_send_all();
+ }
if (waiting_requests > 0) {
VERBOSE_LOG(" %d waiting\n", waiting_requests);
} else {
- phase_set(PHASE_ANY);
+ phase_reset(PHASE_NONE);
+ VERBOSE_LOG(" deactivate idle\n");
+ uv_idle_stop(&idle_handle);
}
VERBOSE_LOG("POLL\n");
}