diff options
author | Grigorii Demidov <grigorii.demidov@nic.cz> | 2018-09-27 16:56:02 +0200 |
---|---|---|
committer | Vladimír Čunát <vladimir.cunat@nic.cz> | 2018-10-12 17:36:45 +0200 |
commit | 829670741e6bf33093f1f7d59083820d549656e4 (patch) | |
tree | eb45bafdeb7bb1d947299c765a0330b4a3a81557 /daemon/session.c | |
parent | lib/generic/trie: add some tests (diff) | |
download | knot-resolver-829670741e6bf33093f1f7d59083820d549656e4.tar.xz knot-resolver-829670741e6bf33093f1f7d59083820d549656e4.zip |
daemon/session: migrate from array_t to trie_t & queue_t; daemon/worker: some simplifications
Diffstat (limited to 'daemon/session.c')
-rw-r--r-- | daemon/session.c | 223 |
1 files changed, 114 insertions, 109 deletions
diff --git a/daemon/session.c b/daemon/session.c index 830ca4d7..1dbe6f67 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -8,9 +8,7 @@ #include "daemon/tls.h" #include "daemon/worker.h" #include "daemon/io.h" - -/** List of tasks. */ -typedef array_t(struct qr_task *) session_tasklist_t; +#include "lib/generic/queue.h" /* Per-session (TCP or UDP) persistent structure, * that exists between remote counterpart and a local socket. @@ -24,8 +22,8 @@ struct session { struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */ struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */ - session_tasklist_t tasks; /**< list of tasks which assotiated with given session. */ - session_tasklist_t waiting; /**< list of tasks been waiting for IO (subset of taska). */ + trie_t *tasks; /**< list of tasks assotiated with given session. */ + queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */ uint8_t *wire_buf; /**< Buffer for DNS message. */ ssize_t wire_buf_size; /**< Buffer size. */ @@ -54,7 +52,7 @@ static void on_session_timer_close(uv_handle_t *timer) void session_free(struct session *session) { if (session) { - assert(session->tasks.len == 0 && session->waiting.len == 0); + assert(session_is_empty(session)); session_clear(session); free(session); } @@ -62,12 +60,13 @@ void session_free(struct session *session) void session_clear(struct session *session) { - assert(session->tasks.len == 0 && session->waiting.len == 0); + assert(session_is_empty(session)); if (session->handle && session->handle->type == UV_TCP) { free(session->wire_buf); } - array_clear(session->tasks); - array_clear(session->waiting); + trie_clear(session->tasks); + trie_free(session->tasks); + queue_deinit(session->waiting); tls_free(session->tls_ctx); tls_client_ctx_free(session->tls_client_ctx); memset(session, 0, sizeof(*session)); @@ -75,8 +74,7 @@ void session_clear(struct session *session) void session_close(struct session *session) { - assert(session->tasks.len == 0 && session->waiting.len == 0); - + assert(session_is_empty(session)); if (session->sflags.closing) { return; } @@ -84,8 +82,7 @@ void session_close(struct session *session) uv_handle_t *handle = session->handle; io_stop_read(handle); session->sflags.closing = true; - if (session->sflags.outgoing && - session->peer.ip.sa_family != AF_UNSPEC) { + if (session->peer.ip.sa_family != AF_UNSPEC) { struct worker_ctx *worker = handle->loop->data; struct sockaddr *peer = &session->peer.ip; worker_del_tcp_connected(worker, peer); @@ -111,98 +108,124 @@ int session_start_read(struct session *session) return io_start_read(session->handle); } -int session_waitinglist_add(struct session *session, struct qr_task *task) +int session_waitinglist_push(struct session *session, struct qr_task *task) { - for (int i = 0; i < session->waiting.len; ++i) { - if (session->waiting.at[i] == task) { - return i; - } - } - int ret = array_push(session->waiting, task); - if (ret >= 0) { - worker_task_ref(task); - } - return ret; + queue_push(session->waiting, task); + worker_task_ref(task); + return kr_ok(); } -int session_waitinglist_del(struct session *session, struct qr_task *task) +struct qr_task *session_waitinglist_get(const struct session *session) { - int ret = kr_error(ENOENT); - for (int i = 0; i < session->waiting.len; ++i) { - if (session->waiting.at[i] == task) { - array_del(session->waiting, i); - worker_task_unref(task); - ret = kr_ok(); - break; - } - } - return ret; + return queue_head(session->waiting); } -int session_waitinglist_del_index(struct session *session, int index) +struct qr_task *session_waitinglist_pop(struct session *session, bool deref) { - int ret = kr_error(ENOENT); - if (index < session->waiting.len) { - struct qr_task *task = session->waiting.at[index]; - array_del(session->waiting, index); - worker_task_unref(task); - ret = kr_ok(); + struct qr_task *t = session_waitinglist_get(session); + queue_pop(session->waiting); + if (deref) { + worker_task_unref(t); } - return ret; + return t; } int session_tasklist_add(struct session *session, struct qr_task *task) { - for (int i = 0; i < session->tasks.len; ++i) { - if (session->tasks.at[i] == task) { - return i; - } + trie_t *t = session->tasks; + uint16_t task_msg_id = 0; + const char *key = NULL; + size_t key_len = 0; + if (session->sflags.outgoing) { + knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); + task_msg_id = knot_wire_get_id(pktbuf->wire); + key = (const char *)&task_msg_id; + key_len = sizeof(task_msg_id); + } else { + key = (const char *)task; + key_len = sizeof(task); + } + trie_val_t *v = trie_get_ins(t, key, key_len); + if (unlikely(!v)) { + assert(false); + return kr_error(ENOMEM); } - int ret = array_push(session->tasks, task); - if (ret >= 0) { + if (*v == NULL) { + *v = task; worker_task_ref(task); + } else if (*v != task) { + assert(false); + return kr_error(ENOMEM); } - return ret; + return kr_ok(); } int session_tasklist_del(struct session *session, struct qr_task *task) { - int ret = kr_error(ENOENT); - for (int i = 0; i < session->tasks.len; ++i) { - if (session->tasks.at[i] == task) { - array_del(session->tasks, i); - worker_task_unref(task); - ret = kr_ok(); - break; - } + trie_t *t = session->tasks; + uint16_t task_msg_id = 0; + const char *key = NULL; + size_t key_len = 0; + trie_val_t val; + if (session->sflags.outgoing) { + knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); + task_msg_id = knot_wire_get_id(pktbuf->wire); + key = (const char *)&task_msg_id; + key_len = sizeof(task_msg_id); + } else { + key = (const char *)task; + key_len = sizeof(task); + } + int ret = trie_del(t, key, key_len, &val); + if (ret == kr_ok()) { + assert(val == task); + worker_task_unref(val); } return ret; } -int session_tasklist_del_index(struct session *session, int index) +struct qr_task *session_tasklist_get_first(struct session *session) { - int ret = kr_error(ENOENT); - if (index < session->tasks.len) { - struct qr_task *task = session->tasks.at[index]; - array_del(session->tasks, index); - worker_task_unref(task); - ret = kr_ok(); + trie_val_t *val = trie_get_first(session->tasks, NULL, NULL); + return val ? (struct qr_task *) *val : NULL; +} + +struct qr_task *session_tasklist_del_first(struct session *session, bool deref) +{ + trie_val_t val = NULL; + int res = trie_del_first(session->tasks, NULL, NULL, &val); + if (res != kr_ok()) { + val = NULL; + } else if (deref) { + worker_task_unref(val); + } + return (struct qr_task *)val; +} +struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id) +{ + trie_t *t = session->tasks; + assert(session->sflags.outgoing); + struct qr_task *ret = NULL; + const char *key = (const char *)&msg_id; + size_t key_len = sizeof(msg_id); + trie_val_t val; + int res = trie_del(t, key, key_len, &val); + if (res == kr_ok()) { + ret = val; + assert(worker_task_numrefs(ret) > 1); + worker_task_unref(ret); } return ret; } -struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id) +struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id) { + trie_t *t = session->tasks; + assert(session->sflags.outgoing); struct qr_task *ret = NULL; - const session_tasklist_t *tasklist = &session->tasks; - for (size_t i = 0; i < tasklist->len; ++i) { - struct qr_task *task = tasklist->at[i]; - knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); - uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire); - if (task_msg_id == msg_id) { - ret = task; - break; - } + trie_val_t *val = trie_get_try(t, (char *)&msg_id, sizeof(msg_id)); + if (val) { + ret = *val; } return ret; } @@ -249,6 +272,11 @@ uv_handle_t *session_get_handle(struct session *session) return session->handle; } +struct session *session_get(uv_handle_t *h) +{ + return h->data; +} + struct session *session_new(uv_handle_t *handle) { if (!handle) { @@ -259,6 +287,8 @@ struct session *session_new(uv_handle_t *handle) return NULL; } + queue_init(session->waiting); + session->tasks = trie_create(NULL); if (handle->type == UV_TCP) { uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE); if (!wire_buf) { @@ -293,12 +323,12 @@ struct session *session_new(uv_handle_t *handle) size_t session_tasklist_get_len(const struct session *session) { - return session->tasks.len; + return trie_weight(session->tasks); } size_t session_waitinglist_get_len(const struct session *session) { - return session->waiting.len; + return queue_len(session->waiting); } bool session_tasklist_is_empty(const struct session *session) @@ -327,30 +357,10 @@ void session_set_has_tls(struct session *session, bool has_tls) session->sflags.has_tls = has_tls; } -struct qr_task *session_waitinglist_get_first(const struct session *session) -{ - struct qr_task *t = NULL; - if (session->waiting.len > 0) { - t = session->waiting.at[0]; - } - return t; -} - -struct qr_task *session_tasklist_get_first(const struct session *session) -{ - struct qr_task *t = NULL; - if (session->tasks.len > 0) { - t = session->tasks.at[0]; - } - return t; -} - void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt) { - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - session_tasklist_del(session, task); - array_del(session->waiting, 0); + while (!session_waitinglist_is_empty(session)) { + struct qr_task *task = session_waitinglist_pop(session, false); assert(worker_task_numrefs(task) > 1); if (increase_timeout_cnt) { worker_task_timeout_inc(task); @@ -362,10 +372,8 @@ void session_waitinglist_retry(struct session *session, bool increase_timeout_cn void session_waitinglist_finalize(struct session *session, int status) { - while (session->waiting.len > 0) { - struct qr_task *t = session->waiting.at[0]; - array_del(session->waiting, 0); - session_tasklist_del(session, t); + while (!session_waitinglist_is_empty(session)) { + struct qr_task *t = session_waitinglist_pop(session, false); if (session->sflags.outgoing) { worker_task_finalize(t, status); } else { @@ -379,9 +387,9 @@ void session_waitinglist_finalize(struct session *session, int status) void session_tasklist_finalize(struct session *session, int status) { - while (session->tasks.len > 0) { - struct qr_task *t = session->tasks.at[0]; - array_del(session->tasks, 0); + while (session_tasklist_get_len(session) > 0) { + struct qr_task *t = session_tasklist_del_first(session, false); + assert(worker_task_numrefs(t) > 0); if (session->sflags.outgoing) { worker_task_finalize(t, status); } else { @@ -690,8 +698,6 @@ void session_kill_ioreq(struct session *s, struct qr_task *task) } /* TCP-specific code now. */ if (s->handle->type != UV_TCP) abort(); - session_waitinglist_del(s, task); - session_tasklist_del(s, task); int res = 0; @@ -714,4 +720,3 @@ void session_kill_ioreq(struct session *s, struct qr_task *task) session_close(s); } } - |