diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/http2/NWGNUmod_http2 | 1 | ||||
-rw-r--r-- | modules/http2/config2.m4 | 4 | ||||
-rw-r--r-- | modules/http2/h2_conn_io.c | 10 | ||||
-rw-r--r-- | modules/http2/h2_conn_io.h | 1 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 334 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 64 | ||||
-rw-r--r-- | modules/http2/h2_ngn_shed.c | 333 | ||||
-rw-r--r-- | modules/http2/h2_ngn_shed.h | 64 | ||||
-rw-r--r-- | modules/http2/h2_proxy_session.c | 282 | ||||
-rw-r--r-- | modules/http2/h2_request.c | 3 | ||||
-rw-r--r-- | modules/http2/h2_session.c | 16 | ||||
-rw-r--r-- | modules/http2/h2_task.c | 27 | ||||
-rw-r--r-- | modules/http2/h2_task.h | 5 | ||||
-rw-r--r-- | modules/http2/h2_task_output.c | 106 | ||||
-rw-r--r-- | modules/http2/h2_task_output.h | 9 | ||||
-rw-r--r-- | modules/http2/h2_version.h | 4 | ||||
-rw-r--r-- | modules/http2/mod_http2.c | 17 | ||||
-rw-r--r-- | modules/http2/mod_http2.h | 28 | ||||
-rw-r--r-- | modules/http2/mod_proxy_http2.c | 110 |
19 files changed, 884 insertions, 534 deletions
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index 12811bd270..c471ea470d 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -198,6 +198,7 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_io.o \ $(OBJDIR)/h2_io_set.o \ $(OBJDIR)/h2_mplx.o \ + $(OBJDIR)/h2_ngn_shed.o \ $(OBJDIR)/h2_push.o \ $(OBJDIR)/h2_request.o \ $(OBJDIR)/h2_response.o \ diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4 index b47b2d2192..1515c4dbcf 100644 --- a/modules/http2/config2.m4 +++ b/modules/http2/config2.m4 @@ -33,6 +33,7 @@ h2_int_queue.lo dnl h2_io.lo dnl h2_io_set.lo dnl h2_mplx.lo dnl +h2_ngn_shed.lo dnl h2_push.lo dnl h2_request.lo dnl h2_response.lo dnl @@ -200,7 +201,8 @@ is usually linked shared and requires loading. ], $http2_objs, , most, [ ]) # Ensure that other modules can pick up mod_http2.h -APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current]) +# icing: hold back for now until it is more stable +#APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current]) diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 4a8375b940..56d01e6732 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -213,6 +213,16 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush) return h2_conn_io_flush_int(io, flush, 0); } +apr_status_t h2_conn_io_flush(h2_conn_io *io) +{ + /* make sure we always write a flush, even if our buffers are empty. + * We want to flush not only our buffers, but alse ones further down + * the connection filters. */ + apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(io->output, b); + return h2_conn_io_flush_int(io, 0, 0); +} + apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) { apr_off_t len = 0; diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index f0243927d6..8d71fffcd7 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -78,6 +78,7 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session); * @param flush if a flush bucket should be appended to any output */ apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush); +apr_status_t h2_conn_io_flush(h2_conn_io *io); /** * Check the amount of buffered output and pass it on if enough has accumulated. diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 86451ecb56..43a4630e9f 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -38,6 +38,7 @@ #include "h2_io_set.h" #include "h2_response.h" #include "h2_mplx.h" +#include "h2_ngn_shed.h" #include "h2_request.h" #include "h2_stream.h" #include "h2_task.h" @@ -143,10 +144,7 @@ static void h2_mplx_destroy(h2_mplx *m) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): destroy, ios=%d", m->id, (int)h2_io_set_size(m->stream_ios)); - m->aborted = 1; - check_tx_free(m); - if (m->pool) { apr_pool_destroy(m->pool); } @@ -197,7 +195,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - status = apr_thread_cond_create(&m->task_done, m->pool); + status = apr_thread_cond_create(&m->req_added, m->pool); if (status != APR_SUCCESS) { h2_mplx_destroy(m); return NULL; @@ -217,6 +215,9 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->tx_handles_reserved = 0; m->tx_chunk_size = 4; + + m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->stream_max_mem); + h2_ngn_shed_set_ctx(m->ngn_shed , m); } return m; } @@ -362,7 +363,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_mplx_set_consumed_cb(m, NULL, NULL); h2_iq_clear(m->q); - apr_thread_cond_broadcast(m->task_done); + apr_thread_cond_broadcast(m->req_added); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -397,8 +398,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_io_set_iter(m->stream_ios, stream_print, m); } } - m->aborted = 1; - apr_thread_cond_broadcast(m->task_done); + h2_mplx_abort(m); + apr_thread_cond_broadcast(m->req_added); } } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) @@ -412,15 +413,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) void h2_mplx_abort(h2_mplx *m) { - apr_status_t status; int acquired; AP_DEBUG_ASSERT(m); - if (!m->aborted) { - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - m->aborted = 1; - leave_mutex(m, acquired); - } + if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) { + m->aborted = 1; + h2_ngn_shed_abort(m->ngn_shed); + leave_mutex(m, acquired); } } @@ -695,7 +694,8 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) } static apr_status_t out_write(h2_mplx *m, h2_io *io, - ap_filter_t* f, apr_bucket_brigade *bb, + ap_filter_t* f, int blocking, + apr_bucket_brigade *bb, apr_table_t *trailers, struct apr_thread_cond_t *iowait) { @@ -719,6 +719,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, && iowait && (m->stream_max_mem <= h2_io_out_length(io)) && !is_aborted(m, &status)) { + if (!blocking) { + return APR_INCOMPLETE; + } trailers = NULL; if (f) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, @@ -757,7 +760,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, check_tx_reservation(m); } if (bb) { - status = out_write(m, io, f, bb, response->trailers, iowait); + status = out_write(m, io, f, 0, bb, response->trailers, iowait); + if (status == APR_INCOMPLETE) { + /* write will have transferred as much data as possible. + caller has to deal with non-empty brigade */ + status = APR_SUCCESS; + } } have_out_data_for(m, stream_id); } @@ -791,7 +799,8 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, } apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, - ap_filter_t* f, apr_bucket_brigade *bb, + ap_filter_t* f, int blocking, + apr_bucket_brigade *bb, apr_table_t *trailers, struct apr_thread_cond_t *iowait) { @@ -802,7 +811,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { - status = out_write(m, io, f, bb, trailers, iowait); + status = out_write(m, io, f, blocking, bb, trailers, iowait); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, "h2_mplx(%ld-%d): write with trailers=%s", m->id, io->id, trailers? "yes" : "no"); @@ -1111,7 +1120,9 @@ static void task_done(h2_mplx *m, h2_task *task) if (task->frozen) { /* this task was handed over to an engine for processing */ h2_task_thaw(task); - /* TODO: can we signal an engine that it can now start on this? */ + /* TODO: not implemented yet... */ + /*h2_task_set_io_blocking(task, 0);*/ + apr_thread_cond_broadcast(m->req_added); } else { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); @@ -1174,7 +1185,6 @@ static void task_done(h2_mplx *m, h2_task *task) /* hang around until the stream deregisteres */ } } - apr_thread_cond_broadcast(m->task_done); } } } @@ -1337,59 +1347,8 @@ apr_status_t h2_mplx_idle(h2_mplx *m) * HTTP/2 request engines ******************************************************************************/ -typedef struct h2_req_entry h2_req_entry; -struct h2_req_entry { - APR_RING_ENTRY(h2_req_entry) link; - request_rec *r; -}; - -#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link) -#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link) -#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) - -typedef struct h2_req_engine_i h2_req_engine_i; -struct h2_req_engine_i { - h2_req_engine pub; - conn_rec *c; /* connection this engine is assigned to */ - h2_mplx *m; - unsigned int shutdown : 1; /* engine is being shut down */ - apr_thread_cond_t *io; /* condition var for waiting on data */ - APR_RING_HEAD(h2_req_entries, h2_req_entry) entries; - apr_size_t no_assigned; /* # of assigned requests */ - apr_size_t no_live; /* # of live */ - apr_size_t no_finished; /* # of finished */ -}; - -#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link) -#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link) -#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) -#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) - -#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ -h2_req_entry *ap__b = (e); \ -APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \ -} while (0) - -#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ -h2_req_entry *ap__b = (e); \ -APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \ -} while (0) - -static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, - h2_req_engine_i *engine, - request_rec *r) -{ - h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry)); - - APR_RING_ELEM_INIT(entry, link); - entry->r = r; - H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry); - return APR_SUCCESS; -} - - -apr_status_t h2_mplx_engine_push(const char *engine_type, - request_rec *r, h2_mplx_engine_init *einit) +apr_status_t h2_mplx_req_engine_push(const char *ngn_type, + request_rec *r, h2_req_engine_init *einit) { apr_status_t status; h2_mplx *m; @@ -1409,63 +1368,7 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, status = APR_ECONNABORTED; } else { - h2_req_engine_i *engine = (h2_req_engine_i*)m->engine; - - apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id); - status = APR_EOF; - - if (task->ser_headers) { - /* Max compatibility, deny processing of this */ - } - else if (engine && !strcmp(engine->pub.type, engine_type)) { - if (engine->shutdown - || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) { - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): engine shutdown or over %s", - m->c->id, engine->pub.id); - engine = NULL; - } - else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) { - /* this task will be processed in another thread, - * freeze any I/O for the time being. */ - h2_task_freeze(task, r); - engine->no_assigned++; - status = APR_SUCCESS; - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, - "h2_mplx(%ld): push request %s", - m->c->id, r->the_request); - } - else { - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): engine error adding req %s", - m->c->id, engine->pub.id); - engine = NULL; - } - } - - if (!engine && einit) { - engine = apr_pcalloc(task->c->pool, sizeof(*engine)); - engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d", - m->id, m->next_eng_id++); - engine->pub.pool = task->c->pool; - engine->pub.type = apr_pstrdup(task->c->pool, engine_type); - engine->pub.window_bits = 30; - engine->pub.req_window_bits = h2_log2(m->stream_max_mem); - engine->c = r->connection; - APR_RING_INIT(&engine->entries, h2_req_entry, link); - engine->m = m; - engine->io = task->io; - engine->no_assigned = 1; - engine->no_live = 1; - - status = einit(&engine->pub, r); - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): init engine %s (%s)", - m->c->id, engine->pub.id, engine->pub.type); - if (status == APR_SUCCESS) { - m->engine = &engine->pub; - } - } + status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, task, r, einit); } leave_mutex(m, acquired); @@ -1473,163 +1376,66 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, return status; } -static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine) -{ - h2_req_entry *entry; - h2_task *task; - - for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); - entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); - entry = H2_REQ_ENTRY_NEXT(entry)) { - task = h2_ctx_rget_task(entry->r); - AP_DEBUG_ASSERT(task); - if (!task->frozen) { - H2_REQ_ENTRY_REMOVE(entry); - return entry; - } - } - return NULL; -} - -static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, - apr_read_type_e block, request_rec **pr) -{ - h2_req_entry *entry; - - AP_DEBUG_ASSERT(m); - AP_DEBUG_ASSERT(engine); - while (1) { - if (m->aborted) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): mplx abort while pulling requests %s", - m->id, engine->pub.id); - *pr = NULL; - return APR_EOF; - } - - if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) - && (entry = pop_non_frozen(engine))) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r, - "h2_mplx(%ld): request %s pulled by engine %s", - m->c->id, entry->r->the_request, engine->pub.id); - engine->no_live++; - entry->r->connection->current_thread = engine->c->current_thread; - *pr = entry->r; - return APR_SUCCESS; - } - else if (APR_NONBLOCK_READ == block) { - *pr = NULL; - return APR_EAGAIN; - } - else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) { - engine->shutdown = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): emtpy queue, shutdown engine %s", - m->id, engine->pub.id); - *pr = NULL; - return APR_EOF; - } - apr_thread_cond_timedwait(m->task_done, m->lock, - apr_time_from_msec(100)); - } -} - -apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, - apr_read_type_e block, request_rec **pr) +apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, + apr_read_type_e block, + apr_uint32_t capacity, + request_rec **pr) { - h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; - h2_mplx *m = engine->m; + h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); + h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; int acquired; *pr = NULL; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - status = engine_pull(m, engine, block, pr); + int want_shutdown = (block == APR_BLOCK_READ); + if (0 && want_shutdown) { + /* For a blocking read, check first if requests are to be + * had and, if not, wait a short while before doing the + * blocking, and if unsuccessful, terminating read. + */ + status = h2_ngn_shed_pull_req(shed, ngn, capacity, 0, pr); + if (status != APR_EAGAIN) { + return status; + } + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, + "h2_mplx(%ld): start block engine pull", m->id); + apr_thread_cond_timedwait(m->req_added, m->lock, + apr_time_from_msec(100)); + ap_log_cerror(APLOG_MARK, APLOG_INFO, status, m->c, + "h2_mplx(%ld): done block engine pull", m->id); + } + status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr); leave_mutex(m, acquired); } return status; } -static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, - int waslive, int aborted) +void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) { - int acquired; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, - "h2_mplx(%ld): task %s %s by %s", - m->id, task->id, aborted? "aborted":"done", - engine->pub.id); - h2_task_output_close(task->output); - engine->no_finished++; - if (waslive) engine->no_live--; - engine->no_assigned--; - if (task->c != engine->c) { /* do not release what the engine runs on */ - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - task_done(m, task); - leave_mutex(m, acquired); - } - } -} - -void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn) -{ - h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; - h2_mplx *m = engine->m; - h2_task *task; + h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); + h2_mplx *m = h2_ngn_shed_get_ctx(shed); int acquired; - task = h2_ctx_cget_task(r_conn); - if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) { - engine_done(m, engine, task, 1, 0); + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + if (h2_ngn_shed_done_req(shed, ngn, r_conn) == APR_SUCCESS) { + h2_task *task = h2_ctx_cget_task(r_conn); + if (task) { + task_done(m, task); + } + } leave_mutex(m, acquired); } } -void h2_mplx_engine_exit(h2_req_engine *pub_engine) +void h2_mplx_req_engine_exit(h2_req_engine *ngn) { - h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; - h2_mplx *m = engine->m; + h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); + h2_mplx *m = h2_ngn_shed_get_ctx(shed); int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (!m->aborted - && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) { - h2_req_entry *entry; - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): exit engine %s (%s), " - "has still requests queued, shutdown=%d," - "assigned=%ld, live=%ld, finished=%ld", - m->c->id, engine->pub.id, engine->pub.type, - engine->shutdown, - (long)engine->no_assigned, (long)engine->no_live, - (long)engine->no_finished); - for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); - entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); - entry = H2_REQ_ENTRY_NEXT(entry)) { - request_rec *r = entry->r; - h2_task *task = h2_ctx_rget_task(r); - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): engine %s has queued task %s, " - "frozen=%d, aborting", - m->c->id, engine->pub.id, task->id, task->frozen); - engine_done(m, engine, task, 0, 1); - } - } - if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): exit engine %s (%s), " - "assigned=%ld, live=%ld, finished=%ld", - m->c->id, engine->pub.id, engine->pub.type, - (long)engine->no_assigned, (long)engine->no_live, - (long)engine->no_finished); - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): exit engine %s (%s)", - m->c->id, engine->pub.id, engine->pub.type); - } - if (m->engine == &engine->pub) { - m->engine = NULL; /* TODO */ - } + h2_ngn_shed_done_ngn(shed, ngn); leave_mutex(m, acquired); } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 4d6ce7c0d5..368d92fc96 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -47,6 +47,7 @@ struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; struct h2_int_queue; +struct h2_ngn_shed; struct h2_req_engine; #include <apr_queue.h> @@ -75,19 +76,19 @@ struct h2_mplx { struct h2_io_set *ready_ios; struct h2_io_set *redo_ios; - int max_stream_started; /* highest stream id that started processing */ - int workers_busy; /* # of workers processing on this mplx */ - int workers_limit; /* current # of workers limit, dynamic */ - int workers_def_limit; /* default # of workers limit */ - int workers_max; /* max, hard limit # of workers in a process */ - apr_time_t last_idle_block; /* last time, this mplx entered IDLE while - * streams were ready */ - apr_time_t last_limit_change;/* last time, worker limit changed */ + apr_uint32_t max_stream_started; /* highest stream id that started processing */ + apr_uint32_t workers_busy; /* # of workers processing on this mplx */ + apr_uint32_t workers_limit; /* current # of workers limit, dynamic */ + apr_uint32_t workers_def_limit; /* default # of workers limit */ + apr_uint32_t workers_max; /* max, hard limit # of workers in a process */ + apr_time_t last_idle_block; /* last time, this mplx entered IDLE while + * streams were ready */ + apr_time_t last_limit_change; /* last time, worker limit changed */ apr_interval_time_t limit_change_interval; apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; - struct apr_thread_cond_t *task_done; + struct apr_thread_cond_t *req_added; struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; @@ -102,11 +103,8 @@ struct h2_mplx { h2_mplx_consumed_cb *input_consumed; void *input_consumed_ctx; - - struct h2_req_engine *engine; - /* TODO: signal for waiting tasks*/ - apr_queue_t *engine_queue; - int next_eng_id; + + struct h2_ngn_shed *ngn_shed; }; @@ -308,12 +306,16 @@ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, * of bytes buffered reaches configured max. * @param stream_id the stream identifier * @param filter the apache filter context of the data + * @param blocking == 0 iff call should return with APR_INCOMPLETE if + * the full brigade cannot be written at once * @param bb the bucket brigade to append * @param trailers optional trailers for response, maybe NULL * @param iowait a conditional used for block/signalling in h2_mplx */ apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, - ap_filter_t* filter, apr_bucket_brigade *bb, + ap_filter_t* filter, + int blocking, + apr_bucket_brigade *bb, apr_table_t *trailers, struct apr_thread_cond_t *iowait); @@ -408,20 +410,24 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \ apr_status_t h2_mplx_idle(h2_mplx *m); /******************************************************************************* - * h2_mplx h2_req_engine handling. + * h2_req_engine handling ******************************************************************************/ - -typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, - request_rec *r); - -apr_status_t h2_mplx_engine_push(const char *engine_type, - request_rec *r, h2_mplx_engine_init *einit); - -apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, - apr_read_type_e block, request_rec **pr); - -void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn); - -void h2_mplx_engine_exit(struct h2_req_engine *engine); + +typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r); + +apr_status_t h2_mplx_req_engine_push(const char *ngn_type, + request_rec *r, + h2_mplx_req_engine_init *einit); +apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn, + apr_read_type_e block, + apr_uint32_t capacity, + request_rec **pr); +void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn); +void h2_mplx_req_engine_exit(struct h2_req_engine *ngn); #endif /* defined(__mod_h2__h2_mplx__) */ diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c new file mode 100644 index 0000000000..e8e6755e94 --- /dev/null +++ b/modules/http2/h2_ngn_shed.c @@ -0,0 +1,333 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <assert.h> +#include <stddef.h> +#include <stdlib.h> + +#include <apr_thread_mutex.h> +#include <apr_thread_cond.h> +#include <apr_strings.h> +#include <apr_time.h> + +#include <httpd.h> +#include <http_core.h> +#include <http_log.h> + +#include "mod_http2.h" + +#include "h2_private.h" +#include "h2_config.h" +#include "h2_conn.h" +#include "h2_ctx.h" +#include "h2_h2.h" +#include "h2_int_queue.h" +#include "h2_response.h" +#include "h2_request.h" +#include "h2_task.h" +#include "h2_task_output.h" +#include "h2_util.h" +#include "h2_ngn_shed.h" + + +typedef struct h2_ngn_entry h2_ngn_entry; +struct h2_ngn_entry { + APR_RING_ENTRY(h2_ngn_entry) link; + request_rec *r; +}; + +#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) +#define H2_NGN_ENTRY_PREV(e) APR_RING_PREV((e), link) +#define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) + +#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_ngn_entry, link) +#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_ngn_entry, link) +#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) +#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) + +#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ +h2_ngn_entry *ap__b = (e); \ +APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link); \ +} while (0) + +#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ +h2_ngn_entry *ap__b = (e); \ +APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link); \ +} while (0) + +struct h2_req_engine { + const char *id; /* identifier */ + const char *type; /* name of the engine type */ + apr_pool_t *pool; /* pool for engine specific allocations */ + conn_rec *c; /* connection this engine is assigned to */ + h2_ngn_shed *shed; + + unsigned int shutdown : 1; /* engine is being shut down */ + + APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries; + apr_uint32_t capacity; /* maximum concurrent requests */ + apr_uint32_t no_assigned; /* # of assigned requests */ + apr_uint32_t no_live; /* # of live */ + apr_uint32_t no_finished; /* # of finished */ + + apr_thread_cond_t *io; /* condition var for waiting on data */ +}; + +h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, + apr_uint32_t req_buffer_size) +{ + h2_ngn_shed *shed; + + shed = apr_pcalloc(pool, sizeof(*shed)); + shed->c = c; + shed->pool = pool; + shed->req_buffer_size = req_buffer_size; + shed->ngns = apr_hash_make(pool); + + return shed; +} + +void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx) +{ + shed->user_ctx = user_ctx; +} + +void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed) +{ + return shed->user_ctx; +} + +h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn) +{ + return ngn->shed; +} + +void h2_ngn_shed_abort(h2_ngn_shed *shed) +{ + shed->aborted = 1; +} + +static apr_status_t ngn_schedule(h2_req_engine *ngn, request_rec *r) +{ + h2_ngn_entry *entry = apr_pcalloc(r->pool, sizeof(*entry)); + + APR_RING_ELEM_INIT(entry, link); + entry->r = r; + H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); + return APR_SUCCESS; +} + + +apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, + h2_task *task, request_rec *r, + h2_req_engine_init *einit){ + h2_req_engine *ngn; + apr_status_t status = APR_EOF; + + AP_DEBUG_ASSERT(shed); + + apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id); + if (task->ser_headers) { + /* Max compatibility, deny processing of this */ + return APR_EOF; + } + + ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING); + if (ngn) { + if (ngn->shutdown) { + ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, + "h2_ngn_shed(%ld): %s in shutdown", + shed->c->id, ngn->id); + ngn = NULL; + } + else if (ngn->no_assigned >= ngn->capacity) { + ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, + "h2_ngn_shed(%ld): %s over capacity %d/%d", + shed->c->id, ngn->id, ngn->no_assigned, + ngn->capacity); + ngn = NULL; + } + else if (ngn_schedule(ngn, r) == APR_SUCCESS) { + /* this task will be processed in another thread, + * freeze any I/O for the time being. */ + h2_task_freeze(task, r); + ngn->no_assigned++; + status = APR_SUCCESS; + ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r, + "h2_ngn_shed(%ld): pushed request %s to %s", + shed->c->id, task->id, ngn->id); + } + else { + ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r, + "h2_ngn_shed(%ld): engine error adding req %s", + shed->c->id, ngn->id); + ngn = NULL; + } + } + + if (!ngn && einit) { + ngn = apr_pcalloc(task->c->pool, sizeof(*ngn)); + ngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d", + shed->c->id, shed->next_ngn_id++); + ngn->pool = task->c->pool; + ngn->type = apr_pstrdup(task->c->pool, ngn_type); + ngn->c = r->connection; + APR_RING_INIT(&ngn->entries, h2_ngn_entry, link); + ngn->shed = shed; + ngn->capacity = 100; + ngn->io = task->io; + ngn->no_assigned = 1; + ngn->no_live = 1; + + status = einit(ngn, ngn->id, ngn->type, ngn->pool, + shed->req_buffer_size, r); + ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r, + "h2_ngn_shed(%ld): init engine %s (%s)", + shed->c->id, ngn->id, ngn->type); + if (status == APR_SUCCESS) { + apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, ngn); + } + } + return status; +} + +static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn) +{ + h2_ngn_entry *entry; + h2_task *task; + + for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); + entry = H2_NGN_ENTRY_NEXT(entry)) { + task = h2_ctx_rget_task(entry->r); + AP_DEBUG_ASSERT(task); + if (!task->frozen) { + H2_NGN_ENTRY_REMOVE(entry); + return entry; + } + } + return NULL; +} + +apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, + h2_req_engine *ngn, + apr_uint32_t capacity, + int want_shutdown, + request_rec **pr) +{ + h2_ngn_entry *entry; + + AP_DEBUG_ASSERT(ngn); + *pr = NULL; + if (shed->aborted) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c, + "h2_ngn_shed(%ld): abort while pulling requests %s", + shed->c->id, ngn->id); + return APR_EOF; + } + + ngn->capacity = capacity; + if (!H2_REQ_ENTRIES_EMPTY(&ngn->entries) + && (entry = pop_non_frozen(ngn))) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r, + "h2_ngn_shed(%ld): pulled request %s for engine %s", + shed->c->id, entry->r->the_request, ngn->id); + ngn->no_live++; + entry->r->connection->current_thread = ngn->c->current_thread; + *pr = entry->r; + return APR_SUCCESS; + } + else if (want_shutdown) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, + "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", + shed->c->id, ngn->id); + ngn->shutdown = 1; + return APR_EOF; + } + return APR_EAGAIN; +} + +static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, h2_task *task, + int waslive, int aborted) +{ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, + "h2_ngn_shed(%ld): task %s %s by %s", + shed->c->id, task->id, aborted? "aborted":"done", ngn->id); + h2_task_output_close(task->output); + ngn->no_finished++; + if (waslive) ngn->no_live--; + ngn->no_assigned--; + if (task->c != ngn->c) { /* do not release what the engine runs on */ + return APR_SUCCESS; + } + return APR_EAGAIN; +} + +apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed, + h2_req_engine *ngn, conn_rec *r_conn) +{ + h2_task *task = h2_ctx_cget_task(r_conn); + if (task) { + return ngn_done_task(shed, ngn, task, 1, 0); + } + return APR_ECONNABORTED; +} + +void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) +{ + h2_req_engine *existing; + + if (!shed->aborted + && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) { + h2_ngn_entry *entry; + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, + "h2_ngn_shed(%ld): exit engine %s (%s), " + "has still requests queued, shutdown=%d," + "assigned=%ld, live=%ld, finished=%ld", + shed->c->id, ngn->id, ngn->type, + ngn->shutdown, + (long)ngn->no_assigned, (long)ngn->no_live, + (long)ngn->no_finished); + for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); + entry = H2_NGN_ENTRY_NEXT(entry)) { + request_rec *r = entry->r; + h2_task *task = h2_ctx_rget_task(r); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, + "h2_ngn_shed(%ld): engine %s has queued task %s, " + "frozen=%d, aborting", + shed->c->id, ngn->id, task->id, task->frozen); + ngn_done_task(shed, ngn, task, 0, 1); + } + } + if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, + "h2_ngn_shed(%ld): exit engine %s (%s), " + "assigned=%ld, live=%ld, finished=%ld", + shed->c->id, ngn->id, ngn->type, + (long)ngn->no_assigned, (long)ngn->no_live, + (long)ngn->no_finished); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, shed->c, + "h2_ngn_shed(%ld): exit engine %s (%s)", + shed->c->id, ngn->id, ngn->type); + } + + existing = apr_hash_get(shed->ngns, ngn->type, APR_HASH_KEY_STRING); + if (existing == ngn) { + apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL); + } +} diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h new file mode 100644 index 0000000000..abed58a458 --- /dev/null +++ b/modules/http2/h2_ngn_shed.h @@ -0,0 +1,64 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef h2_req_shed_h +#define h2_req_shed_h + +struct h2_req_engine; + +typedef struct h2_ngn_shed h2_ngn_shed; +struct h2_ngn_shed { + conn_rec *c; + apr_pool_t *pool; + apr_hash_t *ngns; + int next_ngn_id; + void *user_ctx; + + unsigned int aborted : 1; + apr_uint32_t req_buffer_size; /* preferred buffer size for responses */ +}; + +typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r); + +h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, + apr_uint32_t req_buffer_size); + +void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx); +void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed); + +h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn); + +void h2_ngn_shed_abort(h2_ngn_shed *shed); + +apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, + struct h2_task *task, request_rec *r, + h2_shed_ngn_init *init_cb); + +apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, + apr_uint32_t capacity, + int want_shutdown, request_rec **pr); + +apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed, + struct h2_req_engine *ngn, conn_rec *r_conn); + +void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn); + + +#endif /* h2_req_shed_h */ diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 9e6ef35755..a5c30e77fa 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -17,6 +17,7 @@ #include <apr_strings.h> #include <nghttp2/nghttp2.h> +#include <mpm_common.h> #include <httpd.h> #include <mod_proxy.h> #include <mod_http2.h> @@ -91,23 +92,11 @@ static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc, * issues in case of error returned below. */ apr_brigade_cleanup(bb); if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(01084) - "pass request body failed to %pI (%s)", + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO() + "pass output failed to %pI (%s)", p_conn->addr, p_conn->hostname); - if (origin->aborted) { - const char *ssl_note; - - if (((ssl_note = apr_table_get(origin->notes, "SSL_connect_rv")) - != NULL) && (strcmp(ssl_note, "err") == 0)) { - return HTTP_INTERNAL_SERVER_ERROR; - } - return HTTP_GATEWAY_TIME_OUT; - } - else { - return HTTP_BAD_REQUEST; - } } - return OK; + return status; } static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data, @@ -118,19 +107,19 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data, apr_status_t status; int flush = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", - session->id, (int)length, flush); - b = apr_bucket_transient_create((const char*)data, length, - session->c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(session->output, b); + if (data) { + b = apr_bucket_transient_create((const char*)data, length, + session->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(session->output, b); + } status = proxy_pass_brigade(session->c->bucket_alloc, session->p_conn, session->c, session->output, flush); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", + session->id, (int)length, flush); if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, - "h2_proxy_sesssion(%s): sending", session->id); return NGHTTP2_ERR_CALLBACK_FAILURE; } return length; @@ -146,7 +135,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341) - "h2_session(%s): recv FRAME[%s]", + "h2_proxy_session(%s): recv FRAME[%s]", session->id, buffer); } @@ -167,7 +156,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342) - "h2_session(%s): recv FRAME[%s]", + "h2_proxy_session(%s): recv FRAME[%s]", session->id, buffer); } break; @@ -186,7 +175,7 @@ static int before_frame_send(nghttp2_session *ngh2, h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343) - "h2_session(%s): sent FRAME[%s]", + "h2_proxy_session(%s): sent FRAME[%s]", session->id, buffer); } return 0; @@ -339,9 +328,13 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, apr_bucket *b; apr_status_t status; - nghttp2_session_consume(ngh2, stream_id, len); + /*nghttp2_session_consume(ngh2, stream_id, len);*/ stream = nghttp2_session_get_stream_user_data(ngh2, stream_id); if (!stream) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, + "h2_proxy_session(%s): recv data chunk for " + "unknown stream %d, ignored", + session->id, stream_id); return 0; } @@ -359,10 +352,14 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, b = apr_bucket_flush_create(stream->r->connection->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->output, b); } + + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, + "h2_proxy_session(%s): pass response data for " + "stream %d, %d bytes", session->id, stream_id, (int)len); status = ap_pass_brigade(stream->r->output_filters, stream->output); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344) - "h2_session(%s-%d): passing output", + "h2_proxy_session(%s): passing output on stream %d", session->id, stream->id); nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, NGHTTP2_STREAM_CLOSED); @@ -375,6 +372,9 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id, uint32_t error_code, void *user_data) { h2_proxy_session *session = user_data; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() + "h2_proxy_session(%s): stream=%d, closed, err=%d", + session->id, stream_id, error_code); dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL); return 0; } @@ -415,6 +415,9 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, *data_flags = 0; stream = nghttp2_session_get_stream_user_data(ngh2, stream_id); if (!stream) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, + "h2_proxy_stream(%s): data_read, stream %d not found", + stream->session->id, stream_id); return NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -492,6 +495,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, h2_proxy_session *session; nghttp2_session_callbacks *cbs; nghttp2_option *option; + ap_filter_t *f; session = apr_pcalloc(pool, sizeof(*session)); apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close); @@ -522,6 +526,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, nghttp2_option_new(&option); nghttp2_option_set_peer_max_concurrent_streams(option, 100); + nghttp2_option_set_no_auto_window_update(option, 0); nghttp2_session_client_new2(&session->ngh2, cbs, session, option); @@ -530,6 +535,14 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "setup session for %s", p_conn->hostname); + + f = session->c->input_filters; + while (f) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): c->input_filter %s", + session->id, f->frec->name); + f = f->next; + } } return p_conn->data; @@ -539,6 +552,12 @@ static apr_status_t session_start(h2_proxy_session *session) { nghttp2_settings_entry settings[2]; int rv, add_conn_window; + apr_socket_t *s; + + s = ap_get_conn_socket(session->c); + if (s) { + ap_sock_disable_nagle(s); + } settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; settings[0].value = 0; @@ -557,47 +576,6 @@ static apr_status_t session_start(h2_proxy_session *session) return rv? APR_EGENERAL : APR_SUCCESS; } -static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb) -{ - apr_status_t status = APR_SUCCESS; - apr_size_t readlen = 0; - ssize_t n; - - while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { - apr_bucket* b = APR_BRIGADE_FIRST(bb); - - if (!APR_BUCKET_IS_METADATA(b)) { - const char *bdata = NULL; - apr_size_t blen = 0; - - status = apr_bucket_read(b, &bdata, &blen, APR_NONBLOCK_READ); - if (status == APR_SUCCESS && blen > 0) { - n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen); - if (n < 0) { - if (nghttp2_is_fatal((int)n)) { - return APR_EGENERAL; - } - } - else { - readlen += n; - if (n < blen) { - apr_bucket_split(b, n); - } - } - } - } - apr_bucket_delete(b); - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_session(%s): fed %ld bytes of input to session", - session->id, (long)readlen); - if (readlen == 0 && status == APR_SUCCESS) { - return APR_EAGAIN; - } - return status; -} - static apr_status_t open_stream(h2_proxy_session *session, const char *url, request_rec *r, h2_proxy_stream **pstream) { @@ -668,13 +646,13 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st const char *task_id = apr_table_get(stream->r->connection->notes, H2_TASK_ID_NOTE); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%s): submit %s%s -> %d (task %s)", + "h2_proxy_session(%s): submit %s%s -> %d (task %s)", session->id, stream->req->authority, stream->req->path, rv, task_id); } else { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%s-%d): submit %s%s", + "h2_proxy_session(%s-%d): submit %s%s", session->id, rv, stream->req->authority, stream->req->path); } @@ -689,50 +667,114 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st return APR_EGENERAL; } -static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, - apr_interval_time_t timeout) +static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb) { - apr_status_t status; - apr_socket_t *socket = NULL; - apr_time_t save_timeout = -1; + apr_status_t status = APR_SUCCESS; + apr_size_t readlen = 0; + ssize_t n; - if (block) { - socket = ap_get_conn_socket(session->c); - if (socket) { - apr_socket_timeout_get(socket, &save_timeout); - apr_socket_timeout_set(socket, timeout); + while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + apr_bucket* b = APR_BRIGADE_FIRST(bb); + + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_EOS(b)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): read EOS from conn", + session->id); + } + else if (APR_BUCKET_IS_FLUSH(b)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): read FLUSH from conn", + session->id); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): read unkown META from conn", + session->id); + } } else { - /* cannot block on timeout */ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, - "h2_session(%s): unable to get conn socket", - session->id); - return APR_ENOTIMPL; + const char *bdata = NULL; + apr_size_t blen = 0; + + status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ); + if (status == APR_SUCCESS && blen > 0) { + n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): feeding %ld bytes -> %ld", + session->id, (long)blen, (long)n); + if (n < 0) { + if (nghttp2_is_fatal((int)n)) { + status = APR_EGENERAL; + } + } + else { + readlen += n; + if (n < blen) { + apr_bucket_split(b, n); + } + } + } } + apr_bucket_delete(b); } - status = ap_get_brigade(session->c->input_filters, session->input, - AP_MODE_READBYTES, - block? APR_BLOCK_READ : APR_NONBLOCK_READ, - 64 * 1024); - if (socket && save_timeout != -1) { - apr_socket_timeout_set(socket, save_timeout); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_proxy_session(%s): fed %ld bytes of input to session", + session->id, (long)readlen); + if (readlen == 0 && status == APR_SUCCESS) { + return APR_EAGAIN; } + return status; +} + +static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, + apr_interval_time_t timeout) +{ + apr_status_t status = APR_SUCCESS; - if (status == APR_SUCCESS) { - if (APR_BRIGADE_EMPTY(session->input)) { - status = APR_EAGAIN; + if (APR_BRIGADE_EMPTY(session->input)) { + apr_socket_t *socket = NULL; + apr_time_t save_timeout = -1; + + if (block) { + socket = ap_get_conn_socket(session->c); + if (socket) { + apr_socket_timeout_get(socket, &save_timeout); + apr_socket_timeout_set(socket, timeout); + } + else { + /* cannot block on timeout */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, + "h2_proxy_session(%s): unable to get conn socket", + session->id); + return APR_ENOTIMPL; + } } - else { - feed_brigade(session, session->input); + + status = ap_get_brigade(session->c->input_filters, session->input, + AP_MODE_READBYTES, + block? APR_BLOCK_READ : APR_NONBLOCK_READ, + 64 * 1024); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_proxy_session(%s): read from conn", session->id); + if (socket && save_timeout != -1) { + apr_socket_timeout_set(socket, save_timeout); } } + + if (status == APR_SUCCESS) { + status = feed_brigade(session, session->input); + } else if (APR_STATUS_IS_TIMEUP(status)) { /* nop */ } else if (!APR_STATUS_IS_EAGAIN(status)) { - dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_proxy_session(%s): read error", session->id); + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL); } + return status; } @@ -918,8 +960,8 @@ static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg) break; default: - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_session(%s): conn error -> shutdown", session->id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c, + "h2_proxy_session(%s): conn error -> shutdown", session->id); session_shutdown(session, arg, msg); break; } @@ -936,7 +978,7 @@ static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg) default: ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_session(%s): proto error -> shutdown", session->id); + "h2_proxy_session(%s): proto error -> shutdown", session->id); session_shutdown(session, arg, msg); break; } @@ -984,7 +1026,6 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg) * task processing in other threads. Do a busy wait with * backoff timer. */ transit(session, "no io", H2_PROXYS_ST_WAIT); - session->wait_timeout = 25; } break; default: @@ -1133,7 +1174,7 @@ static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, break; default: ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_session(%s): unknown event %d", + "h2_proxy_session(%s): unknown event %d", session->id, ev); break; } @@ -1145,7 +1186,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) int have_written = 0, have_read = 0; ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%s): process", session->id); + "h2_proxy_session(%s): process", session->id); switch (session->state) { case H2_PROXYS_ST_INIT: @@ -1154,7 +1195,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL); } else { - dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL); } break; @@ -1165,8 +1206,8 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) int rv = nghttp2_session_send(session->ngh2); if (rv < 0 && nghttp2_is_fatal(rv)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%s): write, rv=%d", session->id, rv); - dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); + "h2_proxy_session(%s): write, rv=%d", session->id, rv); + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL); break; } have_written = 1; @@ -1189,14 +1230,27 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) if (check_suspended(session) == APR_EAGAIN) { /* no stream has become resumed. Do a blocking read with * ever increasing timeouts... */ - status = h2_proxy_session_read(session, 0, session->wait_timeout); - if (status == APR_SUCCESS) { - dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL); + if (session->wait_timeout < 25) { + session->wait_timeout = 25; } - else if (APR_STATUS_IS_TIMEUP(status)) { + else { session->wait_timeout = H2MIN(apr_time_from_msec(100), 2*session->wait_timeout); } + + status = h2_proxy_session_read(session, 1, session->wait_timeout); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_proxy_session(%s): WAIT read, timeout=%fms", + session->id, (float)session->wait_timeout/1000.0); + if (status == APR_SUCCESS) { + have_read = 1; + dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL); + } + else if (APR_STATUS_IS_TIMEUP(status) + || APR_STATUS_IS_EAGAIN(status)) { + /* go back to checking all inputs again */ + transit(session, "wait cycle", H2_PROXYS_ST_BUSY); + } } break; @@ -1208,13 +1262,17 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) default: ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c, - APLOGNO(03346)"h2_session(%s): unknown state %d", + APLOGNO(03346)"h2_proxy_session(%s): unknown state %d", session->id, session->state); dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL); break; } + if (have_read || have_written) { + session->wait_timeout = 0; + } + if (!nghttp2_session_want_read(session->ngh2) && !nghttp2_session_want_write(session->ngh2)) { dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL); diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index 2767ef538a..18509dfc12 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -451,6 +451,9 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn) /* Request check post hooks failed. An example of this would be a * request for a vhost where h2 is disabled --> 421. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO() + "h2_request(%d): access_status=%d, request_create failed", + req->id, access_status); ap_die(access_status, r); ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r); ap_run_log_transaction(r); diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 855e8b9c85..ecd8ae0061 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -597,15 +597,6 @@ static int on_frame_send_cb(nghttp2_session *ngh2, (long)session->frames_sent); } ++session->frames_sent; - switch (frame->hd.type) { - case NGHTTP2_HEADERS: - case NGHTTP2_DATA: - /* no explicit flushing necessary */ - break; - default: - session->flush = 1; - break; - } return 0; } @@ -2021,6 +2012,8 @@ apr_status_t h2_session_process(h2_session *session, int async) no_streams = h2_ihash_is_empty(session->streams); update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE : SERVER_BUSY_READ), "idle"); + /* make certain, the client receives everything before we idle */ + h2_conn_io_flush(&session->io); if (async && no_streams && !session->r && session->requests_received) { ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): async idle, nonblock read", session->id); @@ -2176,6 +2169,8 @@ apr_status_t h2_session_process(h2_session *session, int async) "h2_session: wait for data, %ld micros", (long)session->wait_us); } + /* make certain, the client receives everything before we idle */ + h2_conn_io_flush(&session->io); status = h2_mplx_out_trywait(session->mplx, session->wait_us, session->iowait); if (status == APR_SUCCESS) { @@ -2212,8 +2207,7 @@ apr_status_t h2_session_process(h2_session *session, int async) } out: - h2_conn_io_pass(&session->io, session->flush); - session->flush = 0; + h2_conn_io_flush(&session->io); ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): [%s] process returns", diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 06543d670c..0a781f63d4 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -93,16 +93,16 @@ static apr_status_t h2_response_freeze_filter(ap_filter_t* f, AP_DEBUG_ASSERT(task); if (task->frozen) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, "h2_response_freeze_filter, saving"); - return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool); + return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool); } if (APR_BRIGADE_EMPTY(bb)) { return APR_SUCCESS; } - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, "h2_response_freeze_filter, passing"); return ap_pass_brigade(f->next, bb); } @@ -197,12 +197,18 @@ h2_task *h2_task_create(long session_id, const h2_request *req, task->request = req; task->input_eos = !req->body; task->ser_headers = req->serialize; + task->blocking = 1; h2_ctx_create_for(c, task); return task; } +void h2_task_set_io_blocking(h2_task *task, int blocking) +{ + task->blocking = blocking; +} + apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) { apr_status_t status; @@ -212,6 +218,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) task->input = h2_task_input_create(task, task->c); task->output = h2_task_output_create(task, task->c); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_task(%s): process connection", task->id); ap_process_connection(task->c, ap_get_conn_socket(task->c)); if (task->frozen) { @@ -236,6 +244,8 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) conn_state_t *cs = c->cs; request_rec *r; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): create request_rec", task->id); r = h2_request_create_rec(req, c); if (r && (r->status == HTTP_OK)) { ap_update_child_status(c->sbh, SERVER_BUSY_READ, r); @@ -264,6 +274,15 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) cs->state = CONN_STATE_WRITE_COMPLETION; r = NULL; } + else if (!r) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): create request_rec failed, r=NULL", task->id); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): create request_rec failed, r->status=%d", + task->id, r->status); + } c->sbh = NULL; return APR_SUCCESS; @@ -297,7 +316,7 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r) conn_rec *c = task->c; task->frozen = 1; - task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc); + task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc); ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, "h2_task(%s), frozen", task->id); diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 24bde946f3..462b2566a6 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -58,12 +58,11 @@ struct h2_task { unsigned int input_eos : 1; unsigned int ser_headers : 1; unsigned int frozen : 1; + unsigned int blocking : 1; struct h2_task_input *input; struct h2_task_output *output; struct apr_thread_cond_t *io; /* used to wait for events on */ - - apr_bucket_brigade *frozen_out; }; h2_task *h2_task_create(long session_id, const struct h2_request *req, @@ -83,4 +82,6 @@ extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out apr_status_t h2_task_freeze(h2_task *task, request_rec *r); apr_status_t h2_task_thaw(h2_task *task); +void h2_task_set_io_blocking(h2_task *task, int blocking); + #endif /* defined(__mod_h2__h2_task__) */ diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 1a2bd86331..f1f52c005b 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -77,14 +77,14 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, if (f) { /* This happens currently when ap_die(status, r) is invoked * by a read request filter. */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204) "h2_task_output(%s): write without response by %s " "for %s %s %s", output->task->id, caller, output->task->request->method, output->task->request->authority, output->task->request->path); - f->c->aborted = 1; + output->c->aborted = 1; } if (output->task->io) { apr_thread_cond_broadcast(output->task->io); @@ -94,37 +94,48 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, if (h2_task_logio_add_bytes_out) { /* counter headers as if we'd do a HTTP/1.1 serialization */ - /* TODO: counter a virtual status line? */ - apr_off_t bytes_written; - apr_brigade_length(bb, 0, &bytes_written); - bytes_written += h2_util_table_bytes(response->headers, 3)+1; - h2_task_logio_add_bytes_out(f->c, bytes_written); + output->written = h2_util_table_bytes(response->headers, 3)+1; + h2_task_logio_add_bytes_out(output->c, output->written); } get_trailers(output); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03348) - "h2_task_output(%s): open as needed %s %s %s", + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348) + "h2_task(%s): open response to %s %s %s", output->task->id, output->task->request->method, output->task->request->authority, output->task->request->path); return h2_mplx_out_open(output->task->mplx, output->task->stream_id, response, f, bb, output->task->io); } - return APR_EOF; + return APR_SUCCESS; } -void h2_task_output_close(h2_task_output *output) +static apr_status_t write_brigade_raw(h2_task_output *output, + ap_filter_t* f, apr_bucket_brigade* bb) { - open_if_needed(output, NULL, NULL, "close"); - if (output->state != H2_TASK_OUT_DONE) { - if (output->task->frozen_out - && !APR_BRIGADE_EMPTY(output->task->frozen_out)) { - h2_mplx_out_write(output->task->mplx, output->task->stream_id, - NULL, output->task->frozen_out, NULL, NULL); + apr_off_t written, left; + apr_status_t status; + + apr_brigade_length(bb, 0, &written); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c, + "h2_task(%s): write response body (%ld bytes)", + output->task->id, (long)written); + + status = h2_mplx_out_write(output->task->mplx, output->task->stream_id, + f, output->task->blocking, bb, + get_trailers(output), output->task->io); + if (status == APR_INCOMPLETE) { + apr_brigade_length(bb, 0, &left); + written -= left; + status = APR_SUCCESS; + } + + if (status == APR_SUCCESS) { + output->written += written; + if (h2_task_logio_add_bytes_out) { + h2_task_logio_add_bytes_out(output->c, written); } - h2_mplx_out_close(output->task->mplx, output->task->stream_id, - get_trailers(output)); - output->state = H2_TASK_OUT_DONE; } + return status; } /* Bring the data from the brigade (which represents the result of the @@ -137,34 +148,57 @@ apr_status_t h2_task_output_write(h2_task_output *output, apr_status_t status; if (APR_BRIGADE_EMPTY(bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_output(%s): empty write", output->task->id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c, + "h2_task(%s): empty write", output->task->id); return APR_SUCCESS; } if (output->task->frozen) { h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2, "frozen task output write", bb); - return ap_save_brigade(f, &output->task->frozen_out, &bb, - output->c->pool); + return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool); } status = open_if_needed(output, f, bb, "write"); - if (status != APR_EOF) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_output(%s): opened and passed brigade", + + /* Attempt to write saved brigade first */ + if (status == APR_SUCCESS && output->bb + && !APR_BRIGADE_EMPTY(output->bb)) { + status = write_brigade_raw(output, f, output->bb); + } + + /* If there is nothing saved (anymore), try to write the brigade passed */ + if (status == APR_SUCCESS + && (!output->bb || APR_BRIGADE_EMPTY(output->bb)) + && !APR_BRIGADE_EMPTY(bb)) { + status = write_brigade_raw(output, f, bb); + } + + /* If the passed brigade is not empty, save it before return */ + if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->c, + "h2_task(%s): could not write all, saving brigade", output->task->id); - return status; + if (!output->bb) { + output->bb = apr_brigade_create(output->c->pool, output->c->bucket_alloc); + } + return ap_save_brigade(f, &output->bb, &bb, output->c->pool); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_output(%s): write brigade", output->task->id); - if (h2_task_logio_add_bytes_out) { - apr_off_t bytes_written; - apr_brigade_length(bb, 0, &bytes_written); - h2_task_logio_add_bytes_out(f->c, bytes_written); + return status; +} + +void h2_task_output_close(h2_task_output *output) +{ + open_if_needed(output, NULL, NULL, "close"); + if (output->state != H2_TASK_OUT_DONE) { + if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) { + h2_mplx_out_write(output->task->mplx, output->task->stream_id, + NULL, 1, output->frozen_bb, NULL, NULL); + } + h2_mplx_out_close(output->task->mplx, output->task->stream_id, + get_trailers(output)); + output->state = H2_TASK_OUT_DONE; } - return h2_mplx_out_write(output->task->mplx, output->task->stream_id, - f, bb, get_trailers(output), output->task->io); } diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h index aa719cdeea..26326f0908 100644 --- a/modules/http2/h2_task_output.h +++ b/modules/http2/h2_task_output.h @@ -30,16 +30,21 @@ typedef enum { H2_TASK_OUT_INIT, H2_TASK_OUT_STARTED, H2_TASK_OUT_DONE, -} h2_task_output_state_t; +} h2_task_out_state_t; typedef struct h2_task_output h2_task_output; struct h2_task_output { conn_rec *c; struct h2_task *task; - h2_task_output_state_t state; + h2_task_out_state_t state; struct h2_from_h1 *from_h1; + unsigned int trailers_passed : 1; + + apr_off_t written; + apr_bucket_brigade *bb; + apr_bucket_brigade *frozen_bb; }; h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c); diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 0cc90f9ed1..7c3605a127 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.3.3-DEV" +#define MOD_HTTP2_VERSION "1.4.0-DEV" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010303 +#define MOD_HTTP2_VERSION_NUM 0x010400 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index 87cac2f678..5216523022 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -128,28 +128,29 @@ static char *http2_var_lookup(apr_pool_t *, server_rec *, conn_rec *, request_rec *, char *name); static int http2_is_h2(conn_rec *); -static apr_status_t http2_req_engine_push(const char *engine_type, +static apr_status_t http2_req_engine_push(const char *ngn_type, request_rec *r, h2_req_engine_init *einit) { - return h2_mplx_engine_push(engine_type, r, einit); + return h2_mplx_req_engine_push(ngn_type, r, einit); } -static apr_status_t http2_req_engine_pull(h2_req_engine *engine, +static apr_status_t http2_req_engine_pull(h2_req_engine *ngn, apr_read_type_e block, + apr_uint32_t capacity, request_rec **pr) { - return h2_mplx_engine_pull(engine, block, pr); + return h2_mplx_req_engine_pull(ngn, block, capacity, pr); } -static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn) +static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) { - h2_mplx_engine_done(engine, r_conn); + h2_mplx_req_engine_done(ngn, r_conn); } -static void http2_req_engine_exit(h2_req_engine *engine) +static void http2_req_engine_exit(h2_req_engine *ngn) { - h2_mplx_engine_exit(engine); + h2_mplx_req_engine_exit(ngn); } diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index ae13529310..d5af1d3ce2 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -43,27 +43,12 @@ typedef struct h2_req_engine h2_req_engine; * @param engine the allocated, partially filled structure * @param r the first request to process, or NULL */ -typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r); - -/** - * The public structure of a h2_req_engine. It gets allocated by the http2 - * infrastructure, assigned id, type, pool, io and connection and passed to the - * h2_req_engine_init() callback to complete initialization. - * This happens whenever a new request gets "push"ed for an engine type and - * no instance, or no free instance, for the type is available. - */ -struct h2_req_engine { - const char *id; /* identifier */ - apr_pool_t *pool; /* pool for engine specific allocations */ - const char *type; /* name of the engine type */ - unsigned char window_bits;/* preferred size of overall response data - * mod_http2 is willing to buffer as log2 */ - unsigned char req_window_bits;/* preferred size of response body data - * mod_http2 is willing to buffer per request, - * as log2 */ - apr_size_t capacity; /* maximum concurrent requests */ - void *user_data; /* user specific data */ -}; +typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r); /** * Push a request to an engine with the specified name for further processing. @@ -95,6 +80,7 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t, APR_DECLARE_OPTIONAL_FN(apr_status_t, http2_req_engine_pull, (h2_req_engine *engine, apr_read_type_e block, + apr_uint32_t capacity, request_rec **pr)); APR_DECLARE_OPTIONAL_FN(void, http2_req_engine_done, (h2_req_engine *engine, diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 3349c12012..c5f827f29a 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -44,7 +44,9 @@ static int (*is_h2)(conn_rec *c); static apr_status_t (*req_engine_push)(const char *name, request_rec *r, h2_req_engine_init *einit); static apr_status_t (*req_engine_pull)(h2_req_engine *engine, - apr_read_type_e block, request_rec **pr); + apr_read_type_e block, + apr_uint32_t capacity, + request_rec **pr); static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn); static void (*req_engine_exit)(h2_req_engine *engine); @@ -59,9 +61,16 @@ typedef struct h2_proxy_ctx { proxy_server_conf *conf; h2_req_engine *engine; + const char *engine_id; + const char *engine_type; + apr_pool_t *engine_pool; + apr_uint32_t req_buffer_size; + unsigned standalone : 1; unsigned is_ssl : 1; unsigned flushall : 1; + + apr_status_t r_status; /* status of our first request work */ } h2_proxy_ctx; static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog, @@ -189,13 +198,21 @@ static int proxy_http2_canon(request_rec *r, char *url) return OK; } -static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r) +static apr_status_t proxy_engine_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r) { h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, &proxy_http2_module); if (ctx) { - engine->capacity = 100; /* guess until we know */ ctx->engine = engine; + ctx->engine_id = id; + ctx->engine_type = type; + ctx->engine_pool = pool; + ctx->req_buffer_size = req_buffer_size; return APR_SUCCESS; } ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, @@ -225,28 +242,34 @@ static void request_done(h2_proxy_session *session, request_rec *r) { h2_proxy_ctx *ctx = session->user_data; - if (req_engine_done && r != ctx->rbase) { + if (r == ctx->rbase) { + ctx->r_status = APR_SUCCESS; + } + else if (req_engine_done && ctx->engine) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, "h2_proxy_session(%s): request %s", - ctx->engine->id, r->the_request); + ctx->engine_id, r->the_request); req_engine_done(ctx->engine, r->connection); } + } -static request_rec *next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, - request_rec *r, int before_leave) +static apr_status_t next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, + request_rec *r, int before_leave, + request_rec **pr) { - if (!r && !ctx->standalone) { - ctx->engine->capacity = session->remote_max_concurrent; - if (req_engine_pull(ctx->engine, - before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, - &r) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_proxy_session(%s): pulled request %s", - session->id, r->the_request); - } - } - return r; + *pr = r; + if (!r && ctx->engine) { + apr_status_t status; + status = req_engine_pull(ctx->engine, + before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, + H2MAX(1, session->remote_max_concurrent), pr); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, + "h2_proxy_session(%s): pulled request %s", + session->id, (*pr? (*pr)->the_request : "NULL")); + return status; + } + return *pr? APR_SUCCESS : APR_EAGAIN; } static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { @@ -255,7 +278,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { setup_backend: ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, - "eng(%s): setup backend", ctx->engine->id); + "eng(%s): setup backend", ctx->engine_id); /* Step Two: Make the Connection (or check that an already existing * socket is still usable). On success, we have a socket connected to * backend->hostname. */ @@ -299,10 +322,9 @@ setup_backend: * loop until we got the response or encounter errors. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, - "eng(%s): setup session", ctx->engine->id); - session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, ctx->conf, - ctx->engine->window_bits, - ctx->engine->req_window_bits, + "eng(%s): setup session", ctx->engine_id); + session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, + 30, h2_log2(ctx->req_buffer_size), request_done); if (!session) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, @@ -312,27 +334,32 @@ setup_backend: run_session: ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, - "eng(%s): run session %s", ctx->engine->id, session->id); + "eng(%s): run session %s", ctx->engine_id, session->id); session->user_data = ctx; status = h2_proxy_session_process(session); while (APR_STATUS_IS_EAGAIN(status)) { - r = next_request(ctx, session, r, 0); - if (r) { + status = next_request(ctx, session, r, 0, &r); + if (status == APR_SUCCESS) { add_request(session, r); r = NULL; } + else if (!APR_STATUS_IS_EAGAIN(status)) { + break; + } status = h2_proxy_session_process(session); } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->owner, - "eng(%s): end of session run", ctx->engine->id); + "eng(%s): end of session run", ctx->engine_id); if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) { ctx->p_conn->close = 1; } - r = next_request(ctx, session, r, 1); - if (r) { + if (status == APR_SUCCESS) { + status = next_request(ctx, session, r, 1, &r); + } + if (status == APR_SUCCESS) { if (ctx->p_conn->close) { /* the connection is/willbe closed, the session is terminated. * Any open stream of that session needs to @@ -355,7 +382,8 @@ run_session: } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->p_conn->connection, "eng(%s): session run done", - ctx->engine->id); + ctx->engine_id); + session->user_data = NULL; return status; } @@ -412,6 +440,7 @@ static int proxy_http2_handler(request_rec *r, ctx->worker = worker; ctx->conf = conf; ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0; + ctx->r_status = HTTP_SERVICE_UNAVAILABLE; ap_set_module_config(c->conn_config, &proxy_http2_module, ctx); apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url); @@ -460,6 +489,7 @@ static int proxy_http2_handler(request_rec *r, ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, "H2: pushed request %s to engine type %s", url, engine_type); + ctx->r_status = APR_SUCCESS; goto cleanup; } } @@ -467,14 +497,10 @@ static int proxy_http2_handler(request_rec *r, if (!ctx->engine) { /* No engine was available or has been initialized, handle this * request just by ourself. */ - h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine)); - engine->id = apr_psprintf(p, "eng-proxy-%ld", c->id); - engine->type = engine_type; - engine->pool = p; - engine->capacity = 1; - engine->window_bits = 30; - engine->req_window_bits = 16; - ctx->engine = engine; + ctx->engine_id = apr_psprintf(p, "eng-proxy-%ld", c->id); + ctx->engine_type = engine_type; + ctx->engine_pool = p; + ctx->req_buffer_size = (32*1024); ctx->standalone = 1; ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, "h2_proxy_http2(%ld): setup standalone engine for type %s", @@ -482,16 +508,16 @@ static int proxy_http2_handler(request_rec *r, } else { ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "H2: hosting engine %s for request %s", ctx->engine->id, url); + "H2: hosting engine %s for request %s", ctx->engine_id, url); } status = proxy_engine_run(ctx, r); cleanup: - if (!ctx->standalone && ctx->engine && req_engine_exit) { + if (ctx->engine && req_engine_exit) { req_engine_exit(ctx->engine); + ctx->engine = NULL; } - ctx->engine = NULL; if (ctx) { if (ctx->p_conn) { @@ -507,7 +533,7 @@ cleanup: ctx->p_conn = NULL; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler"); - return status; + return ctx->r_status; } static void register_hook(apr_pool_t *p) |