summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--daemon/rrl/api.c5
-rw-r--r--daemon/rrl/api.h76
-rw-r--r--daemon/session2.c51
-rw-r--r--daemon/worker.c5
-rw-r--r--lib/utils.h2
5 files changed, 126 insertions, 13 deletions
diff --git a/daemon/rrl/api.c b/daemon/rrl/api.c
index 9ec25e37..2ce12da4 100644
--- a/daemon/rrl/api.c
+++ b/daemon/rrl/api.c
@@ -31,6 +31,11 @@ struct rrl *the_rrl = NULL;
int the_rrl_fd = -1;
char *the_rrl_mmap_file = NULL;
+kr_rrl_sample_state_t kr_rrl_sample_state = {
+ .do_sample = true, // FIXME: start with false, set to true based on config when opening KRU
+ .is_accounting = 0,
+};
+
/// return whether we're using optimized variant right now
static bool using_avx2(void)
{
diff --git a/daemon/rrl/api.h b/daemon/rrl/api.h
index d6f5841e..685b80aa 100644
--- a/daemon/rrl/api.h
+++ b/daemon/rrl/api.h
@@ -1,6 +1,7 @@
#include <stdbool.h>
-#include <lib/defines.h>
+#include "lib/defines.h"
+#include "lib/utils.h"
struct kr_request;
/** Initialize rate-limiting with shared mmapped memory.
@@ -16,3 +17,76 @@ bool kr_rrl_request_begin(struct kr_request *req);
/** Remove mmapped file data if not used by other processes. */
KR_EXPORT
void kr_rrl_deinit(void);
+
+
+// TODO: reconsider `static inline` cases below
+
+
+typedef struct {
+ bool do_sample; /// whether to sample; could be important if _COARSE isn't available
+ int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
+ union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
+ uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
+} kr_rrl_sample_state_t;
+extern kr_rrl_sample_state_t kr_rrl_sample_state;
+
+#include <time.h>
+static inline uint64_t get_stamp(void)
+{
+ struct timespec now_ts = {0};
+ clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
+ return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
+}
+
+/// Start accounting work, if not doing it already.
+static inline void kr_rrl_sample_start(void)
+{
+ if (!kr_rrl_sample_state.do_sample) return;
+ kr_assert(!kr_rrl_sample_state.is_accounting);
+ ++kr_rrl_sample_state.is_accounting;
+ kr_rrl_sample_state.stamp = get_stamp();
+ kr_rrl_sample_state.addr.ip.sa_family = AF_UNSPEC;
+}
+
+/// Annotate the work currently being accounted by an IP address.
+static inline void kr_rrl_sample_addr(const union kr_sockaddr *addr)
+{
+ if (!kr_rrl_sample_state.do_sample || kr_fails_assert(addr)) return;
+ if (!kr_rrl_sample_state.is_accounting) return;
+
+ if (kr_rrl_sample_state.addr.ip.sa_family != AF_UNSPEC) {
+ // TODO: this costs performance, so only in some debug mode?
+ kr_assert(kr_sockaddr_cmp(&addr->ip, &kr_rrl_sample_state.addr.ip) == kr_ok());
+ return;
+ }
+
+ switch (addr->ip.sa_family) {
+ case AF_INET:
+ kr_rrl_sample_state.addr.ip4 = addr->ip4;
+ break;
+ case AF_INET6:
+ kr_rrl_sample_state.addr.ip6 = addr->ip6;
+ break;
+ default:
+ kr_rrl_sample_state.addr.ip.sa_family = AF_UNSPEC;
+ break;
+ }
+}
+
+/// Stop accounting work - and change the source if applicable.
+static inline void kr_rrl_sample_stop(void)
+{
+ if (!kr_rrl_sample_state.do_sample) return;
+
+ if (kr_fails_assert(kr_rrl_sample_state.is_accounting > 0)) return; // weird
+ if (--kr_rrl_sample_state.is_accounting) return;
+
+ const uint64_t elapsed = get_stamp() - kr_rrl_sample_state.stamp;
+
+ // we accounted something
+ // FIXME: drop the log, add KRU, etc.
+ kr_log_notice(DEVEL, "%8.3f ms for %s\n", elapsed / 1000000.0,
+ kr_straddr(&kr_rrl_sample_state.addr.ip));
+ // TODO: some queries of internal origin have suspicioiusly high numbers.
+ // We won't be really accounting those, but it might suggest some other issue.
+}
diff --git a/daemon/session2.c b/daemon/session2.c
index c3d5765a..a2c519a0 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -16,6 +16,8 @@
#include "daemon/io.h"
#include "daemon/udp_queue.h"
#include "daemon/worker.h"
+#include "daemon/rrl/api.h"
+#include "daemon/proxyv2.h"
#include "daemon/session2.h"
@@ -547,13 +549,32 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
* PROTOLAYER_RET_ASYNC when some layers are asynchronous and waiting for
* continuation, or a negative number for errors (kr_error). */
static int protolayer_manager_submit(
- struct protolayer_manager *manager,
+ struct session2 *s,
enum protolayer_direction direction, size_t layer_ix,
struct protolayer_payload payload, const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
- if (manager->session->closing)
- return kr_error(ECANCELED);
+ struct protolayer_manager *manager = s->layers;
+ if (!comm)
+ comm = &manager->session->comm;
+
+ // RRL: at this point we might start doing nontrivial work,
+ // but we may not know the client's IP yet.
+ // Note two cases: incoming session (new request)
+ // vs. outgoing session (resuming work on some request)
+ if (direction == PROTOLAYER_UNWRAP) {
+ kr_rrl_sample_start();
+ // In particular we don't want to miss en/decryption work
+ // for regular connections from clients.
+ if (!s->outgoing && s->secure && !proxy_allowed(comm->comm_addr))
+ kr_rrl_sample_addr((const union kr_sockaddr *)comm->comm_addr);
+ }
+ int ret;
+
+ if (manager->session->closing) {
+ ret = kr_error(ECANCELED);
+ goto finish_ret;
+ }
struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size);
kr_require(ctx);
@@ -567,7 +588,7 @@ static int protolayer_manager_submit(
*ctx = (struct protolayer_iter_ctx) {
.payload = payload,
- .comm = (comm) ? *comm : manager->session->comm,
+ .comm = *comm,
.direction = direction,
.layer_ix = layer_ix,
.manager = manager,
@@ -576,8 +597,10 @@ static int protolayer_manager_submit(
};
for (size_t i = 0; i < manager->num_layers; i++) {
- if (kr_fails_assert(ctx->manager->grp < PROTOLAYER_GRP_COUNT))
- return kr_error(EFAULT);
+ if (kr_fails_assert(ctx->manager->grp < PROTOLAYER_GRP_COUNT)) {
+ ret = kr_error(EFAULT);
+ goto finish_ret;
+ }
enum protolayer_protocol p = protolayer_grps[manager->grp][i];
struct protolayer_globals *globals = &protolayer_globals[p];
@@ -591,7 +614,11 @@ static int protolayer_manager_submit(
globals->iter_init(manager, ctx, iter_data);
}
- return protolayer_step(ctx);
+ ret = protolayer_step(ctx);
+finish_ret:
+ if (direction == PROTOLAYER_UNWRAP)
+ kr_rrl_sample_stop();
+ return ret;
}
static void *get_init_param(enum protolayer_protocol p,
@@ -924,8 +951,10 @@ uv_handle_t *session2_get_handle(struct session2 *s)
static void session2_on_timeout(uv_timer_t *timer)
{
+ kr_rrl_sample_start();
struct session2 *s = timer->data;
session2_event(s, s->timer_event, NULL);
+ kr_rrl_sample_stop();
}
int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
@@ -1177,7 +1206,7 @@ int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
const struct comm_info *comm, protolayer_finished_cb cb,
void *baton)
{
- return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0,
+ return protolayer_manager_submit(s, PROTOLAYER_UNWRAP, 0,
payload, comm, cb, baton);
}
@@ -1189,7 +1218,7 @@ int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1;
if (layer_ix < 0)
return layer_ix;
- return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix,
+ return protolayer_manager_submit(s, PROTOLAYER_UNWRAP, layer_ix,
payload, comm, cb, baton);
}
@@ -1197,7 +1226,7 @@ int session2_wrap(struct session2 *s, struct protolayer_payload payload,
const struct comm_info *comm, protolayer_finished_cb cb,
void *baton)
{
- return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP,
+ return protolayer_manager_submit(s, PROTOLAYER_WRAP,
s->layers->num_layers - 1,
payload, comm, cb, baton);
}
@@ -1210,7 +1239,7 @@ int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1;
if (layer_ix < 0)
return layer_ix;
- return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix,
+ return protolayer_manager_submit(s, PROTOLAYER_WRAP, layer_ix,
payload, comm, cb, baton);
}
diff --git a/daemon/worker.c b/daemon/worker.c
index 2d293ba9..0445d027 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -34,6 +34,7 @@
#include "lib/layer.h"
#include "lib/layer/iterate.h" /* kr_response_classify */
#include "lib/utils.h"
+#include "daemon/rrl/api.h"
/* Magic defaults for the worker. */
@@ -362,6 +363,7 @@ static struct request_ctx *request_create(struct session2 *session,
/* We need to store a copy of peer address. */
memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr));
req->qsource.addr = &ctx->source.addr.ip;
+ kr_rrl_sample_addr(&ctx->source.addr);
if (!comm_addr)
comm_addr = src_addr;
@@ -1214,6 +1216,9 @@ static int tcp_task_step(struct qr_task *task,
static int qr_task_step(struct qr_task *task,
const struct sockaddr *packet_source, knot_pkt_t *packet)
{
+ if (task && task->ctx->source.session)
+ kr_rrl_sample_addr(&task->ctx->source.addr);
+
/* No more steps after we're finished. */
if (!task || task->finished) {
return kr_error(ESTALE);
diff --git a/lib/utils.h b/lib/utils.h
index 6fbdeed2..8f84fc46 100644
--- a/lib/utils.h
+++ b/lib/utils.h
@@ -309,7 +309,7 @@ struct sockaddr *kr_sockaddr_from_key(struct sockaddr_storage *dst,
KR_EXPORT
bool kr_sockaddr_key_same_addr(const char *key_a, const char *key_b);
-/** Compare two given sockaddr.
+/** Compare two given sockaddr. (only address and port)
* return 0 - addresses are equal, error code otherwise.
*/
KR_EXPORT KR_PURE