diff options
Diffstat (limited to 'daemon/worker.c')
-rw-r--r-- | daemon/worker.c | 136 |
1 files changed, 69 insertions, 67 deletions
diff --git a/daemon/worker.c b/daemon/worker.c index 9f2bc242..f1ade4d2 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -141,7 +141,7 @@ static uv_handle_t *ioreq_spawn(struct worker_ctx *worker, { bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM) && (family == AF_INET || family == AF_INET6); - if (!kr_assume(precond)) { + if (kr_fails_assert(precond)) { kr_log_verbose("[work] ioreq_spawn: pre-condition failed\n"); return NULL; } @@ -170,7 +170,7 @@ static uv_handle_t *ioreq_spawn(struct worker_ctx *worker, addr = (union inaddr *)&worker->out_addr6; } if (addr->ip.sa_family != AF_UNSPEC) { - if (!kr_assume(addr->ip.sa_family == family)) { + if (kr_fails_assert(addr->ip.sa_family == family)) { io_free(handle); return NULL; } @@ -268,12 +268,12 @@ static int subreq_key(char *dst, knot_pkt_t *pkt) #if ENABLE_XDP static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen) { - if (!kr_assume(maxlen)) + if (kr_fails_assert(maxlen)) return NULL; struct request_ctx *ctx = (struct request_ctx *)req; /* We know it's an AF_XDP socket; otherwise this CB isn't assigned. */ uv_handle_t *handle = session_get_handle(ctx->source.session); - if (!kr_assume(handle->type == UV_POLL)) + if (kr_fails_assert(handle->type == UV_POLL)) return NULL; xdp_handle_data_t *xhd = handle->data; knot_xdp_msg_t out; @@ -285,7 +285,7 @@ static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen) ipv6, &out, NULL); #endif if (ret != KNOT_EOK) { - (void)!kr_assume(ret == KNOT_ENOMEM); + kr_assert(ret == KNOT_ENOMEM); *maxlen = 0; return NULL; } @@ -297,7 +297,7 @@ static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen) } static void free_wire(const struct request_ctx *ctx) { - if (!kr_assume(ctx->req.alloc_wire_cb == alloc_wire_cb)) + if (kr_fails_assert(ctx->req.alloc_wire_cb == alloc_wire_cb)) return; knot_pkt_t *ans = ctx->req.answer; if (unlikely(ans == NULL)) /* dropped */ @@ -306,7 +306,7 @@ static void free_wire(const struct request_ctx *ctx) return; /* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */ uv_handle_t *handle = session_get_handle(ctx->source.session); - if (!kr_assume(handle->type == UV_POLL)) + if (kr_fails_assert(handle->type == UV_POLL)) return; xdp_handle_data_t *xhd = handle->data; /* Freeing is done by sending an empty packet (the API won't really send it). */ @@ -315,7 +315,7 @@ static void free_wire(const struct request_ctx *ctx) out.payload.iov_len = 0; uint32_t sent; int ret = knot_xdp_send(xhd->socket, &out, 1, &sent); - (void)!kr_assume(ret == KNOT_EOK && sent == 0); + kr_assert(ret == KNOT_EOK && sent == 0); kr_log_verbose("[xdp] freed unsent buffer, ret = %d\n", ret); } #endif @@ -360,19 +360,19 @@ static struct request_ctx *request_create(struct worker_ctx *worker, /* TODO Relocate pool to struct request */ ctx->worker = worker; - if (session && !kr_assume(session_flags(session)->outgoing == false)) { + if (session && kr_fails_assert(session_flags(session)->outgoing == false)) { pool_release(worker, pool.ctx); return NULL; } ctx->source.session = session; - if (!kr_assume(!!eth_to == !!eth_from)) { + if (kr_fails_assert(!!eth_to == !!eth_from)) { pool_release(worker, pool.ctx); return NULL; } const bool is_xdp = eth_to != NULL; if (is_xdp) { #if ENABLE_XDP - if (!kr_assume(session)) { + if (kr_fails_assert(session)) { pool_release(worker, pool.ctx); return NULL; } @@ -380,7 +380,7 @@ static struct request_ctx *request_create(struct worker_ctx *worker, memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1])); ctx->req.alloc_wire_cb = alloc_wire_cb; #else - (void)!kr_assume(!EINVAL); + kr_assert(!EINVAL); pool_release(worker, pool.ctx); return NULL; #endif @@ -432,7 +432,7 @@ static struct request_ctx *request_create(struct worker_ctx *worker, /** More initialization, related to the particular incoming query/packet. */ static int request_start(struct request_ctx *ctx, knot_pkt_t *query) { - if (!kr_assume(query && ctx)) + if (kr_fails_assert(query && ctx)) return kr_error(EINVAL); struct kr_request *req = &ctx->req; @@ -489,7 +489,7 @@ static void request_free(struct request_ctx *ctx) #if ENABLE_XDP free_wire(ctx); #else - (void)!kr_assume(!EINVAL); + kr_assert(!EINVAL); #endif } /* Return mempool to ring or free it if it's full */ @@ -527,7 +527,7 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx) task->pktbuf = pktbuf; array_init(task->waiting); task->refs = 0; - (void)!kr_assume(ctx->task == NULL); + kr_assert(ctx->task == NULL); ctx->task = task; /* Make the primary reference to task. */ qr_task_ref(task); @@ -541,7 +541,7 @@ static void qr_task_free(struct qr_task *task) { struct request_ctx *ctx = task->ctx; - if (!kr_assume(ctx)) + if (kr_fails_assert(ctx)) return; struct worker_ctx *worker = ctx->worker; @@ -557,13 +557,13 @@ static void qr_task_free(struct qr_task *task) /*@ Register new qr_task within session. */ static int qr_task_register(struct qr_task *task, struct session *session) { - if (!kr_assume(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP)) + if (kr_fails_assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP)) return kr_error(EINVAL); session_tasklist_add(session, task); struct request_ctx *ctx = task->ctx; - if (!kr_assume(ctx && (ctx->source.session == NULL || ctx->source.session == session))) + if (kr_fails_assert(ctx && (ctx->source.session == NULL || ctx->source.session == session))) return kr_error(EINVAL); ctx->source.session = session; /* Soft-limit on parallel queries, there is no "slow down" RCODE @@ -611,17 +611,17 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status) qr_task_complete(task); } - if (!handle || !kr_assume(handle->data)) + if (!handle || kr_fails_assert(handle->data)) return status; struct session* s = handle->data; if (handle->type == UV_UDP && session_flags(s)->outgoing) { // This should ensure that we are only dealing with our question to upstream - if (!kr_assume(!knot_wire_get_qr(task->pktbuf->wire))) + if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire))) return status; // start the timer struct kr_query *qry = array_tail(task->ctx->req.rplan.pending); - if (!kr_assume(qry)) + if (kr_fails_assert(qry)) return status; size_t timeout = task->transport->timeout; int ret = session_timer_start(s, on_udp_timeout, timeout, 0); @@ -680,7 +680,7 @@ static int qr_task_send(struct qr_task *task, struct session *session, struct request_ctx *ctx = task->ctx; uv_handle_t *handle = session_get_handle(session); - if (!kr_assume(handle && handle->data == session)) + if (kr_fails_assert(handle && handle->data == session)) return qr_task_on_send(task, NULL, kr_error(EINVAL)); const bool is_stream = handle->type == UV_TCP; if (!is_stream && handle->type != UV_UDP) abort(); @@ -717,7 +717,7 @@ static int qr_task_send(struct qr_task *task, struct session *session, task->send_time = kr_now(); task->recv_time = 0; // task structure is being reused so we have to zero this out here /* Send using given protocol */ - if (!kr_assume(!session_flags(session)->closing)) + if (kr_fails_assert(!session_flags(session)->closing)) return qr_task_on_send(task, NULL, kr_error(EIO)); if (session_flags(session)->has_http) { #if ENABLE_DOH2 @@ -761,7 +761,7 @@ static int qr_task_send(struct qr_task *task, struct session *session, write_req->data = task; ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write); } else { - (void)!kr_assume(false); + kr_assert(false); } if (ret == 0) { @@ -821,7 +821,7 @@ static struct kr_query *task_get_last_pending_query(struct qr_task *task) static int session_tls_hs_cb(struct session *session, int status) { - if (!kr_assume(session_flags(session)->outgoing)) + if (kr_fails_assert(session_flags(session)->outgoing)) return kr_error(EINVAL); struct sockaddr *peer = session_get_peer(session); int deletion_res = worker_del_tcp_waiting(the_worker, peer); @@ -953,12 +953,12 @@ static void on_connect(uv_connect_t *req, int status) struct sockaddr *peer = session_get_peer(session); free(req); - if (!kr_assume(session_flags(session)->outgoing)) + if (kr_fails_assert(session_flags(session)->outgoing)) return; if (session_flags(session)->closing) { worker_del_tcp_waiting(worker, peer); - (void)!kr_assume(session_is_empty(session)); + kr_assert(session_is_empty(session)); return; } @@ -976,7 +976,7 @@ static void on_connect(uv_connect_t *req, int status) "is already timeouted, close\n", peer_str ? peer_str : ""); } - (void)!kr_assume(session_tasklist_is_empty(session)); + kr_assert(session_tasklist_is_empty(session)); session_waitinglist_retry(session, false); session_close(session); return; @@ -993,7 +993,7 @@ static void on_connect(uv_connect_t *req, int status) "is already connected, close\n", peer_str ? peer_str : ""); } - (void)!kr_assume(session_tasklist_is_empty(session)); + kr_assert(session_tasklist_is_empty(session)); session_waitinglist_retry(session, false); session_close(session); return; @@ -1014,7 +1014,7 @@ static void on_connect(uv_connect_t *req, int status) struct kr_query *qry = array_tail(task->ctx->req.rplan.pending); qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED); } - (void)!kr_assume(session_tasklist_is_empty(session)); + kr_assert(session_tasklist_is_empty(session)); session_waitinglist_retry(session, false); session_close(session); return; @@ -1027,7 +1027,7 @@ static void on_connect(uv_connect_t *req, int status) /* session isn't in list of waiting queries, * * something gone wrong */ session_waitinglist_finalize(session, KR_STATE_FAIL); - (void)!kr_assume(session_tasklist_is_empty(session)); + kr_assert(session_tasklist_is_empty(session)); session_close(session); return; } @@ -1073,7 +1073,7 @@ static void on_tcp_connect_timeout(uv_timer_t *timer) struct worker_ctx *worker = the_worker; kr_require(worker); - (void)!kr_assume(session_tasklist_is_empty(session)); + kr_assert(session_tasklist_is_empty(session)); struct sockaddr *peer = session_get_peer(session); worker_del_tcp_waiting(worker, peer); @@ -1098,7 +1098,7 @@ static void on_tcp_connect_timeout(uv_timer_t *timer) worker->stats.timeout += session_waitinglist_get_len(session); session_waitinglist_retry(session, true); - (void)!kr_assume(session_tasklist_is_empty(session)); + kr_assert(session_tasklist_is_empty(session)); /* uv_cancel() doesn't support uv_connect_t request, * so that we can't cancel it. * There still exists possibility of successful connection @@ -1113,9 +1113,9 @@ static void on_tcp_connect_timeout(uv_timer_t *timer) static void on_udp_timeout(uv_timer_t *timer) { struct session *session = timer->data; - (void)!kr_assume(session_get_handle(session)->data == session); - (void)!kr_assume(session_tasklist_get_len(session) == 1); - (void)!kr_assume(session_waitinglist_is_empty(session)); + kr_assert(session_get_handle(session)->data == session); + kr_assert(session_tasklist_get_len(session) == 1); + kr_assert(session_waitinglist_is_empty(session)); uv_timer_stop(timer); @@ -1161,7 +1161,7 @@ static uv_handle_t *transmit(struct qr_task *task) struct sockaddr *addr = (struct sockaddr *)choice; struct session *session = ret->data; struct sockaddr *peer = session_get_peer(session); - (void)!kr_assume(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing); + kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing); memcpy(peer, addr, kr_sockaddr_len(addr)); if (qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf) != 0) { @@ -1192,7 +1192,7 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_ if (klen > 0) { void *val_deleted; int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted); - (void)!kr_assume(ret == KNOT_EOK && val_deleted == task); + kr_assert(ret == KNOT_EOK && val_deleted == task); } /* Notify waiting tasks. */ struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending); @@ -1220,7 +1220,7 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_ static void subreq_lead(struct qr_task *task) { - if (!kr_assume(task)) + if (kr_fails_assert(task)) return; char key[SUBREQ_KEY_LEN]; const int klen = subreq_key(key, task->pktbuf); @@ -1230,7 +1230,7 @@ static void subreq_lead(struct qr_task *task) trie_get_ins(task->ctx->worker->subreq_out, key, klen); if (unlikely(!tvp)) return; /*ENOMEM*/ - if (!kr_assume(*tvp == NULL)) + if (kr_fails_assert(*tvp == NULL)) return; *tvp = task; task->leading = true; @@ -1238,7 +1238,7 @@ static void subreq_lead(struct qr_task *task) static bool subreq_enqueue(struct qr_task *task) { - if (!kr_assume(task)) + if (kr_fails_assert(task)) return false; char key[SUBREQ_KEY_LEN]; const int klen = subreq_key(key, task->pktbuf); @@ -1278,7 +1278,7 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle) #if ENABLE_XDP struct request_ctx *ctx = task->ctx; xdp_handle_data_t *xhd = src_handle->data; - if (!kr_assume(xhd && xhd->socket && xhd->session == ctx->source.session)) + if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session)) return qr_task_on_send(task, src_handle, kr_error(EINVAL)); knot_xdp_msg_t msg; @@ -1298,7 +1298,7 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle) return qr_task_on_send(task, src_handle, ret); #else - (void)!kr_assume(!EINVAL); + kr_assert(!EINVAL); return kr_error(EINVAL); #endif } @@ -1334,7 +1334,7 @@ static int qr_task_finalize(struct qr_task *task, int state) /* Send back answer */ int ret; const uv_handle_t *src_handle = session_get_handle(source_session); - if (!kr_assume(src_handle->type == UV_UDP || src_handle->type == UV_TCP + if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP || src_handle->type == UV_POLL)) { ret = kr_error(EINVAL); } else if (src_handle->type == UV_POLL) { @@ -1342,8 +1342,10 @@ static int qr_task_finalize(struct qr_task *task, int state) } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) { int fd; ret = uv_fileno(src_handle, &fd); - if (kr_assume(ret == 0)) + if (ret == 0) udp_queue_push(fd, &ctx->req, task); + else + kr_assert(false); } else { ret = qr_task_send(task, source_session, &ctx->source.addr.ip, ctx->req.answer); } @@ -1354,7 +1356,7 @@ static int qr_task_finalize(struct qr_task *task, int state) while (!session_tasklist_is_empty(source_session)) { struct qr_task *t = session_tasklist_del_first(source_session, false); struct request_ctx *c = t->ctx; - (void)!kr_assume(c->source.session == source_session); + kr_assert(c->source.session == source_session); c->source.session = NULL; /* Don't finalize them as there can be other tasks * waiting for answer to this particular task. @@ -1395,7 +1397,7 @@ static int udp_task_step(struct qr_task *task, static int tcp_task_waiting_connection(struct session *session, struct qr_task *task) { - if (!kr_assume(session_flags(session)->outgoing && !session_flags(session)->closing)) + if (kr_fails_assert(session_flags(session)->outgoing && !session_flags(session)->closing)) return kr_error(EINVAL); /* Add task to the end of list of waiting tasks. * It will be notified in on_connect() or qr_task_on_send(). */ @@ -1408,7 +1410,7 @@ static int tcp_task_waiting_connection(struct session *session, struct qr_task * static int tcp_task_existing_connection(struct session *session, struct qr_task *task) { - if (!kr_assume(session_flags(session)->outgoing && !session_flags(session)->closing)) + if (kr_fails_assert(session_flags(session)->outgoing && !session_flags(session)->closing)) return kr_error(EINVAL); struct request_ctx *ctx = task->ctx; struct worker_ctx *worker = ctx->worker; @@ -1471,7 +1473,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr return kr_error(EINVAL); } struct session *session = client->data; - if (!kr_assume(session_flags(session)->has_tls == has_tls)) { + if (kr_fails_assert(session_flags(session)->has_tls == has_tls)) { tls_client_ctx_free(tls_ctx); free(conn); return kr_error(EINVAL); @@ -1539,7 +1541,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr static int tcp_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet) { - if (!kr_assume(task->pending_count == 0)) { + if (kr_fails_assert(task->pending_count == 0)) { subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1601,7 +1603,7 @@ static int qr_task_step(struct qr_task *task, /* Consume input and produce next query */ struct request_ctx *ctx = task->ctx; - if (!kr_assume(ctx)) + if (kr_fails_assert(ctx)) return qr_task_finalize(task, KR_STATE_FAIL); struct kr_request *req = &ctx->req; struct worker_ctx *worker = ctx->worker; @@ -1665,7 +1667,7 @@ static int qr_task_step(struct qr_task *task, case KR_TRANSPORT_TLS: return tcp_task_step(task, packet_source, packet); default: - (void)!kr_assume(!EINVAL); + kr_assert(!EINVAL); return kr_error(EINVAL); } } @@ -1762,13 +1764,13 @@ int worker_submit(struct session *session, (int)id); return kr_error(ENOENT); } - if (!kr_assume(!session_flags(session)->closing)) + if (kr_fails_assert(!session_flags(session)->closing)) return kr_error(EINVAL); addr = peer; /* Note recieve time for RTT calculation */ task->recv_time = kr_now(); } - if (!kr_assume(!uv_is_closing(session_get_handle(session)))) + if (kr_fails_assert(!uv_is_closing(session_get_handle(session)))) return kr_error(EINVAL); /* Packet was successfully parsed. @@ -1782,10 +1784,10 @@ int worker_submit(struct session *session, static int map_add_tcp_session(map_t *map, const struct sockaddr* addr, struct session *session) { - if (!kr_assume(map && addr)) + if (kr_fails_assert(map && addr)) return kr_error(EINVAL); const char *key = tcpsess_key(addr); - if (!kr_assume(key && map_contains(map, key) == 0)) + if (kr_fails_assert(key && map_contains(map, key) == 0)) return kr_error(EINVAL); int ret = map_set(map, key, session); return ret ? kr_error(EINVAL) : kr_ok(); @@ -1793,10 +1795,10 @@ static int map_add_tcp_session(map_t *map, const struct sockaddr* addr, static int map_del_tcp_session(map_t *map, const struct sockaddr* addr) { - if (!kr_assume(map && addr)) + if (kr_fails_assert(map && addr)) return kr_error(EINVAL); const char *key = tcpsess_key(addr); - if (!kr_assume(key)) + if (kr_fails_assert(key)) return kr_error(EINVAL); int ret = map_del(map, key); return ret ? kr_error(ENOENT) : kr_ok(); @@ -1805,10 +1807,10 @@ static int map_del_tcp_session(map_t *map, const struct sockaddr* addr) static struct session* map_find_tcp_session(map_t *map, const struct sockaddr *addr) { - if (!kr_assume(map && addr)) + if (kr_fails_assert(map && addr)) return NULL; const char *key = tcpsess_key(addr); - if (!kr_assume(key)) + if (kr_fails_assert(key)) return NULL; struct session* ret = map_get(map, key); return ret; @@ -1879,7 +1881,7 @@ int worker_end_tcp(struct session *session) while (!session_waitinglist_is_empty(session)) { struct qr_task *task = session_waitinglist_pop(session, false); - (void)!kr_assume(task->refs > 1); + kr_assert(task->refs > 1); session_tasklist_del(session, task); if (session_flags(session)->outgoing) { if (task->ctx->req.options.FORWARD) { @@ -1894,7 +1896,7 @@ int worker_end_tcp(struct session *session) } qr_task_step(task, NULL, NULL); } else { - (void)!kr_assume(task->ctx->source.session == session); + kr_assert(task->ctx->source.session == session); task->ctx->source.session = NULL; } worker_task_unref(task); @@ -1910,7 +1912,7 @@ int worker_end_tcp(struct session *session) } qr_task_step(task, NULL, NULL); } else { - (void)!kr_assume(task->ctx->source.session == session); + kr_assert(task->ctx->source.session == session); task->ctx->source.session = NULL; } worker_task_unref(task); @@ -1968,7 +1970,7 @@ knot_pkt_t *worker_resolve_mk_pkt(const char *qname_str, uint16_t qtype, uint16_ struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options) { struct worker_ctx *worker = the_worker; - if (!kr_assume(worker && query)) + if (kr_fails_assert(worker && query)) return NULL; @@ -2132,7 +2134,7 @@ static inline void reclaim_mp_freelist(mp_freelist_t *list) void worker_deinit(void) { struct worker_ctx *worker = the_worker; - if (!kr_assume(worker)) + if (kr_fails_assert(worker)) return; if (worker->z_import != NULL) { zi_free(worker->z_import); @@ -2156,7 +2158,7 @@ void worker_deinit(void) int worker_init(struct engine *engine, int worker_count) { - if (!kr_assume(engine && engine->L && the_worker == NULL)) + if (kr_fails_assert(engine && engine->L && the_worker == NULL)) return kr_error(EINVAL); kr_bindings_register(engine->L); @@ -2197,7 +2199,7 @@ int worker_init(struct engine *engine, int worker_count) lua_pushstring(engine->L, inst_name); } else { ret = asprintf(&pid_str, "%ld", (long)pid); - (void)!kr_assume(ret > 0); + kr_assert(ret > 0); lua_pushstring(engine->L, pid_str); } lua_setfield(engine->L, -2, "id"); |