summaryrefslogtreecommitdiffstats
path: root/daemon/session2.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/session2.c')
-rw-r--r--daemon/session2.c162
1 files changed, 112 insertions, 50 deletions
diff --git a/daemon/session2.c b/daemon/session2.c
index 40328e13..faca57bf 100644
--- a/daemon/session2.c
+++ b/daemon/session2.c
@@ -307,6 +307,18 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue)
return false;
}
+static inline ssize_t session2_get_protocol(
+ struct session2 *s, enum protolayer_type protocol)
+{
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
+ for (ssize_t i = 0; i < grp->num_layers; i++) {
+ enum protolayer_type found = grp->layers[i];
+ if (protocol == found)
+ return i;
+ }
+
+ return -1;
+}
/** Gets layer-specific session data for the layer with the specified index
* from the manager. */
@@ -333,6 +345,14 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_sess_data_get(ctx->session, ctx->layer_ix);
}
+void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) {
+ ssize_t layer_ix = session2_get_protocol(s, protocol);
+ if (layer_ix < 0)
+ return NULL;
+
+ return protolayer_sess_data_get(s, layer_ix);
+}
+
/** Gets layer-specific iteration data for the layer with the specified index
* from the context. */
static inline struct protolayer_data *protolayer_iter_data_get(
@@ -358,17 +378,17 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx)
return protolayer_iter_data_get(ctx, ctx->layer_ix);
}
-static inline ssize_t session2_get_protocol(
- struct session2 *s, enum protolayer_type protocol)
+size_t protolayer_sess_size_est(struct session2 *s)
{
- const struct protolayer_grp *grp = &protolayer_grps[s->proto];
- for (ssize_t i = 0; i < grp->num_layers; i++) {
- enum protolayer_type found = grp->layers[i];
- if (protocol == found)
- return i;
- }
+ return s->session_size + s->wire_buf.size;
+}
- return -1;
+size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload)
+{
+ size_t size = ctx->session->iter_ctx_size;
+ if (incl_payload)
+ size += protolayer_payload_size(&ctx->payload);
+ return size;
}
static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
@@ -596,7 +616,7 @@ static int session2_submit(
{
if (session->closing)
return kr_error(ECANCELED);
- if (session->ref_count >= INT_MAX)
+ if (session->ref_count >= INT_MAX - 1)
return kr_error(ETOOMANYREFS);
if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);
@@ -610,7 +630,7 @@ static int session2_submit(
// Note two cases: incoming session (new request)
// vs. outgoing session (resuming work on some request)
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
- defer_sample_start();
+ defer_sample_start(NULL);
struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
kr_require(ctx);
@@ -672,7 +692,7 @@ static int session2_submit(
int ret = protolayer_step(ctx);
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
- defer_sample_stop();
+ defer_sample_stop(NULL, false);
return ret;
}
@@ -845,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type,
.proto = proto,
.iter_ctx_size = iter_ctx_size,
+ .session_size = session_size,
};
memcpy(&s->layer_data, offsets, sizeof(offsets));
@@ -959,10 +980,8 @@ uv_handle_t *session2_get_handle(struct session2 *s)
static void session2_on_timeout(uv_timer_t *timer)
{
- defer_sample_start();
struct session2 *s = timer->data;
session2_event(s, s->timer_event, NULL);
- defer_sample_stop();
}
int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
@@ -1084,11 +1103,17 @@ struct qr_task *session2_tasklist_del_msgid(const struct session2 *session, uint
void session2_tasklist_finalize(struct session2 *session, int status)
{
- while (session2_tasklist_get_len(session) > 0) {
- struct qr_task *t = session2_tasklist_del_first(session, false);
- kr_require(worker_task_numrefs(t) > 0);
- worker_task_finalize(t, status);
- worker_task_unref(t);
+ if (session2_tasklist_get_len(session) > 0) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *t = session2_tasklist_del_first(session, false);
+ kr_require(worker_task_numrefs(t) > 0);
+ worker_task_finalize(t, status);
+ worker_task_unref(t);
+ defer_sample_restart();
+ } while (session2_tasklist_get_len(session) > 0);
+ defer_sample_stop(&defer_prev_sample_state, true);
}
}
@@ -1121,27 +1146,34 @@ int session2_tasklist_finalize_expired(struct session2 *session)
key = (char *)&msg_id;
keylen = sizeof(msg_id);
}
- while (queue_len(q) > 0) {
- task = queue_head(q);
- if (session->outgoing) {
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
- msg_id = knot_wire_get_id(pktbuf->wire);
- }
- int res = trie_del(t, key, keylen, NULL);
- if (!worker_task_finished(task)) {
- /* task->pending_count must be zero,
- * but there are can be followers,
- * so run worker_task_subreq_finalize() to ensure retrying
- * for all the followers. */
- worker_task_subreq_finalize(task);
- worker_task_finalize(task, KR_STATE_FAIL);
- }
- if (res == KNOT_EOK) {
+
+ if (queue_len(q) > 0) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ task = queue_head(q);
+ if (session->outgoing) {
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ msg_id = knot_wire_get_id(pktbuf->wire);
+ }
+ int res = trie_del(t, key, keylen, NULL);
+ if (!worker_task_finished(task)) {
+ /* task->pending_count must be zero,
+ * but there are can be followers,
+ * so run worker_task_subreq_finalize() to ensure retrying
+ * for all the followers. */
+ worker_task_subreq_finalize(task);
+ worker_task_finalize(task, KR_STATE_FAIL);
+ }
+ if (res == KNOT_EOK) {
+ worker_task_unref(task);
+ }
+ queue_pop(q);
worker_task_unref(task);
- }
- queue_pop(q);
- worker_task_unref(task);
- ++ret;
+ ++ret;
+ defer_sample_restart();
+ } while (queue_len(q) > 0);
+ defer_sample_stop(&defer_prev_sample_state, true);
}
queue_deinit(q);
@@ -1172,22 +1204,34 @@ struct qr_task *session2_waitinglist_pop(struct session2 *session, bool deref)
void session2_waitinglist_retry(struct session2 *session, bool increase_timeout_cnt)
{
- while (!session2_waitinglist_is_empty(session)) {
- struct qr_task *task = session2_waitinglist_pop(session, false);
- if (increase_timeout_cnt) {
- worker_task_timeout_inc(task);
- }
- worker_task_step(task, session2_get_peer(session), NULL);
- worker_task_unref(task);
+ if (!session2_waitinglist_is_empty(session)) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *task = session2_waitinglist_pop(session, false);
+ if (increase_timeout_cnt) {
+ worker_task_timeout_inc(task);
+ }
+ worker_task_step(task, session2_get_peer(session), NULL);
+ worker_task_unref(task);
+ defer_sample_restart();
+ } while (!session2_waitinglist_is_empty(session));
+ defer_sample_stop(&defer_prev_sample_state, true);
}
}
void session2_waitinglist_finalize(struct session2 *session, int status)
{
- while (!session2_waitinglist_is_empty(session)) {
- struct qr_task *t = session2_waitinglist_pop(session, false);
- worker_task_finalize(t, status);
- worker_task_unref(t);
+ if (!session2_waitinglist_is_empty(session)) {
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ do {
+ struct qr_task *t = session2_waitinglist_pop(session, false);
+ worker_task_finalize(t, status);
+ worker_task_unref(t);
+ defer_sample_restart();
+ } while (!session2_waitinglist_is_empty(session));
+ defer_sample_stop(&defer_prev_sample_state, true);
}
}
@@ -1301,7 +1345,19 @@ static void session2_event_unwrap(struct session2 *s, ssize_t start_ix, enum pro
void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton)
{
+ /* Events may be sent from inside or outside of already measured code.
+ * From inside: close by us, statistics, ...
+ * From outside: timeout, EOF, close by external reasons, ... */
+ bool defer_accounting_here = false;
+ if (!defer_sample_is_accounting() && s->stream && !s->outgoing) {
+ defer_sample_start(NULL);
+ defer_accounting_here = true;
+ }
+
session2_event_unwrap(s, 0, event, baton);
+
+ if (defer_accounting_here)
+ defer_sample_stop(NULL, false);
}
void session2_event_after(struct session2 *s, enum protolayer_type protocol,
@@ -1684,6 +1740,12 @@ static int session2_transport_event(struct session2 *s,
if (s->closing)
return kr_ok();
+ if (event == PROTOLAYER_EVENT_EOF) {
+ // no layer wanted to retain TCP half-closed state
+ session2_force_close(s);
+ return kr_ok();
+ }
+
bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE ||
event == PROTOLAYER_EVENT_FORCE_CLOSE);
if (is_close_event) {