diff options
author | Oto Šťáva <oto.stava@nic.cz> | 2023-06-23 11:02:34 +0200 |
---|---|---|
committer | Oto Šťáva <oto.stava@nic.cz> | 2023-06-23 11:11:36 +0200 |
commit | bba209bf92fb71b18482e7248a2a40a71237e6e9 (patch) | |
tree | 3a2ffb1e22849489407e45cda7d4fe1a76bc71d8 /daemon/session2.c | |
parent | daemon/session2: prevent submitting payloads when session is closing (diff) | |
download | knot-resolver-bba209bf92fb71b18482e7248a2a40a71237e6e9.tar.xz knot-resolver-bba209bf92fb71b18482e7248a2a40a71237e6e9.zip |
daemon/session2: make copies short-lived buffers when needed
Diffstat (limited to 'daemon/session2.c')
-rw-r--r-- | daemon/session2.c | 179 |
1 files changed, 132 insertions, 47 deletions
diff --git a/daemon/session2.c b/daemon/session2.c index 10534cba..491f0e6a 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -135,32 +135,58 @@ const char *protolayer_payload_name(enum protolayer_payload_type p) /* Forward decls. */ static int session2_transport_pushv(struct session2 *s, struct iovec *iov, int iovcnt, + bool iov_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton); static inline int session2_transport_push(struct session2 *s, char *buf, size_t buf_len, + bool buf_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton); static int session2_transport_event(struct session2 *s, enum protolayer_event_type event, void *baton); +static size_t iovecs_size(const struct iovec *iov, int cnt) +{ + size_t sum = 0; + for (int i = 0; i < cnt; i++) { + sum += iov[i].iov_len; + } + return sum; +} + +static size_t iovecs_copy(void *dest, const struct iovec *iov, int cnt, + size_t max_len) +{ + const size_t pld_size = iovecs_size(iov, cnt); + const size_t copy_size = MIN(max_len, pld_size); + char *cur = dest; + size_t remaining = copy_size; + for (int i = 0; i < cnt && remaining; i++) { + size_t l = iov[i].iov_len; + size_t to_copy = MIN(l, remaining); + memcpy(cur, iov[i].iov_base, to_copy); + remaining -= l; + cur += l; + } + + kr_assert(remaining == 0 && (cur - (char *)dest) == copy_size); + return copy_size; +} size_t protolayer_payload_size(const struct protolayer_payload *payload) { - if (payload->type == PROTOLAYER_PAYLOAD_BUFFER) { + switch (payload->type) { + case PROTOLAYER_PAYLOAD_BUFFER: return payload->buffer.len; - } else if (payload->type == PROTOLAYER_PAYLOAD_IOVEC) { - size_t sum = 0; - for (int i = 0; i < payload->iovec.cnt; i++) { - sum += payload->iovec.iov[i].iov_len; - } - return sum; - } else if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) { + case PROTOLAYER_PAYLOAD_IOVEC: + return iovecs_size(payload->iovec.iov, payload->iovec.cnt); + case PROTOLAYER_PAYLOAD_WIRE_BUF: return wire_buf_data_length(payload->wire_buf); - } else if(!payload->type) { + case PROTOLAYER_PAYLOAD_NULL: return 0; - } else { + default: kr_assert(false && "Invalid payload type"); return 0; } @@ -208,6 +234,7 @@ struct protolayer_payload protolayer_as_buffer(const struct protolayer_payload * if (payload->type == PROTOLAYER_PAYLOAD_WIRE_BUF) { struct protolayer_payload new_payload = { .type = PROTOLAYER_PAYLOAD_BUFFER, + .short_lived = payload->short_lived, .ttl = payload->ttl, .buffer = { .buf = wire_buf_data(payload->wire_buf), @@ -373,6 +400,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret) ctx->finished_cb(ret, session, &ctx->comm, ctx->finished_cb_baton); + free(ctx->async_buffer); free(ctx); return ret; @@ -402,10 +430,12 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx) if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) { session2_transport_push(session, ctx->payload.buffer.buf, ctx->payload.buffer.len, + ctx->payload.short_lived, &ctx->comm, protolayer_push_finished, ctx); } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { session2_transport_pushv(session, ctx->payload.iovec.iov, ctx->payload.iovec.cnt, + ctx->payload.short_lived, &ctx->comm, protolayer_push_finished, ctx); } else { kr_assert(false && "Invalid payload type"); @@ -415,6 +445,23 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx) return PROTOLAYER_RET_ASYNC; } +static void protolayer_ensure_long_lived(struct protolayer_iter_ctx *ctx) +{ + if (!ctx->payload.short_lived) + return; + + size_t buf_len = protolayer_payload_size(&ctx->payload); + if (kr_fails_assert(buf_len)) + return; + + void *buf = malloc(buf_len); + kr_require(buf); + protolayer_payload_copy(buf, &ctx->payload, buf_len); + + ctx->async_buffer = buf; + ctx->payload = protolayer_buffer(buf, buf_len, false); +} + /** Processes as many layers as possible synchronously, returning when either * a layer has gone asynchronous, or when the whole sequence has finished. * @@ -459,6 +506,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx) if (!ctx->action) { /* Next step is from a callback */ ctx->async_mode = true; + protolayer_ensure_long_lived(ctx); return PROTOLAYER_RET_ASYNC; } @@ -1229,8 +1277,7 @@ struct session2_pushv_ctx { const struct comm_info *comm; void *baton; - char *buf; - size_t buf_len; + char *async_buf; }; static void session2_transport_parent_pushv_finished(int status, @@ -1241,36 +1288,32 @@ static void session2_transport_parent_pushv_finished(int status, struct session2_pushv_ctx *ctx = baton; if (ctx->cb) ctx->cb(status, ctx->session, comm, ctx->baton); - free(ctx->buf); + free(ctx->async_buf); free(ctx); } -static void session2_transport_udp_queue_pushv_finished(int status, void *baton) +static void session2_transport_pushv_finished(int status, struct session2_pushv_ctx *ctx) { - struct session2_pushv_ctx *ctx = baton; if (ctx->cb) ctx->cb(status, ctx->session, ctx->comm, ctx->baton); - free(ctx->buf); + free(ctx->async_buf); free(ctx); } +static void session2_transport_udp_queue_pushv_finished(int status, void *baton) +{ + session2_transport_pushv_finished(status, baton); +} + static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status) { - struct session2_pushv_ctx *ctx = req->data; - if (ctx->cb) - ctx->cb(status, ctx->session, ctx->comm, ctx->baton); - free(ctx->buf); - free(ctx); + session2_transport_pushv_finished(status, req->data); free(req); } static void session2_transport_stream_pushv_finished(uv_write_t *req, int status) { - struct session2_pushv_ctx *ctx = req->data; - if (ctx->cb) - ctx->cb(status, ctx->session, ctx->comm, ctx->baton); - free(ctx->buf); - free(ctx); + session2_transport_pushv_finished(status, req->data); free(req); } @@ -1298,11 +1341,35 @@ static void xdp_tx_waker(uv_idle_t *handle) } #endif +static void session2_transport_pushv_ensure_long_lived( + struct iovec **iov, int *iovcnt, bool iov_short_lived, + struct iovec *out_iovecmem, struct session2_pushv_ctx *ctx) +{ + if (!iov_short_lived) + return; + + size_t iovsize = iovecs_size(*iov, *iovcnt); + if (kr_fails_assert(iovsize)) + return; + + void *buf = malloc(iovsize); + kr_require(buf); + iovecs_copy(buf, *iov, *iovcnt, iovsize); + + ctx->async_buf = buf; + out_iovecmem->iov_base = buf; + out_iovecmem->iov_len = iovsize; + *iov = out_iovecmem; + *iovcnt = 1; +} + static int session2_transport_pushv(struct session2 *s, struct iovec *iov, int iovcnt, + bool iov_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { + struct iovec iovecmem; if (kr_fails_assert(s)) return kr_error(EINVAL); @@ -1336,34 +1403,46 @@ static int session2_transport_pushv(struct session2 *s, if (kr_fails_assert(iovcnt == 1)) return kr_error(EINVAL); + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len, session2_transport_udp_queue_pushv_finished, ctx); return kr_ok(); } else { - uv_udp_send_t *req = malloc(sizeof(*req)); - req->data = ctx; - int ret = uv_udp_send(req, (uv_udp_t *)handle, - (uv_buf_t *)iov, iovcnt, comm->comm_addr, - session2_transport_udp_pushv_finished); - if (ret) { - if (cb) - cb(ret, s, comm, baton); - free(req); - free(ctx); + int ret = uv_udp_try_send((uv_udp_t*)handle, + (uv_buf_t *)iov, iovcnt, comm->comm_addr); + if (ret == UV_EAGAIN) { + uv_udp_send_t *req = malloc(sizeof(*req)); + req->data = ctx; + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); + ret = uv_udp_send(req, (uv_udp_t *)handle, + (uv_buf_t *)iov, iovcnt, comm->comm_addr, + session2_transport_udp_pushv_finished); + if (ret) + session2_transport_udp_pushv_finished(req, ret); + } else { + session2_transport_pushv_finished(ret, ctx); } return ret; } } else if (handle->type == UV_TCP) { - uv_write_t *req = malloc(sizeof(*req)); - req->data = ctx; - int ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt, - session2_transport_stream_pushv_finished); - if (ret) { - if (cb) - cb(ret, s, comm, baton); - free(req); - free(ctx); + int ret = uv_try_write((uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt); + if (ret == UV_EAGAIN) { + uv_write_t *req = malloc(sizeof(*req)); + req->data = ctx; + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); + ret = uv_write(req, (uv_stream_t *)handle, (uv_buf_t *)iov, iovcnt, + session2_transport_stream_pushv_finished); + if (ret) + session2_transport_stream_pushv_finished(req, ret); + } else { + session2_transport_pushv_finished(ret, ctx); } return ret; #if ENABLE_XDP @@ -1376,6 +1455,10 @@ static int session2_transport_pushv(struct session2 *s, if (kr_fails_assert(iovcnt == 1)) return kr_error(EINVAL); + session2_transport_pushv_ensure_long_lived( + &iov, &iovcnt, iov_short_lived, + &iovecmem, ctx); + knot_xdp_msg_t msg; #if KNOT_VERSION_HEX >= 0x030100 /* We don't have a nice way of preserving the _msg_t from frame allocation, @@ -1415,7 +1498,8 @@ static int session2_transport_pushv(struct session2 *s, free(ctx); return kr_error(EINVAL); } - int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt), + int ret = session2_wrap(parent, + protolayer_iovec(iov, iovcnt, iov_short_lived), comm, session2_transport_parent_pushv_finished, ctx); return (ret < 0) ? ret : kr_ok(); @@ -1446,6 +1530,7 @@ static void session2_transport_single_push_finished(int status, static inline int session2_transport_push(struct session2 *s, char *buf, size_t buf_len, + bool buf_short_lived, const struct comm_info *comm, protolayer_finished_cb cb, void *baton) { @@ -1460,7 +1545,7 @@ static inline int session2_transport_push(struct session2 *s, .baton = baton }; - return session2_transport_pushv(s, &ctx->iov, 1, comm, + return session2_transport_pushv(s, &ctx->iov, 1, buf_short_lived, comm, session2_transport_single_push_finished, ctx); } |