diff options
Diffstat (limited to 'daemon/session2.c')
-rw-r--r-- | daemon/session2.c | 162 |
1 files changed, 112 insertions, 50 deletions
diff --git a/daemon/session2.c b/daemon/session2.c index 40328e13..faca57bf 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -307,6 +307,18 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue) return false; } +static inline ssize_t session2_get_protocol( + struct session2 *s, enum protolayer_type protocol) +{ + const struct protolayer_grp *grp = &protolayer_grps[s->proto]; + for (ssize_t i = 0; i < grp->num_layers; i++) { + enum protolayer_type found = grp->layers[i]; + if (protocol == found) + return i; + } + + return -1; +} /** Gets layer-specific session data for the layer with the specified index * from the manager. */ @@ -333,6 +345,14 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_sess_data_get(ctx->session, ctx->layer_ix); } +void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) { + ssize_t layer_ix = session2_get_protocol(s, protocol); + if (layer_ix < 0) + return NULL; + + return protolayer_sess_data_get(s, layer_ix); +} + /** Gets layer-specific iteration data for the layer with the specified index * from the context. */ static inline struct protolayer_data *protolayer_iter_data_get( @@ -358,17 +378,17 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_iter_data_get(ctx, ctx->layer_ix); } -static inline ssize_t session2_get_protocol( - struct session2 *s, enum protolayer_type protocol) +size_t protolayer_sess_size_est(struct session2 *s) { - const struct protolayer_grp *grp = &protolayer_grps[s->proto]; - for (ssize_t i = 0; i < grp->num_layers; i++) { - enum protolayer_type found = grp->layers[i]; - if (protocol == found) - return i; - } + return s->session_size + s->wire_buf.size; +} - return -1; +size_t protolayer_iter_size_est(struct protolayer_iter_ctx *ctx, bool incl_payload) +{ + size_t size = ctx->session->iter_ctx_size; + if (incl_payload) + size += protolayer_payload_size(&ctx->payload); + return size; } static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx) @@ -596,7 +616,7 @@ static int session2_submit( { if (session->closing) return kr_error(ECANCELED); - if (session->ref_count >= INT_MAX) + if (session->ref_count >= INT_MAX - 1) return kr_error(ETOOMANYREFS); if (kr_fails_assert(session->proto < KR_PROTO_COUNT)) return kr_error(EFAULT); @@ -610,7 +630,7 @@ static int session2_submit( // Note two cases: incoming session (new request) // vs. outgoing session (resuming work on some request) if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0)) - defer_sample_start(); + defer_sample_start(NULL); struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size); kr_require(ctx); @@ -672,7 +692,7 @@ static int session2_submit( int ret = protolayer_step(ctx); if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0)) - defer_sample_stop(); + defer_sample_stop(NULL, false); return ret; } @@ -845,6 +865,7 @@ struct session2 *session2_new(enum session2_transport_type transport_type, .proto = proto, .iter_ctx_size = iter_ctx_size, + .session_size = session_size, }; memcpy(&s->layer_data, offsets, sizeof(offsets)); @@ -959,10 +980,8 @@ uv_handle_t *session2_get_handle(struct session2 *s) static void session2_on_timeout(uv_timer_t *timer) { - defer_sample_start(); struct session2 *s = timer->data; session2_event(s, s->timer_event, NULL); - defer_sample_stop(); } int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat) @@ -1084,11 +1103,17 @@ struct qr_task *session2_tasklist_del_msgid(const struct session2 *session, uint void session2_tasklist_finalize(struct session2 *session, int status) { - while (session2_tasklist_get_len(session) > 0) { - struct qr_task *t = session2_tasklist_del_first(session, false); - kr_require(worker_task_numrefs(t) > 0); - worker_task_finalize(t, status); - worker_task_unref(t); + if (session2_tasklist_get_len(session) > 0) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *t = session2_tasklist_del_first(session, false); + kr_require(worker_task_numrefs(t) > 0); + worker_task_finalize(t, status); + worker_task_unref(t); + defer_sample_restart(); + } while (session2_tasklist_get_len(session) > 0); + defer_sample_stop(&defer_prev_sample_state, true); } } @@ -1121,27 +1146,34 @@ int session2_tasklist_finalize_expired(struct session2 *session) key = (char *)&msg_id; keylen = sizeof(msg_id); } - while (queue_len(q) > 0) { - task = queue_head(q); - if (session->outgoing) { - knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); - msg_id = knot_wire_get_id(pktbuf->wire); - } - int res = trie_del(t, key, keylen, NULL); - if (!worker_task_finished(task)) { - /* task->pending_count must be zero, - * but there are can be followers, - * so run worker_task_subreq_finalize() to ensure retrying - * for all the followers. */ - worker_task_subreq_finalize(task); - worker_task_finalize(task, KR_STATE_FAIL); - } - if (res == KNOT_EOK) { + + if (queue_len(q) > 0) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + task = queue_head(q); + if (session->outgoing) { + knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); + msg_id = knot_wire_get_id(pktbuf->wire); + } + int res = trie_del(t, key, keylen, NULL); + if (!worker_task_finished(task)) { + /* task->pending_count must be zero, + * but there are can be followers, + * so run worker_task_subreq_finalize() to ensure retrying + * for all the followers. */ + worker_task_subreq_finalize(task); + worker_task_finalize(task, KR_STATE_FAIL); + } + if (res == KNOT_EOK) { + worker_task_unref(task); + } + queue_pop(q); worker_task_unref(task); - } - queue_pop(q); - worker_task_unref(task); - ++ret; + ++ret; + defer_sample_restart(); + } while (queue_len(q) > 0); + defer_sample_stop(&defer_prev_sample_state, true); } queue_deinit(q); @@ -1172,22 +1204,34 @@ struct qr_task *session2_waitinglist_pop(struct session2 *session, bool deref) void session2_waitinglist_retry(struct session2 *session, bool increase_timeout_cnt) { - while (!session2_waitinglist_is_empty(session)) { - struct qr_task *task = session2_waitinglist_pop(session, false); - if (increase_timeout_cnt) { - worker_task_timeout_inc(task); - } - worker_task_step(task, session2_get_peer(session), NULL); - worker_task_unref(task); + if (!session2_waitinglist_is_empty(session)) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *task = session2_waitinglist_pop(session, false); + if (increase_timeout_cnt) { + worker_task_timeout_inc(task); + } + worker_task_step(task, session2_get_peer(session), NULL); + worker_task_unref(task); + defer_sample_restart(); + } while (!session2_waitinglist_is_empty(session)); + defer_sample_stop(&defer_prev_sample_state, true); } } void session2_waitinglist_finalize(struct session2 *session, int status) { - while (!session2_waitinglist_is_empty(session)) { - struct qr_task *t = session2_waitinglist_pop(session, false); - worker_task_finalize(t, status); - worker_task_unref(t); + if (!session2_waitinglist_is_empty(session)) { + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + do { + struct qr_task *t = session2_waitinglist_pop(session, false); + worker_task_finalize(t, status); + worker_task_unref(t); + defer_sample_restart(); + } while (!session2_waitinglist_is_empty(session)); + defer_sample_stop(&defer_prev_sample_state, true); } } @@ -1301,7 +1345,19 @@ static void session2_event_unwrap(struct session2 *s, ssize_t start_ix, enum pro void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton) { + /* Events may be sent from inside or outside of already measured code. + * From inside: close by us, statistics, ... + * From outside: timeout, EOF, close by external reasons, ... */ + bool defer_accounting_here = false; + if (!defer_sample_is_accounting() && s->stream && !s->outgoing) { + defer_sample_start(NULL); + defer_accounting_here = true; + } + session2_event_unwrap(s, 0, event, baton); + + if (defer_accounting_here) + defer_sample_stop(NULL, false); } void session2_event_after(struct session2 *s, enum protolayer_type protocol, @@ -1684,6 +1740,12 @@ static int session2_transport_event(struct session2 *s, if (s->closing) return kr_ok(); + if (event == PROTOLAYER_EVENT_EOF) { + // no layer wanted to retain TCP half-closed state + session2_force_close(s); + return kr_ok(); + } + bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE || event == PROTOLAYER_EVENT_FORCE_CLOSE); if (is_close_event) { |