diff options
-rw-r--r-- | daemon/rrl/api.c | 5 | ||||
-rw-r--r-- | daemon/rrl/api.h | 76 | ||||
-rw-r--r-- | daemon/session2.c | 51 | ||||
-rw-r--r-- | daemon/worker.c | 5 | ||||
-rw-r--r-- | lib/utils.h | 2 |
5 files changed, 126 insertions, 13 deletions
diff --git a/daemon/rrl/api.c b/daemon/rrl/api.c index 9ec25e37..2ce12da4 100644 --- a/daemon/rrl/api.c +++ b/daemon/rrl/api.c @@ -31,6 +31,11 @@ struct rrl *the_rrl = NULL; int the_rrl_fd = -1; char *the_rrl_mmap_file = NULL; +kr_rrl_sample_state_t kr_rrl_sample_state = { + .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU + .is_accounting = 0, +}; + /// return whether we're using optimized variant right now static bool using_avx2(void) { diff --git a/daemon/rrl/api.h b/daemon/rrl/api.h index d6f5841e..685b80aa 100644 --- a/daemon/rrl/api.h +++ b/daemon/rrl/api.h @@ -1,6 +1,7 @@ #include <stdbool.h> -#include <lib/defines.h> +#include "lib/defines.h" +#include "lib/utils.h" struct kr_request; /** Initialize rate-limiting with shared mmapped memory. @@ -16,3 +17,76 @@ bool kr_rrl_request_begin(struct kr_request *req); /** Remove mmapped file data if not used by other processes. */ KR_EXPORT void kr_rrl_deinit(void); + + +// TODO: reconsider `static inline` cases below + + +typedef struct { + bool do_sample; /// whether to sample; could be important if _COARSE isn't available + int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1 + union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet + uint64_t stamp; /// monotonic nanoseconds, probably won't wrap +} kr_rrl_sample_state_t; +extern kr_rrl_sample_state_t kr_rrl_sample_state; + +#include <time.h> +static inline uint64_t get_stamp(void) +{ + struct timespec now_ts = {0}; + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts); + return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec; +} + +/// Start accounting work, if not doing it already. +static inline void kr_rrl_sample_start(void) +{ + if (!kr_rrl_sample_state.do_sample) return; + kr_assert(!kr_rrl_sample_state.is_accounting); + ++kr_rrl_sample_state.is_accounting; + kr_rrl_sample_state.stamp = get_stamp(); + kr_rrl_sample_state.addr.ip.sa_family = AF_UNSPEC; +} + +/// Annotate the work currently being accounted by an IP address. +static inline void kr_rrl_sample_addr(const union kr_sockaddr *addr) +{ + if (!kr_rrl_sample_state.do_sample || kr_fails_assert(addr)) return; + if (!kr_rrl_sample_state.is_accounting) return; + + if (kr_rrl_sample_state.addr.ip.sa_family != AF_UNSPEC) { + // TODO: this costs performance, so only in some debug mode? + kr_assert(kr_sockaddr_cmp(&addr->ip, &kr_rrl_sample_state.addr.ip) == kr_ok()); + return; + } + + switch (addr->ip.sa_family) { + case AF_INET: + kr_rrl_sample_state.addr.ip4 = addr->ip4; + break; + case AF_INET6: + kr_rrl_sample_state.addr.ip6 = addr->ip6; + break; + default: + kr_rrl_sample_state.addr.ip.sa_family = AF_UNSPEC; + break; + } +} + +/// Stop accounting work - and change the source if applicable. +static inline void kr_rrl_sample_stop(void) +{ + if (!kr_rrl_sample_state.do_sample) return; + + if (kr_fails_assert(kr_rrl_sample_state.is_accounting > 0)) return; // weird + if (--kr_rrl_sample_state.is_accounting) return; + + const uint64_t elapsed = get_stamp() - kr_rrl_sample_state.stamp; + + // we accounted something + // FIXME: drop the log, add KRU, etc. + kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0, + kr_straddr(&kr_rrl_sample_state.addr.ip)); + // TODO: some queries of internal origin have suspicioiusly high numbers. + // We won't be really accounting those, but it might suggest some other issue. +} diff --git a/daemon/session2.c b/daemon/session2.c index c3d5765a..a2c519a0 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -16,6 +16,8 @@ #include "daemon/io.h" #include "daemon/udp_queue.h" #include "daemon/worker.h" +#include "daemon/rrl/api.h" +#include "daemon/proxyv2.h" #include "daemon/session2.h" @@ -547,13 +549,32 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx) * PROTOLAYER_RET_ASYNC when some layers are asynchronous and waiting for * continuation, or a negative number for errors (kr_error). */ static int protolayer_manager_submit( - struct protolayer_manager *manager, + struct session2 *s, enum protolayer_direction direction, size_t layer_ix, struct protolayer_payload payload, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { - if (manager->session->closing) - return kr_error(ECANCELED); + struct protolayer_manager *manager = s->layers; + if (!comm) + comm = &manager->session->comm; + + // RRL: at this point we might start doing nontrivial work, + // but we may not know the client's IP yet. + // Note two cases: incoming session (new request) + // vs. outgoing session (resuming work on some request) + if (direction == PROTOLAYER_UNWRAP) { + kr_rrl_sample_start(); + // In particular we don't want to miss en/decryption work + // for regular connections from clients. + if (!s->outgoing && s->secure && !proxy_allowed(comm->comm_addr)) + kr_rrl_sample_addr((const union kr_sockaddr *)comm->comm_addr); + } + int ret; + + if (manager->session->closing) { + ret = kr_error(ECANCELED); + goto finish_ret; + } struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size); kr_require(ctx); @@ -567,7 +588,7 @@ static int protolayer_manager_submit( *ctx = (struct protolayer_iter_ctx) { .payload = payload, - .comm = (comm) ? *comm : manager->session->comm, + .comm = *comm, .direction = direction, .layer_ix = layer_ix, .manager = manager, @@ -576,8 +597,10 @@ static int protolayer_manager_submit( }; for (size_t i = 0; i < manager->num_layers; i++) { - if (kr_fails_assert(ctx->manager->grp < PROTOLAYER_GRP_COUNT)) - return kr_error(EFAULT); + if (kr_fails_assert(ctx->manager->grp < PROTOLAYER_GRP_COUNT)) { + ret = kr_error(EFAULT); + goto finish_ret; + } enum protolayer_protocol p = protolayer_grps[manager->grp][i]; struct protolayer_globals *globals = &protolayer_globals[p]; @@ -591,7 +614,11 @@ static int protolayer_manager_submit( globals->iter_init(manager, ctx, iter_data); } - return protolayer_step(ctx); + ret = protolayer_step(ctx); +finish_ret: + if (direction == PROTOLAYER_UNWRAP) + kr_rrl_sample_stop(); + return ret; } static void *get_init_param(enum protolayer_protocol p, @@ -924,8 +951,10 @@ uv_handle_t *session2_get_handle(struct session2 *s) static void session2_on_timeout(uv_timer_t *timer) { + kr_rrl_sample_start(); struct session2 *s = timer->data; session2_event(s, s->timer_event, NULL); + kr_rrl_sample_stop(); } int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat) @@ -1177,7 +1206,7 @@ int session2_unwrap(struct session2 *s, struct protolayer_payload payload, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { - return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0, + return protolayer_manager_submit(s, PROTOLAYER_UNWRAP, 0, payload, comm, cb, baton); } @@ -1189,7 +1218,7 @@ int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol, ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1; if (layer_ix < 0) return layer_ix; - return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix, + return protolayer_manager_submit(s, PROTOLAYER_UNWRAP, layer_ix, payload, comm, cb, baton); } @@ -1197,7 +1226,7 @@ int session2_wrap(struct session2 *s, struct protolayer_payload payload, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { - return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, + return protolayer_manager_submit(s, PROTOLAYER_WRAP, s->layers->num_layers - 1, payload, comm, cb, baton); } @@ -1210,7 +1239,7 @@ int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol, ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1; if (layer_ix < 0) return layer_ix; - return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix, + return protolayer_manager_submit(s, PROTOLAYER_WRAP, layer_ix, payload, comm, cb, baton); } diff --git a/daemon/worker.c b/daemon/worker.c index 2d293ba9..0445d027 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -34,6 +34,7 @@ #include "lib/layer.h" #include "lib/layer/iterate.h" /* kr_response_classify */ #include "lib/utils.h" +#include "daemon/rrl/api.h" /* Magic defaults for the worker. */ @@ -362,6 +363,7 @@ static struct request_ctx *request_create(struct session2 *session, /* We need to store a copy of peer address. */ memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr)); req->qsource.addr = &ctx->source.addr.ip; + kr_rrl_sample_addr(&ctx->source.addr); if (!comm_addr) comm_addr = src_addr; @@ -1214,6 +1216,9 @@ static int tcp_task_step(struct qr_task *task, static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet) { + if (task && task->ctx->source.session) + kr_rrl_sample_addr(&task->ctx->source.addr); + /* No more steps after we're finished. */ if (!task || task->finished) { return kr_error(ESTALE); diff --git a/lib/utils.h b/lib/utils.h index 6fbdeed2..8f84fc46 100644 --- a/lib/utils.h +++ b/lib/utils.h @@ -309,7 +309,7 @@ struct sockaddr *kr_sockaddr_from_key(struct sockaddr_storage *dst, KR_EXPORT bool kr_sockaddr_key_same_addr(const char *key_a, const char *key_b); -/** Compare two given sockaddr. +/** Compare two given sockaddr. (only address and port) * return 0 - addresses are equal, error code otherwise. */ KR_EXPORT KR_PURE |