summaryrefslogtreecommitdiffstats
path: root/daemon/session2.c
diff options
context:
space:
mode:
authorOto Šťáva <oto.stava@nic.cz>2023-06-23 11:02:34 +0200
committerOto Šťáva <oto.stava@nic.cz>2023-06-23 11:11:36 +0200
commitbba209bf92fb71b18482e7248a2a40a71237e6e9 (patch)
tree3a2ffb1e22849489407e45cda7d4fe1a76bc71d8 /daemon/session2.c
parentdaemon/session2: prevent submitting payloads when session is closing (diff)
downloadknot-resolver-bba209bf92fb71b18482e7248a2a40a71237e6e9.tar.xz
knot-resolver-bba209bf92fb71b18482e7248a2a40a71237e6e9.zip
daemon/session2: make copies short-lived buffers when needed
Diffstat (limited to 'daemon/session2.c')
-rw-r--r--daemon/session2.c179
1 files changed, 132 insertions, 47 deletions
diff --git a/daemon/session2.c b/daemon/session2.c
index 10534cba..491f0e6a 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -135,32 +135,58 @@ const char *protolayer_payload_name(enum protolayer_payload_type p)
/* Forward decls. */
static int session2_transport_pushv(struct session2 *s,
struct iovec *iov, int iovcnt,
+ bool iov_short_lived,
const struct comm_info *comm,
protolayer_finished_cb cb, void *baton);
static inline int session2_transport_push(struct session2 *s,
char *buf, size_t buf_len,
+ bool buf_short_lived,
const struct comm_info *comm,
protolayer_finished_cb cb, void *baton);
static int session2_transport_event(struct session2 *s,
enum protolayer_event_type event,
void *baton);
+static size_t iovecs_size(const struct iovec *iov, int cnt)
+{
+ size_t sum = 0;
+ for (int i = 0; i < cnt; i++) {
+ sum += iov[i].iov_len;
+ }
+ return sum;
+}
+
+static size_t iovecs_copy(void *dest, const struct iovec *iov, int cnt,
+ size_t max_len)
+{
+ const size_t pld_size = iovecs_size(iov, cnt);
+ const size_t copy_size = MIN(max_len, pld_size);
+ char *cur = dest;
+ size_t remaining = copy_size;
+ for (int i = 0; i < cnt && remaining; i++) {
+ size_t l = iov[i].iov_len;
+ size_t to_copy = MIN(l, remaining);
+ memcpy(cur, iov[i].iov_base, to_copy);
+ remaining -= l;
+ cur += l;
+ }
+
+ kr_assert(remaining == 0 && (cur - (char *)dest) == copy_size);
+ return copy_size;
+}
size_t protolayer_payload_size(const struct protolayer_payload *payload)
{
- if (payload->type == PROTOLAYER_PAYLOAD_BUFFER) {
+ switch (payload->type) {
+ case PROTOLAYER_PAYLOAD_BUFFER:
return payload->buffer.len;
- } else if (payload->type == PROTOLAYER_PAYLOAD_IOVEC) {
- size_t sum = 0;
- for (int i = 0; i < payload->iovec.cnt; i++) {
- sum += payload->iovec.iov[i].iov_len;
- }
- return sum;
- } else if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
+ case PROTOLAYER_PAYLOAD_IOVEC:
+ return iovecs_size(payload->iovec.iov, payload->iovec.cnt);
+ case PROTOLAYER_PAYLOAD_WIRE_BUF:
return wire_buf_data_length(payload->wire_buf);
- } else if(!payload->type) {
+ case PROTOLAYER_PAYLOAD_NULL:
return 0;
- } else {
+ default:
kr_assert(false && "Invalid payload type");
return 0;
}
@@ -208,6 +234,7 @@ struct protolayer_payload protolayer_as_buffer(const struct protolayer_payload *
if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
struct protolayer_payload new_payload = {
.type = PROTOLAYER_PAYLOAD_BUFFER,
+ .short_lived = payload->short_lived,
.ttl = payload->ttl,
.buffer = {
.buf = wire_buf_data(payload->wire_buf),
@@ -373,6 +400,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
ctx->finished_cb(ret, session, &ctx->comm,
ctx->finished_cb_baton);
+ free(ctx->async_buffer);
free(ctx);
return ret;
@@ -402,10 +430,12 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
session2_transport_push(session,
ctx->payload.buffer.buf, ctx->payload.buffer.len,
+ ctx->payload.short_lived,
&ctx->comm, protolayer_push_finished, ctx);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
session2_transport_pushv(session,
ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
+ ctx->payload.short_lived,
&ctx->comm, protolayer_push_finished, ctx);
} else {
kr_assert(false && "Invalid payload type");
@@ -415,6 +445,23 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
return PROTOLAYER_RET_ASYNC;
}
+static void protolayer_ensure_long_lived(struct protolayer_iter_ctx *ctx)
+{
+ if (!ctx->payload.short_lived)
+ return;
+
+ size_t buf_len = protolayer_payload_size(&ctx->payload);
+ if (kr_fails_assert(buf_len))
+ return;
+
+ void *buf = malloc(buf_len);
+ kr_require(buf);
+ protolayer_payload_copy(buf, &ctx->payload, buf_len);
+
+ ctx->async_buffer = buf;
+ ctx->payload = protolayer_buffer(buf, buf_len, false);
+}
+
/** Processes as many layers as possible synchronously, returning when either
* a layer has gone asynchronous, or when the whole sequence has finished.
*
@@ -459,6 +506,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
if (!ctx->action) {
/* Next step is from a callback */
ctx->async_mode = true;
+ protolayer_ensure_long_lived(ctx);
return PROTOLAYER_RET_ASYNC;
}
@@ -1229,8 +1277,7 @@ struct session2_pushv_ctx {
const struct comm_info *comm;
void *baton;
- char *buf;
- size_t buf_len;
+ char *async_buf;
};
static void session2_transport_parent_pushv_finished(int status,
@@ -1241,36 +1288,32 @@ static void session2_transport_parent_pushv_finished(int status,
struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
ctx->cb(status, ctx->session, comm, ctx->baton);
- free(ctx->buf);
+ free(ctx->async_buf);
free(ctx);
}
-static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
+static void session2_transport_pushv_finished(int status, struct session2_pushv_ctx *ctx)
{
- struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
- free(ctx->buf);
+ free(ctx->async_buf);
free(ctx);
}
+static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
+{
+ session2_transport_pushv_finished(status, baton);
+}
+
static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status)
{
- struct session2_pushv_ctx *ctx = req->data;
- if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
- free(ctx->buf);
- free(ctx);
+ session2_transport_pushv_finished(status, req->data);
free(req);
}
static void session2_transport_stream_pushv_finished(uv_write_t *req, int status)
{
- struct session2_pushv_ctx *ctx = req->data;
- if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
- free(ctx->buf);
- free(ctx);
+ session2_transport_pushv_finished(status, req->data);
free(req);
}
@@ -1298,11 +1341,35 @@ static void xdp_tx_waker(uv_idle_t *handle)
}
#endif
+static void session2_transport_pushv_ensure_long_lived(
+ struct iovec **iov, int *iovcnt, bool iov_short_lived,
+ struct iovec *out_iovecmem, struct session2_pushv_ctx *ctx)
+{
+ if (!iov_short_lived)
+ return;
+
+ size_t iovsize = iovecs_size(*iov, *iovcnt);
+ if (kr_fails_assert(iovsize))
+ return;
+
+ void *buf = malloc(iovsize);
+ kr_require(buf);
+ iovecs_copy(buf, *iov, *iovcnt, iovsize);
+
+ ctx->async_buf = buf;
+ out_iovecmem->iov_base = buf;
+ out_iovecmem->iov_len = iovsize;
+ *iov = out_iovecmem;
+ *iovcnt = 1;
+}
+
static int session2_transport_pushv(struct session2 *s,
struct iovec *iov, int iovcnt,
+ bool iov_short_lived,
const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
+ struct iovec iovecmem;
if (kr_fails_assert(s))
return kr_error(EINVAL);
@@ -1336,34 +1403,46 @@ static int session2_transport_pushv(struct session2 *s,
if (kr_fails_assert(iovcnt == 1))
return kr_error(EINVAL);
+ session2_transport_pushv_ensure_long_lived(
+ &iov, &iovcnt, iov_short_lived,
+ &iovecmem, ctx);
udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len,
session2_transport_udp_queue_pushv_finished,
ctx);
return kr_ok();
} else {
- uv_udp_send_t *req = malloc(sizeof(*req));
- req->data = ctx;
- int ret = uv_udp_send(req, (uv_udp_t *)handle,
- (uv_buf_t *)iov, iovcnt, comm->comm_addr,
- session2_transport_udp_pushv_finished);
- if (ret) {
- if (cb)
- cb(ret, s, comm, baton);
- free(req);
- free(ctx);
+ int ret = uv_udp_try_send((uv_udp_t*)handle,
+ (uv_buf_t *)iov, iovcnt, comm->comm_addr);
+ if (ret == UV_EAGAIN) {
+ uv_udp_send_t *req = malloc(sizeof(*req));
+ req->data = ctx;
+ session2_transport_pushv_ensure_long_lived(
+ &iov, &iovcnt, iov_short_lived,
+ &iovecmem, ctx);
+ ret = uv_udp_send(req, (uv_udp_t *)handle,
+ (uv_buf_t *)iov, iovcnt, comm->comm_addr,
+ session2_transport_udp_pushv_finished);
+ if (ret)
+ session2_transport_udp_pushv_finished(req, ret);
+ } else {
+ session2_transport_pushv_finished(ret, ctx);
}
return ret;
}
} else if (handle->type == UV_TCP) {
- uv_write_t *req = malloc(sizeof(*req));
- req->data = ctx;
- int ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt,
- session2_transport_stream_pushv_finished);
- if (ret) {
- if (cb)
- cb(ret, s, comm, baton);
- free(req);
- free(ctx);
+ int ret = uv_try_write((uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt);
+ if (ret == UV_EAGAIN) {
+ uv_write_t *req = malloc(sizeof(*req));
+ req->data = ctx;
+ session2_transport_pushv_ensure_long_lived(
+ &iov, &iovcnt, iov_short_lived,
+ &iovecmem, ctx);
+ ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt,
+ session2_transport_stream_pushv_finished);
+ if (ret)
+ session2_transport_stream_pushv_finished(req, ret);
+ } else {
+ session2_transport_pushv_finished(ret, ctx);
}
return ret;
#if ENABLE_XDP
@@ -1376,6 +1455,10 @@ static int session2_transport_pushv(struct session2 *s,
if (kr_fails_assert(iovcnt == 1))
return kr_error(EINVAL);
+ session2_transport_pushv_ensure_long_lived(
+ &iov, &iovcnt, iov_short_lived,
+ &iovecmem, ctx);
+
knot_xdp_msg_t msg;
#if KNOT_VERSION_HEX >= 0x030100
/* We don't have a nice way of preserving the _msg_t from frame allocation,
@@ -1415,7 +1498,8 @@ static int session2_transport_pushv(struct session2 *s,
free(ctx);
return kr_error(EINVAL);
}
- int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
+ int ret = session2_wrap(parent,
+ protolayer_iovec(iov, iovcnt, iov_short_lived),
comm, session2_transport_parent_pushv_finished,
ctx);
return (ret < 0) ? ret : kr_ok();
@@ -1446,6 +1530,7 @@ static void session2_transport_single_push_finished(int status,
static inline int session2_transport_push(struct session2 *s,
char *buf, size_t buf_len,
+ bool buf_short_lived,
const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
@@ -1460,7 +1545,7 @@ static inline int session2_transport_push(struct session2 *s,
.baton = baton
};
- return session2_transport_pushv(s, &ctx->iov, 1, comm,
+ return session2_transport_pushv(s, &ctx->iov, 1, buf_short_lived, comm,
session2_transport_single_push_finished, ctx);
}