path: root/modules/http2/h2_workers.c
author    Stefan Eissing <icing@apache.org>    2017-03-25 17:07:30 +0100
committer Stefan Eissing <icing@apache.org>    2017-03-25 17:07:30 +0100
commit    f56c4d62547bd9b11591ee306d26827030fdc4a3 (patch)
tree      1d80cb17ec4b693b9e1dc304a0e14faa7360a626 /modules/http2/h2_workers.c
parent    CHANGES: backported features to 2.4.x (diff)
On the trunk:
mod_http2: h2 workers with improved scalability for better scheduling performance. There are H2MaxWorkers threads created at start and the number is kept constant.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1788672 13f79535-47bb-0310-9956-ffa450edef68
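
For orientation, a minimal sketch of the fixed-slot idea this commit introduces (invented names, not part of the patch; the real h2_slot below additionally carries its thread, condition variable and current task): all per-worker state lives in one array allocated at startup, and spare slots are threaded onto a free list rather than worker objects being created and destroyed at runtime.

    #include <apr_pools.h>

    typedef struct sketch_slot sketch_slot;
    struct sketch_slot {
        int id;
        sketch_slot *next;      /* links the slot into the free/idle lists */
    };

    typedef struct {
        sketch_slot *slots;     /* all slots, allocated once at startup */
        sketch_slot *free;      /* head of the free list */
        int nslots;
    } sketch_pool;

    static void sketch_pool_init(sketch_pool *p, apr_pool_t *mem, int n)
    {
        int i;
        p->nslots = n;
        p->slots = apr_pcalloc(mem, n * sizeof(sketch_slot));
        for (i = 0; i < n; ++i) {
            p->slots[i].id = i;
            p->slots[i].next = (i < n - 1)? &p->slots[i + 1] : NULL;
        }
        p->free = &p->slots[0];
    }
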
Diffstat (limited to 'modules/http2/h2_workers.c')
-rw-r--r--  modules/http2/h2_workers.c  464
1 file changed, 215 insertions(+), 249 deletions(-)
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index e0f4308816..02e7d76336 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -27,242 +27,232 @@
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_workers.h"
+#include "h2_util.h"
+typedef struct h2_slot h2_slot;
+struct h2_slot {
+ int id;
+ h2_slot *next;
+ h2_workers *workers;
+ int aborted;
+ int sticks;
+ h2_task *task;
+ apr_thread_t *thread;
+ apr_thread_cond_t *not_idle;
+};
-static int in_list(h2_workers *workers, h2_mplx *m)
+static h2_slot *pop_slot(h2_slot **phead)
{
- h2_mplx *e;
- for (e = H2_MPLX_LIST_FIRST(&workers->mplxs);
- e != H2_MPLX_LIST_SENTINEL(&workers->mplxs);
- e = H2_MPLX_NEXT(e)) {
- if (e == m) {
- return 1;
+ /* Atomically pop a slot from the list */
+ for (;;) {
+ h2_slot *first = *phead;
+ if (first == NULL) {
+ return NULL;
+ }
+ if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
+ first->next = NULL;
+ return first;
}
}
- return 0;
}
-static void cleanup_zombies(h2_workers *workers, int lock)
+static void push_slot(h2_slot **phead, h2_slot *slot)
{
- if (lock) {
- apr_thread_mutex_lock(workers->lock);
- }
- while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
- h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
- H2_WORKER_REMOVE(zombie);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: cleanup zombie %d", zombie->id);
- h2_worker_destroy(zombie);
+ /* Atomically push a slot to the list */
+ ap_assert(!slot->next);
+ for (;;) {
+ h2_slot *next = slot->next = *phead;
+ if (apr_atomic_casptr((void*)phead, slot, next) == next) {
+ return;
+ }
}
- if (lock) {
+}
+
+static void wake_idle_worker(h2_workers *workers)
+{
+ h2_slot *slot = pop_slot(&workers->idle);
+ if (slot) {
+ apr_thread_mutex_lock(workers->lock);
+ apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(workers->lock);
}
}
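
The free, idle and zombie lists above are lock-free LIFO stacks driven by apr_atomic_casptr(); a self-contained sketch of the same push/pop pattern with an invented node type follows (a sketch of the technique, not part of the patch). Only the condition-variable handshake in wake_idle_worker() still takes the workers mutex; the list operations themselves never block.

    #include <stddef.h>
    #include <apr_atomic.h>

    typedef struct node node;
    struct node {
        node *next;
    };

    /* Push: link the node to the current head, then try to swing the head
     * pointer to it; retry if another thread changed the head meanwhile. */
    static void stack_push(node **phead, node *n)
    {
        for (;;) {
            node *head = n->next = *phead;
            if (apr_atomic_casptr((void*)phead, n, head) == head) {
                return;
            }
        }
    }

    /* Pop: read the head, then try to swing the head pointer to head->next;
     * retry if the head changed under us. Returns NULL on an empty list. */
    static node *stack_pop(node **phead)
    {
        for (;;) {
            node *head = *phead;
            if (head == NULL) {
                return NULL;
            }
            if (apr_atomic_casptr((void*)phead, head->next, head) == head) {
                head->next = NULL;
                return head;
            }
        }
    }
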
-static h2_task *next_task(h2_workers *workers)
+static void cleanup_zombies(h2_workers *workers)
{
- h2_task *task = NULL;
- h2_mplx *last = NULL;
- int has_more;
-
- /* Get the next h2_mplx to process that has a task to hand out.
- * If it does, place it at the end of the queue and return the
- * task to the worker.
- * If it (currently) has no tasks, remove it so that it needs
- * to register again for scheduling.
- * If we run out of h2_mplx in the queue, we need to wait for
- * new mplx to arrive. Depending on how many workers do exist,
- * we do a timed wait or block indefinitely.
- */
- while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
- h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
-
- if (last == m) {
- break;
- }
- H2_MPLX_REMOVE(m);
- --workers->mplx_count;
-
- task = h2_mplx_pop_task(m, &has_more);
- if (has_more) {
- H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- ++workers->mplx_count;
- if (!last) {
- last = m;
- }
+ h2_slot *slot;
+ while ((slot = pop_slot(&workers->zombies))) {
+ if (slot->thread) {
+ apr_status_t status;
+ apr_thread_join(&status, slot->thread);
+ slot->thread = NULL;
}
+ --workers->worker_count;
+ push_slot(&workers->free, slot);
}
- return task;
+}
+
+static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
+{
+ int has_more;
+ slot->task = h2_mplx_pop_task(m, &has_more);
+ if (slot->task) {
+ /* Ok, we got something to give back to the worker for execution.
+ * If we still have idle workers, we let the worker be sticky,
+ * e.g. making it poll the task's h2_mplx instance for more work
+ * before asking back here. */
+ slot->sticks = slot->workers->max_workers;
+ return has_more? APR_EAGAIN : APR_SUCCESS;
+ }
+ slot->sticks = 0;
+ return APR_EOF;
+}
+
+static h2_fifo_op_t mplx_peek(void *head, void *ctx)
+{
+ h2_mplx *m = head;
+ h2_slot *slot = ctx;
+
+ if (slot_pull_task(slot, m) == APR_EAGAIN) {
+ wake_idle_worker(slot->workers);
+ return H2_FIFO_OP_REPUSH;
+ }
+ return H2_FIFO_OP_PULL;
}
/**
* Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist.
*/
-static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
- h2_task **ptask, int *psticky)
+static apr_status_t get_next(h2_slot *slot)
{
+ h2_workers *workers = slot->workers;
apr_status_t status;
- apr_time_t wait_until = 0, now;
- h2_workers *workers = ctx;
- h2_task *task = NULL;
-
- *ptask = NULL;
- *psticky = 0;
- status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ++workers->idle_workers;
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): looking for work", worker->id);
-
- while (!h2_worker_is_aborted(worker) && !workers->aborted
- && !(task = next_task(workers))) {
-
- /* Need to wait for new tasks to arrive. If we are above
- * minimum workers, we do a timed wait. When timeout occurs
- * and we have still more workers, we shut down one after
- * the other. */
- cleanup_zombies(workers, 0);
- if (workers->worker_count > workers->min_workers) {
- now = apr_time_now();
- if (now >= wait_until) {
- wait_until = now + apr_time_from_sec(workers->max_idle_secs);
- }
-
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): waiting signal, "
- "workers=%d, idle=%d", worker->id,
- (int)workers->worker_count,
- workers->idle_workers);
- status = apr_thread_cond_timedwait(workers->mplx_added,
- workers->lock,
- wait_until - now);
- if (status == APR_TIMEUP
- && workers->worker_count > workers->min_workers) {
- /* waited long enough without getting a task and
- * we are above min workers, abort this one. */
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0,
- workers->s,
- "h2_workers: aborting idle worker");
- h2_worker_abort(worker);
- break;
- }
- }
- else {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): waiting signal (eternal), "
- "worker_count=%d, idle=%d", worker->id,
- (int)workers->worker_count,
- workers->idle_workers);
- apr_thread_cond_wait(workers->mplx_added, workers->lock);
- }
+ slot->task = NULL;
+ while (!slot->aborted) {
+ if (!slot->task) {
+ status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
}
- /* Here, we either have gotten task or decided to shut down
- * the calling worker.
- */
- if (task) {
- /* Ok, we got something to give back to the worker for execution.
- * If we have more idle workers than h2_mplx in our queue, then
- * we let the worker be sticky, e.g. making it poll the task's
- * h2_mplx instance for more work before asking back here.
- * This avoids entering our global lock as long as enough idle
- * workers remain. Stickiness of a worker ends when the connection
- * has no new tasks to process, so the worker will get back here
- * eventually.
- */
- *ptask = task;
- *psticky = (workers->max_workers >= workers->mplx_count);
-
- if (workers->mplx_count && workers->idle_workers > 1) {
- apr_thread_cond_signal(workers->mplx_added);
- }
+ if (slot->task) {
+ return APR_SUCCESS;
}
- --workers->idle_workers;
+ apr_thread_mutex_lock(workers->lock);
+ ++workers->idle_workers;
+ cleanup_zombies(workers);
+ if (slot->next == NULL) {
+ push_slot(&workers->idle, slot);
+ }
+ apr_thread_cond_wait(slot->not_idle, workers->lock);
apr_thread_mutex_unlock(workers->lock);
}
-
- return *ptask? APR_SUCCESS : APR_EOF;
+ return APR_EOF;
}
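
Worth spelling out: the idle handshake above is safe against lost wakeups because the parking worker holds the workers mutex from the moment it publishes itself on the idle list until apr_thread_cond_wait() releases it, so a waker that pops the slot must still wait for the mutex before it can signal. A condensed sketch of that handshake, reusing the patch's own types and helpers (the two function names are invented):

    /* Park the calling worker: publish the slot, then wait to be signalled. */
    static void park(h2_workers *workers, h2_slot *slot)
    {
        apr_thread_mutex_lock(workers->lock);
        push_slot(&workers->idle, slot);          /* now visible to wakers */
        apr_thread_cond_wait(slot->not_idle, workers->lock);
        apr_thread_mutex_unlock(workers->lock);
    }

    /* Wake at most one parked worker, as wake_idle_worker() does. */
    static void unpark_one(h2_workers *workers)
    {
        h2_slot *slot = pop_slot(&workers->idle); /* lock-free pop */
        if (slot) {
            apr_thread_mutex_lock(workers->lock); /* blocks until the slot's
                                                   * owner is really waiting */
            apr_thread_cond_signal(slot->not_idle);
            apr_thread_mutex_unlock(workers->lock);
        }
    }
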
-static void worker_done(h2_worker *worker, void *ctx)
+static void slot_done(h2_slot *slot)
{
- h2_workers *workers = ctx;
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): done", worker->id);
- H2_WORKER_REMOVE(worker);
- --workers->worker_count;
- H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
-
- apr_thread_mutex_unlock(workers->lock);
- }
+ push_slot(&(slot->workers->zombies), slot);
}
-static apr_status_t add_worker(h2_workers *workers)
+
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
- h2_worker *w = h2_worker_create(workers->next_worker_id++,
- workers->pool, workers->thread_attr,
- get_mplx_next, worker_done, workers);
- if (!w) {
- return APR_ENOMEM;
+ h2_slot *slot = wctx;
+
+ while (!slot->aborted) {
+
+ /* Get a h2_task from the mplxs queue. */
+ get_next(slot);
+ while (slot->task) {
+
+ h2_task_do(slot->task, thread, slot->id);
+
+ /* Report the task as done. If stickiness is left, offer the
+ * mplx the opportunity to give us back a new task right away.
+ */
+ if (!slot->aborted && (--slot->sticks > 0)) {
+ h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
+ }
+ else {
+ h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
+ slot->task = NULL;
+ }
+ }
}
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: adding worker(%d)", w->id);
- ++workers->worker_count;
- H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
- return APR_SUCCESS;
+
+ slot_done(slot);
+ return NULL;
}
-static apr_status_t h2_workers_start(h2_workers *workers)
+static apr_status_t activate_slot(h2_workers *workers)
{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: starting");
-
- while (workers->worker_count < workers->min_workers
- && status == APR_SUCCESS) {
- status = add_worker(workers);
+ h2_slot *slot = pop_slot(&workers->free);
+ if (slot) {
+ apr_status_t status;
+
+ slot->workers = workers;
+ slot->aborted = 0;
+ slot->task = NULL;
+ if (!slot->not_idle) {
+ status = apr_thread_cond_create(&slot->not_idle, workers->pool);
+ if (status != APR_SUCCESS) {
+ push_slot(&workers->free, slot);
+ return status;
+ }
}
- apr_thread_mutex_unlock(workers->lock);
+
+ apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
+ workers->pool);
+ if (!slot->thread) {
+ push_slot(&workers->free, slot);
+ return APR_ENOMEM;
+ }
+
+ ++workers->worker_count;
+ return APR_SUCCESS;
}
- return status;
+ return APR_EAGAIN;
}
static apr_status_t workers_pool_cleanup(void *data)
{
h2_workers *workers = data;
- h2_worker *w;
+ h2_slot *slot;
if (!workers->aborted) {
+ apr_thread_mutex_lock(workers->lock);
workers->aborted = 1;
-
/* before we go, cleanup any zombies and abort the rest */
- cleanup_zombies(workers, 1);
- w = H2_WORKER_LIST_FIRST(&workers->workers);
- while (w != H2_WORKER_LIST_SENTINEL(&workers->workers)) {
- h2_worker_abort(w);
- w = H2_WORKER_NEXT(w);
+ cleanup_zombies(workers);
+ for (;;) {
+ slot = pop_slot(&workers->idle);
+ if (slot) {
+ slot->aborted = 1;
+ apr_thread_cond_signal(slot->not_idle);
+ }
+ else {
+ break;
+ }
}
- apr_thread_mutex_lock(workers->lock);
- apr_thread_cond_broadcast(workers->mplx_added);
apr_thread_mutex_unlock(workers->lock);
+
+ h2_fifo_term(workers->mplxs);
+ h2_fifo_interrupt(workers->mplxs);
}
return APR_SUCCESS;
}
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
- int min_workers, int max_workers)
+ int min_workers, int max_workers,
+ int idle_secs)
{
apr_status_t status;
h2_workers *workers;
apr_pool_t *pool;
+ int i;
ap_assert(s);
ap_assert(server_pool);
@@ -275,98 +265,74 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
apr_pool_create(&pool, server_pool);
apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
- if (workers) {
- workers->s = s;
- workers->pool = pool;
- workers->min_workers = min_workers;
- workers->max_workers = max_workers;
- workers->max_idle_secs = 10;
-
- apr_threadattr_create(&workers->thread_attr, workers->pool);
- if (ap_thread_stacksize != 0) {
- apr_threadattr_stacksize_set(workers->thread_attr,
- ap_thread_stacksize);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
- "h2_workers: using stacksize=%ld",
- (long)ap_thread_stacksize);
- }
-
- APR_RING_INIT(&workers->workers, h2_worker, link);
- APR_RING_INIT(&workers->zombies, h2_worker, link);
- APR_RING_INIT(&workers->mplxs, h2_mplx, link);
-
- status = apr_thread_mutex_create(&workers->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (status == APR_SUCCESS) {
- status = apr_thread_cond_create(&workers->mplx_added, workers->pool);
- }
- if (status == APR_SUCCESS) {
- status = h2_workers_start(workers);
- }
- if (status == APR_SUCCESS) {
- apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
- return workers;
- }
+ if (!workers) {
+ return NULL;
}
- return NULL;
-}
+
+ workers->s = s;
+ workers->pool = pool;
+ workers->min_workers = min_workers;
+ workers->max_workers = max_workers;
+ workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
-{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
+ status = h2_fifo_create(&workers->mplxs, pool, workers->max_workers);
if (status != APR_SUCCESS) {
- return status;
+ return NULL;
}
- ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
- "h2_workers: register mplx(%ld), idle=%d",
- m->id, workers->idle_workers);
- if (in_list(workers, m)) {
- status = APR_EAGAIN;
+ status = apr_threadattr_create(&workers->thread_attr, workers->pool);
+ if (status != APR_SUCCESS) {
+ return NULL;
}
- else {
- H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- ++workers->mplx_count;
- status = APR_SUCCESS;
+
+ if (ap_thread_stacksize != 0) {
+ apr_threadattr_stacksize_set(workers->thread_attr,
+ ap_thread_stacksize);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
+ "h2_workers: using stacksize=%ld",
+ (long)ap_thread_stacksize);
}
- if (workers->idle_workers > 0) {
- apr_thread_cond_signal(workers->mplx_added);
+ status = apr_thread_mutex_create(&workers->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (status == APR_SUCCESS) {
+ int n = workers->nslots = workers->max_workers;
+ workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
+ if (workers->slots == NULL) {
+ status = APR_ENOMEM;
+ }
}
- else if (status == APR_SUCCESS
- && workers->worker_count < workers->max_workers) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: got %d worker, adding 1",
- workers->worker_count);
- add_worker(workers);
+ if (status == APR_SUCCESS) {
+ workers->free = &workers->slots[0];
+ for (i = 0; i < workers->nslots-1; ++i) {
+ workers->slots[i].next = &workers->slots[i+1];
+ workers->slots[i].id = i;
+ }
+ while (workers->worker_count < workers->max_workers
+ && status == APR_SUCCESS) {
+ status = activate_slot(workers);
+ }
}
- apr_thread_mutex_unlock(workers->lock);
- return status;
+ if (status == APR_SUCCESS) {
+ apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
+ return workers;
+ }
+ return NULL;
}
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- status = APR_EAGAIN;
- if (in_list(workers, m)) {
- H2_MPLX_REMOVE(m);
- status = APR_SUCCESS;
- }
- apr_thread_mutex_unlock(workers->lock);
- }
+ apr_status_t status;
+ if ((status = h2_fifo_try_push(workers->mplxs, m)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
+ "h2_workers: unable to push mplx(%ld)", m->id);
+ }
+ wake_idle_worker(workers);
return status;
}
-void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs)
+apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
{
- if (idle_secs <= 0) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
- APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d"
- " is not valid, ignored.", idle_secs);
- return;
- }
- workers->max_idle_secs = idle_secs;
+ return h2_fifo_remove(workers->mplxs, m);
}
-
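
For context, the reworked call surface as a consumer would see it after this commit (a hedged usage sketch based only on the signatures visible in this diff; helper names are invented and error handling is trimmed). The old h2_workers_set_max_idle_secs() setter is gone and the idle timeout now travels as a constructor argument:

    #include <httpd.h>
    #include <apr_pools.h>
    #include "h2_mplx.h"
    #include "h2_workers.h"

    static h2_workers *setup_workers(server_rec *s, apr_pool_t *pool,
                                     int min_workers, int max_workers,
                                     int idle_secs)
    {
        /* Creates all slots up front and starts max_workers threads;
         * cleanup is registered on 'pool', so there is no destroy call. */
        return h2_workers_create(s, pool, min_workers, max_workers, idle_secs);
    }

    static apr_status_t announce_work(h2_workers *workers, struct h2_mplx *m)
    {
        /* Pushes the mplx into the fifo and wakes one idle slot, if any. */
        return h2_workers_register(workers, m);
    }

    static apr_status_t withdraw_work(h2_workers *workers, struct h2_mplx *m)
    {
        /* Drops the mplx from the fifo if it is still queued. */
        return h2_workers_unregister(workers, m);
    }
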