diff options
author | Paul Querna <pquerna@apache.org> | 2011-11-15 16:51:03 +0100 |
---|---|---|
committer | Paul Querna <pquerna@apache.org> | 2011-11-15 16:51:03 +0100 |
commit | bde16999845d8b7ad7d191cb8981f9e3d5091a66 (patch) | |
tree | d2eea0d01e6be720f901a0541295bc64f81f0bab /server/mpm/event/event.c | |
parent | Instead of disabling the listening sockets from the pollset when under load, ... (diff) | |
download | apache2-bde16999845d8b7ad7d191cb8981f9e3d5091a66.tar.xz apache2-bde16999845d8b7ad7d191cb8981f9e3d5091a66.zip |
Create a new lock free circular queue, and use it in the EventMPM to remove the timeout mutex that was wrapping both timeout queue operations and pollset operations.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1202257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r-- | server/mpm/event/event.c | 184 |
1 files changed, 118 insertions, 66 deletions
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index 8e4f580eea..5d8fbbfc3b 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -97,6 +97,8 @@ #include <limits.h> /* for INT_MAX */ +#include "equeue.h" + #if HAVE_SERF #include "mod_serf.h" #include "serf.h" @@ -183,7 +185,19 @@ static fd_queue_t *worker_queue; static fd_queue_info_t *worker_queue_info; static int mpm_state = AP_MPMQ_STARTING; -static apr_thread_mutex_t *timeout_mutex; +typedef enum { + TIMEOUT_WRITE_COMPLETION, + TIMEOUT_KEEPALIVE, + TIMEOUT_LINGER, + TIMEOUT_SHORT_LINGER +} timeout_type_e; + +typedef struct pollset_op_t { + timeout_type_e timeout_type; + conn_state_t *cs; + const char *tag; +} pollset_op_t; + APR_RING_HEAD(timeout_head_t, conn_state_t); struct timeout_queue { struct timeout_head_t head; @@ -356,6 +370,7 @@ static apr_os_thread_t *listener_os_thread; * perform a non-graceful (forced) shutdown of the server. */ static apr_socket_t **worker_sockets; +static ap_equeue_t **worker_equeues; static void disable_listensocks(int process_slot) { @@ -737,15 +752,45 @@ static void set_signals(void) #endif } +static void process_pollop(pollset_op_t *op) +{ + apr_status_t rv; + conn_state_t *cs = op->cs; + + switch (op->timeout_type) { + case TIMEOUT_WRITE_COMPLETION: + TO_QUEUE_APPEND(write_completion_q, cs); + break; + case TIMEOUT_KEEPALIVE: + TO_QUEUE_APPEND(keepalive_q, cs); + break; + case TIMEOUT_LINGER: + TO_QUEUE_APPEND(linger_q, cs); + break; + case TIMEOUT_SHORT_LINGER: + TO_QUEUE_APPEND(short_linger_q, cs); + break; + } + + rv = apr_pollset_add(event_pollset, &op->cs->pfd); + + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, + "%s: apr_pollset_add failure", op->tag); + } +} + /* * close our side of the connection * Pre-condition: cs is not in any timeout queue and not in the pollset, * timeout_mutex is not locked * return: 0 if connection is fully closed, * 1 if connection is lingering - * may be called by listener or by worker thread + * may be called by listener or by worker thread. + * the eq may be null if called from the listener thread, + * and the pollset operations are done directly by this function. */ -static int start_lingering_close(conn_state_t *cs) +static int start_lingering_close(conn_state_t *cs, ap_equeue_t *eq) { apr_status_t rv; if (ap_start_lingering_close(cs->c)) { @@ -755,7 +800,15 @@ static int start_lingering_close(conn_state_t *cs) } else { apr_socket_t *csd = ap_get_conn_socket(cs->c); - struct timeout_queue *q; + pollset_op_t localv; + pollset_op_t *v; + + if (eq) { + v = ap_equeue_writer_value(eq); + } + else { + v = &localv; + } rv = apr_socket_timeout_set(csd, 0); AP_DEBUG_ASSERT(rv == APR_SUCCESS); @@ -767,30 +820,25 @@ static int start_lingering_close(conn_state_t *cs) if (apr_table_get(cs->c->notes, "short-lingering-close")) { cs->expiration_time = apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER); - q = &short_linger_q; + v->timeout_type = TIMEOUT_SHORT_LINGER; + v->tag = "start_lingering_close(short)"; cs->state = CONN_STATE_LINGER_SHORT; } else { cs->expiration_time = apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER); - q = &linger_q; + v->timeout_type = TIMEOUT_LINGER; + v->tag = "start_lingering_close(normal)"; cs->state = CONN_STATE_LINGER_NORMAL; } - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(*q, cs); + cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; - rv = apr_pollset_add(event_pollset, &cs->pfd); - apr_thread_mutex_unlock(timeout_mutex); - if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { - ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, - "start_lingering_close: apr_pollset_add failure"); - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_REMOVE(*q, cs); - apr_thread_mutex_unlock(timeout_mutex); - apr_socket_close(cs->pfd.desc.s); - apr_pool_clear(cs->p); - ap_push_pool(worker_queue_info, cs->p); - return 0; + v->cs = cs; + if (eq != NULL) { + ap_equeue_writer_onward(eq); + } + else { + process_pollop(v); } } return 1; @@ -802,7 +850,7 @@ static int start_lingering_close(conn_state_t *cs) * Pre-condition: cs is not in any timeout queue and not in the pollset * return: irrelevant (need same prototype as start_lingering_close) */ -static int stop_lingering_close(conn_state_t *cs) +static int stop_lingering_close(conn_state_t *cs, ap_equeue_t *eq) { apr_status_t rv; apr_socket_t *csd = ap_get_conn_socket(cs->c); @@ -824,7 +872,9 @@ static int stop_lingering_close(conn_state_t *cs) * 0 if it is still open and waiting for some event */ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock, - conn_state_t * cs, int my_child_num, + conn_state_t * cs, + ap_equeue_t *eq, + int my_child_num, int my_thread_num) { conn_rec *c; @@ -858,7 +908,6 @@ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock pt->type = PT_CSD; pt->baton = cs; cs->pfd.client_data = pt; - TO_QUEUE_ELEM_INIT(cs); ap_update_vhost_given_ip(c); @@ -935,12 +984,16 @@ read_request: * Set a write timeout for this connection, and let the * event thread poll for writeability. */ + pollset_op_t *v = ap_equeue_writer_value(eq); + cs->expiration_time = ap_server_conf->timeout + apr_time_now(); - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(write_completion_q, cs); cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR; - rc = apr_pollset_add(event_pollset, &cs->pfd); - apr_thread_mutex_unlock(timeout_mutex); + + v->cs = cs; + v->timeout_type = TIMEOUT_WRITE_COMPLETION; + v->tag = "process_socket(write_completion)"; + + ap_equeue_writer_onward(eq); return 1; } else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted || @@ -957,11 +1010,12 @@ read_request: } if (cs->state == CONN_STATE_LINGER) { - if (!start_lingering_close(cs)) + if (!start_lingering_close(cs, eq)) { return 0; + } } else if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { - apr_status_t rc; + pollset_op_t *v; /* It greatly simplifies the logic to use a single timeout value here * because the new element can just be added to the end of the list and @@ -973,19 +1027,14 @@ read_request: */ cs->expiration_time = ap_server_conf->keep_alive_timeout + apr_time_now(); - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(keepalive_q, cs); /* Add work to pollset. */ + v = ap_equeue_writer_value(eq); + v->timeout_type = TIMEOUT_KEEPALIVE; + v->cs = cs; cs->pfd.reqevents = APR_POLLIN; - rc = apr_pollset_add(event_pollset, &cs->pfd); - apr_thread_mutex_unlock(timeout_mutex); - - if (rc != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, - "process_socket: apr_pollset_add failure"); - AP_DEBUG_ASSERT(rc == APR_SUCCESS); - } + v->tag = "process_socket(keepalive)"; + ap_equeue_writer_onward(eq); } return 1; } @@ -1273,7 +1322,6 @@ static void process_lingering_close(conn_state_t *cs, const apr_pollfd_t *pfd) return; } - apr_thread_mutex_lock(timeout_mutex); rv = apr_pollset_remove(event_pollset, pfd); AP_DEBUG_ASSERT(rv == APR_SUCCESS); @@ -1281,7 +1329,6 @@ static void process_lingering_close(conn_state_t *cs, const apr_pollfd_t *pfd) AP_DEBUG_ASSERT(rv == APR_SUCCESS); TO_QUEUE_REMOVE(*q, cs); - apr_thread_mutex_unlock(timeout_mutex); TO_QUEUE_ELEM_INIT(cs); apr_pool_clear(cs->p); @@ -1294,7 +1341,7 @@ static void process_lingering_close(conn_state_t *cs, const apr_pollfd_t *pfd) */ static void process_timeout_queue(struct timeout_queue *q, apr_time_t timeout_time, - int (*func)(conn_state_t *)) + int (*func)(conn_state_t *, ap_equeue_t *eq)) { int count = 0; conn_state_t *first, *cs, *last; @@ -1322,15 +1369,13 @@ static void process_timeout_queue(struct timeout_queue *q, APR_RING_UNSPLICE(first, last, timeout_list); AP_DEBUG_ASSERT(q->count >= count); q->count -= count; - apr_thread_mutex_unlock(timeout_mutex); while (count) { cs = APR_RING_NEXT(first, timeout_list); TO_QUEUE_ELEM_INIT(first); - func(first); + func(first, NULL); first = cs; count--; } - apr_thread_mutex_lock(timeout_mutex); } static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) @@ -1397,14 +1442,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) /* trace log status every second */ if (now - last_log > apr_time_from_msec(1000)) { last_log = now; - apr_thread_mutex_lock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, "connections: %d (write-completion: %d " "keep-alive: %d lingering: %d)", connection_count, write_completion_q.count, keepalive_q.count, linger_q.count + short_linger_q.count); - apr_thread_mutex_unlock(timeout_mutex); } } @@ -1483,7 +1526,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) case CONN_STATE_WRITE_COMPLETION: get_worker(&have_idle_worker, blocking, &workers_were_busy); - apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_REMOVE(*remove_from_q, cs); rc = apr_pollset_remove(event_pollset, &cs->pfd); @@ -1496,19 +1538,17 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "pollset remove failed"); - apr_thread_mutex_unlock(timeout_mutex); - start_lingering_close(cs); + start_lingering_close(cs, NULL); break; } - apr_thread_mutex_unlock(timeout_mutex); TO_QUEUE_ELEM_INIT(cs); /* If we didn't get a worker immediately for a keep-alive * request, we close the connection, so that the client can * re-connect to a different process. */ if (!have_idle_worker) { - start_lingering_close(cs); + start_lingering_close(cs, NULL); break; } rc = push2worker(out_pfd, event_pollset); @@ -1632,6 +1672,20 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) num--; } /* while for processing poll */ + { + /* TODO: break out to separate function */ + int i; + + for (i = 0; i < threads_per_child; i++) { + ap_equeue_t *eq = worker_equeues[i]; + pollset_op_t *op = NULL; + + while ((op = ap_equeue_reader_next(eq)) != NULL) { + process_pollop(op); + } + } + } + /* XXX possible optimization: stash the current time for use as * r->request_time for new requests */ @@ -1642,7 +1696,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) timeout_time = now + TIMEOUT_FUDGE_FACTOR; /* handle timed out sockets */ - apr_thread_mutex_lock(timeout_mutex); /* Step 1: keepalive timeouts */ /* If all workers are busy, we kill older keep-alive connections so that they @@ -1672,7 +1725,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) ps->write_completion = write_completion_q.count; ps->lingering_close = linger_q.count + short_linger_q.count; ps->keep_alive = keepalive_q.count; - apr_thread_mutex_unlock(timeout_mutex); ps->connections = apr_atomic_read32(&connection_count); /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */ @@ -1716,6 +1768,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) apr_status_t rv; int is_idle = 0; timer_event_t *te = NULL; + ap_equeue_t *eq = worker_equeues[thread_slot]; free(ti); @@ -1788,7 +1841,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) else { is_idle = 0; worker_sockets[thread_slot] = csd; - rv = process_socket(thd, ptrans, csd, cs, process_slot, thread_slot); + rv = process_socket(thd, ptrans, csd, cs, eq, process_slot, thread_slot); if (!rv) { requests_this_child--; } @@ -1885,17 +1938,6 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) clean_child_exit(APEXIT_CHILDFATAL); } - /* Create the timeout mutex and main pollset before the listener - * thread starts. - */ - rv = apr_thread_mutex_create(&timeout_mutex, APR_THREAD_MUTEX_DEFAULT, - pchild); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, - "creation of the timeout mutex failed."); - clean_child_exit(APEXIT_CHILDFATAL); - } - /* Create the main pollset */ rv = apr_pollset_create(&event_pollset, threads_per_child, /* XXX don't we need more, to handle @@ -1912,6 +1954,16 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) worker_sockets = apr_pcalloc(pchild, threads_per_child * sizeof(apr_socket_t *)); + worker_equeues = apr_palloc(pchild, threads_per_child * sizeof(ap_equeue_t*)); + + for (i = 0; i < threads_per_child; i++) { + ap_equeue_t* eq = NULL; + /* TODO: research/test optimal size of queue here */ + ap_equeue_create(pchild, 16, sizeof(pollset_op_t), &eq); + /* same as thread ID */ + worker_equeues[i] = eq; + } + loops = prev_threads_created = 0; while (1) { /* threads_per_child does not include the listener thread */ |