[prev in list] [next in list] [prev in thread] [next in thread]
List: apache-cvs
Subject: svn commit: r1688474 [5/21] - in /httpd/httpd/trunk/modules/http2: ./ m4/ mod-h2.xcodeproj/ mod-h2.x
From: jim () apache ! org
Date: 2015-06-30 15:26:19
Message-ID: 20150630152622.34FB7AC08E0 () hades ! apache ! org
[Download RAW message or body]
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,616 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stdio.h>
+
+#include <apr_lib.h>
+#include <apr_strings.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+#include <http_connection.h>
+#include <http_protocol.h>
+#include <http_request.h>
+#include <util_time.h>
+
+#include "h2_private.h"
+#include "h2_response.h"
+#include "h2_from_h1.h"
+#include "h2_task.h"
+#include "h2_task_output.h"
+#include "h2_util.h"
+
+
+static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state);
+
+/**
+ * Allocate a zero-initialised HTTP/1 response converter for one stream.
+ *
+ * @param stream_id  id recorded in the new object
+ * @param pool       pool the object and its header line array come from
+ * @return the converter in state H2_RESP_ST_STATUS_LINE, or NULL when
+ *         apr_pcalloc() returned no memory
+ */
+h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool)
+{
+    h2_from_h1 *conv = apr_pcalloc(pool, sizeof(h2_from_h1));
+
+    if (conv == NULL) {
+        return NULL;
+    }
+
+    conv->stream_id = stream_id;
+    conv->pool = pool;
+    conv->state = H2_RESP_ST_STATUS_LINE;
+    conv->hlines = apr_array_make(pool, 10, sizeof(char *));
+    return conv;
+}
+
+/**
+ * Release what the converter holds: destroys an attached response (if any)
+ * and forgets the scratch brigade pointer.
+ *
+ * @return always APR_SUCCESS
+ */
+apr_status_t h2_from_h1_destroy(h2_from_h1 *from_h1)
+{
+    h2_response *attached = from_h1->response;
+
+    if (attached != NULL) {
+        h2_response_destroy(attached);
+        from_h1->response = NULL;
+    }
+
+    /* the brigade itself is not destroyed here, only the reference dropped */
+    from_h1->bb = NULL;
+
+    return APR_SUCCESS;
+}
+
+/** Report the parsing state the converter is currently in. */
+h2_from_h1_state_t h2_from_h1_get_state(h2_from_h1 *from_h1)
+{
+    h2_from_h1_state_t current = from_h1->state;
+
+    return current;
+}
+
+/** Move the converter to `state`; a no-op when it is already there. */
+static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state)
+{
+    if (from_h1->state == state) {
+        return;
+    }
+    from_h1->state = state;
+}
+
+/**
+ * Hand out the response attached to the converter.
+ *
+ * @return the stored response pointer, NULL while none has been created
+ */
+h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1)
+{
+    h2_response *resp = from_h1->response;
+
+    return resp;
+}
+
+/**
+ * Turn the collected status and header lines into an h2_response and
+ * advance the converter state.
+ *
+ * On success the content length is copied from the new response, the
+ * chunked flag is copied from r->chunked, and the state becomes
+ * H2_RESP_ST_BODY when a body is expected (chunked or content length > 0),
+ * otherwise H2_RESP_ST_DONE.
+ *
+ * @param from_h1  converter holding status and header lines
+ * @param r        request whose connection is used for logging and whose
+ *                 chunked flag is read
+ * @return APR_SUCCESS, or APR_EINVAL when h2_response_create() returned NULL
+ */
+static apr_status_t make_h2_headers(h2_from_h1 *from_h1, request_rec *r)
+{
+    from_h1->response = h2_response_create(from_h1->stream_id,
+                                           from_h1->status, from_h1->hlines,
+                                           from_h1->pool);
+    if (from_h1->response == NULL) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EINVAL, r->connection,
+                      "h2_from_h1(%d): unable to create resp_head",
+                      from_h1->stream_id);
+        return APR_EINVAL;
+    }
+    from_h1->content_length = from_h1->response->content_length;
+    from_h1->chunked = r->chunked;
+
+    /* content_length is an apr_size_t: print it with its own format macro
+     * instead of squeezing it through an int, which truncates large values.
+     */
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+                  "h2_from_h1(%d): converted headers, content-length: %"
+                  APR_SIZE_T_FMT ", chunked=%d",
+                  from_h1->stream_id, from_h1->content_length,
+                  (int)from_h1->chunked);
+
+    set_state(from_h1, ((from_h1->chunked || from_h1->content_length > 0)?
+                        H2_RESP_ST_BODY : H2_RESP_ST_DONE));
+    /* We are ready to be sent to the client */
+    return APR_SUCCESS;
+}
+
+/**
+ * Record one HTTP/1 header line in from_h1->hlines.
+ *
+ * A line starting with a space or tab is a continuation: its leading
+ * whitespace is skipped and the remainder is appended, separated by a
+ * single space, to the most recently stored line. Any other line is
+ * stored as a new entry. All strings are allocated from from_h1->pool.
+ *
+ * @param from_h1  converter collecting the header lines
+ * @param f        unused
+ * @param line     NUL terminated header line without trailing CRLF
+ * @return APR_SUCCESS, or APR_EINVAL for a continuation line that has no
+ *         preceding header line
+ */
+static apr_status_t parse_header(h2_from_h1 *from_h1, ap_filter_t* f,
+                                 char *line)
+{
+    (void)f;
+
+    if (line[0] == ' ' || line[0] == '\t') {
+        char **plast;
+        const char *previous;
+
+        /* continuation line from the header before this */
+        while (line[0] == ' ' || line[0] == '\t') {
+            ++line;
+        }
+
+        plast = apr_array_pop(from_h1->hlines);
+        if (plast == NULL) {
+            /* not well formed */
+            return APR_EINVAL;
+        }
+        /* The push below hands back the very slot that was just popped, so
+         * take a copy of the old pointer before the slot is written again.
+         */
+        previous = *plast;
+        APR_ARRAY_PUSH(from_h1->hlines, const char*)
+            = apr_psprintf(from_h1->pool, "%s %s", previous, line);
+    }
+    else {
+        /* new header line */
+        APR_ARRAY_PUSH(from_h1->hlines, const char*)
+            = apr_pstrdup(from_h1->pool, line);
+    }
+    return APR_SUCCESS;
+}
+
+/**
+ * Cut the next line off the front of `bb` and copy it into `line`.
+ *
+ * The converter's scratch brigade (created on first use, emptied on later
+ * calls) receives the split-off line. At most len - 1 bytes are copied,
+ * the result is NUL terminated and a trailing CRLF is removed.
+ *
+ * @param from_h1  converter owning the scratch brigade
+ * @param bb       brigade to read from
+ * @param f        filter, used for its connection (bucket allocator, logging)
+ * @param line     destination buffer
+ * @param len      size of `line` in bytes
+ * @return APR_SUCCESS or the error from splitting/flattening the brigade
+ */
+static apr_status_t get_line(h2_from_h1 *from_h1, apr_bucket_brigade *bb,
+                             ap_filter_t* f, char *line, apr_size_t len)
+{
+    apr_status_t rv;
+
+    if (from_h1->bb) {
+        apr_brigade_cleanup(from_h1->bb);
+    }
+    else {
+        from_h1->bb = apr_brigade_create(from_h1->pool, f->c->bucket_alloc);
+    }
+
+    rv = apr_brigade_split_line(from_h1->bb, bb, APR_BLOCK_READ,
+                                HUGE_STRING_LEN);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* keep one byte in reserve for the terminator */
+    --len;
+    rv = apr_brigade_flatten(from_h1->bb, line, &len);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* the line is assumed to contain no NUL bytes of its own */
+    line[len] = '\0';
+    if (len >= 2 && strcmp(line + len - 2, H2_CRLF) == 0) {
+        len -= 2;
+        line[len] = '\0';
+    }
+
+    apr_brigade_cleanup(from_h1->bb);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                  "h2_from_h1(%d): read line: %s",
+                  from_h1->stream_id, line);
+    return APR_SUCCESS;
+}
+
+/**
+ * Output filter body that parses a serialized HTTP/1 response head.
+ *
+ * While in the status-line/header states, lines are read from `bb`: the
+ * status is taken from f->r->status (the status line text is not parsed),
+ * header lines are collected, and the empty line ending the head triggers
+ * creation of the h2_response. Whatever remains in `bb` afterwards is
+ * passed to the next filter. Once in the body/done states the brigade is
+ * passed through unchanged; for chunked responses the core "CHUNK" filter
+ * is removed first, and after that succeeded this filter removes itself.
+ *
+ * @param from_h1  converter state
+ * @param f        this filter
+ * @param bb       brigade with response data
+ * @return APR_SUCCESS, a parse/creation error, or the result of
+ *         ap_pass_brigade()
+ */
+apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, ap_filter_t* f,
+                                      apr_bucket_brigade* bb)
+{
+    apr_status_t status = APR_SUCCESS;
+    char line[HUGE_STRING_LEN];
+
+    if ((from_h1->state == H2_RESP_ST_BODY)
+        || (from_h1->state == H2_RESP_ST_DONE)) {
+        if (from_h1->chunked) {
+            /* The httpd core HTTP_HEADER filter has or will install the
+             * "CHUNK" output transcode filter, which appears further down
+             * the filter chain. We do not want it for HTTP/2.
+             * Once we successfully deinstalled it, this filter has no
+             * further function and we remove it.
+             */
+            status = ap_remove_output_filter_byhandle(f->r->output_filters,
+                                                      "CHUNK");
+            if (status == APR_SUCCESS) {
+                ap_remove_output_filter(f);
+            }
+        }
+
+        return ap_pass_brigade(f->next, bb);
+    }
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                  "h2_from_h1(%d): read_response", from_h1->stream_id);
+
+    while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+
+        switch (from_h1->state) {
+
+            case H2_RESP_ST_STATUS_LINE:
+            case H2_RESP_ST_HEADERS:
+                status = get_line(from_h1, bb, f, line, sizeof(line));
+                if (status != APR_SUCCESS) {
+                    return status;
+                }
+                if (from_h1->state == H2_RESP_ST_STATUS_LINE) {
+                    /* instead of parsing, just take it directly */
+                    from_h1->status = apr_psprintf(from_h1->pool,
+                                                   "%d", f->r->status);
+                    from_h1->state = H2_RESP_ST_HEADERS;
+                }
+                else if (line[0] == '\0') {
+                    /* end of headers, create the h2_response and
+                     * pass the rest of the brigade down the filter
+                     * chain.
+                     */
+                    status = make_h2_headers(from_h1, f->r);
+                    if (from_h1->bb) {
+                        apr_brigade_destroy(from_h1->bb);
+                        from_h1->bb = NULL;
+                    }
+                    if (status != APR_SUCCESS) {
+                        /* No response could be built. Report that instead
+                         * of letting the result of passing the body on
+                         * overwrite the error.
+                         */
+                        return status;
+                    }
+                    if (!APR_BRIGADE_EMPTY(bb)) {
+                        return ap_pass_brigade(f->next, bb);
+                    }
+                }
+                else {
+                    status = parse_header(from_h1, f, line);
+                }
+                break;
+
+            default:
+                return ap_pass_brigade(f->next, bb);
+        }
+
+    }
+
+    return status;
+}
+
+/* apr_table_do() callback that splits one field value into its
+ * comma/whitespace separated tokens and appends every token not yet
+ * present (compared case-insensitively) to the array passed as `d`.
+ * Written for merging multiple Vary fields, but works for any field made
+ * of such tokens. The tokens point into a copy of `val` allocated from the
+ * array's pool. Always returns 1 so that iteration continues.
+ */
+static int uniq_field_values(void *d, const char *key, const char *val)
+{
+    apr_array_header_t *seen = (apr_array_header_t *)d;
+    char *cursor = apr_pstrdup(seen->pool, val);
+
+    (void)key;
+
+    for (;;) {
+        char *token;
+        char **entry;
+        int idx;
+        int known = 0;
+
+        /* skip separators in front of the next token */
+        while (*cursor == ',' || apr_isspace(*cursor)) {
+            ++cursor;
+        }
+        if (*cursor == '\0') {
+            break;
+        }
+
+        /* find the end of the token and terminate it in place */
+        token = cursor;
+        while (*cursor != '\0' && *cursor != ',' && !apr_isspace(*cursor)) {
+            ++cursor;
+        }
+        if (*cursor != '\0') {
+            *cursor++ = '\0';
+        }
+
+        /* linear search; elts is re-read each round as pushing may move it */
+        entry = (char **) seen->elts;
+        for (idx = 0; idx < seen->nelts; ++idx, ++entry) {
+            if (*entry && strcasecmp(*entry, token) == 0) {
+                known = 1;
+                break;
+            }
+        }
+        if (!known) {
+            *(char **)apr_array_push(seen) = token;
+        }
+    }
+
+    return 1;
+}
+
+/*
+ * Collapse all Vary fields of r->headers_out into a single field without
+ * duplicate tokens, since some clients cannot cope with multiple Vary
+ * fields or repeated tokens. Nothing is changed when no Vary token exists.
+ */
+static void fix_vary(request_rec *r)
+{
+    apr_array_header_t *tokens = apr_array_make(r->pool, 5, sizeof(char *));
+
+    /* gather the distinct tokens of every Vary field */
+    apr_table_do(uniq_field_values, tokens, r->headers_out, "Vary", NULL);
+
+    if (tokens->nelts <= 0) {
+        return;
+    }
+
+    /* replace the old fields by one comma joined value */
+    apr_table_setn(r->headers_out, "Vary",
+                   apr_array_pstrcat(r->pool, tokens, ','));
+}
+
+/* Check that r->status_line starts with exactly the three digit code held
+ * in r->status, followed by whitespace if anything follows at all.
+ * A malformed or mismatching line is reset to NULL. A bare three digit
+ * line gets the trailing space RFC2616 asks for appended.
+ * Returns APR_SUCCESS only for a line that was already complete and
+ * valid; every other case, including a missing line, yields APR_EGENERAL.
+ */
+static apr_status_t validate_status_line(request_rec *r)
+{
+    const char *sl = r->status_line;
+    char *end;
+    apr_size_t len;
+
+    if (sl == NULL) {
+        return APR_EGENERAL;
+    }
+
+    len = strlen(sl);
+    if (len < 3
+        || apr_strtoi64(sl, &end, 10) != r->status
+        || end != sl + 3
+        || (len >= 4 && !apr_isspace(sl[3]))) {
+        r->status_line = NULL;
+        return APR_EGENERAL;
+    }
+
+    if (len == 3) {
+        /* only the numeric status is present, add the mandated space */
+        r->status_line = apr_pstrcat(r->pool, sl, " ", NULL);
+        return APR_EGENERAL;
+    }
+
+    return APR_SUCCESS;
+}
+
+/**
+ * Put the Date and Server fields into `headers` and remove them from
+ * r->headers_out.
+ *
+ * For proxied requests the Date and Server values already present in
+ * r->headers_out are kept. Otherwise, or when they are missing, a Date is
+ * generated from r->request_time and the server banner is used as Server
+ * (no Server field is set if the banner is empty).
+ *
+ * @param r        request supplying headers_out, request_time and pool
+ * @param headers  table receiving the two fields
+ */
+static void set_basic_http_header(request_rec *r, apr_table_t *headers)
+{
+    const char *banner = ap_get_server_banner();
+    const char *date_value = NULL;
+    const char *server_value = NULL;
+
+    if (r->proxyreq != PROXYREQ_NONE) {
+        /* keep what the proxied origin sent, if anything */
+        date_value = apr_table_get(r->headers_out, "Date");
+        server_value = apr_table_get(r->headers_out, "Server");
+    }
+
+    if (date_value == NULL) {
+        char *stamp = apr_palloc(r->pool, APR_RFC822_DATE_LEN);
+
+        ap_recent_rfc822_date(stamp, r->request_time);
+        date_value = stamp;
+    }
+
+    apr_table_setn(headers, "Date", date_value);
+    apr_table_unset(r->headers_out, "Date");
+
+    if (server_value == NULL && *banner != '\0') {
+        server_value = banner;
+    }
+    if (server_value != NULL) {
+        apr_table_setn(headers, "Server", server_value);
+        apr_table_unset(r->headers_out, "Server");
+    }
+}
+
+/**
+ * apr_table_do() callback adding one name/value pair to the table passed
+ * as `ctx` (the strings are referenced, not copied). Always returns 1 so
+ * that iteration continues.
+ */
+static int copy_header(void *ctx, const char *name, const char *value)
+{
+    apr_table_t *target = (apr_table_t *)ctx;
+
+    apr_table_addn(target, name, value);
+
+    return 1;
+}
+
+/**
+ * Build the h2_response for a request directly from the request_rec,
+ * without going through a serialized HTTP/1 response head.
+ *
+ * Steps, in order: merge err_headers_out into headers_out, drop or
+ * de-duplicate Vary, drop ETag when the "no-etag" note is set, validate
+ * r->status_line, then normalise Content-Length, Content-Type,
+ * Content-Encoding, Content-Language and Expires. Finally a new table is
+ * filled with Date/Server plus the headers to send (only a fixed subset
+ * for 304 responses) and handed to h2_response_rcreate().
+ *
+ * Side effects visible here: r->headers_out, r->status_line,
+ * r->content_languages and, for "force-no-vary", r->proto_num and
+ * r->subprocess_env are modified.
+ *
+ * @param from_h1  converter supplying the stream id
+ * @param r        request to take status and headers from
+ * @return whatever h2_response_rcreate() returns
+ */
+static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
+{
+ apr_status_t status = APR_SUCCESS;
+ const char *clheader;
+ const char *ctype;
+ /*
+ * Now that we are ready to send a response, we need to combine the two
+ * header field tables into a single table. If we don't do this, our
+ * later attempts to set or unset a given fieldname might be bypassed.
+ */
+ if (!apr_is_empty_table(r->err_headers_out)) {
+ r->headers_out = apr_table_overlay(r->pool, r->err_headers_out,
+ r->headers_out);
+ }
+
+ /*
+ * Remove the 'Vary' header field if the client can't handle it.
+ * Since this will have nasty effects on HTTP/1.1 caches, force
+ * the response into HTTP/1.0 mode.
+ */
+ if (apr_table_get(r->subprocess_env, "force-no-vary") != NULL) {
+ apr_table_unset(r->headers_out, "Vary");
+ r->proto_num = HTTP_VERSION(1,0);
+ apr_table_setn(r->subprocess_env, "force-response-1.0", "1");
+ }
+ else {
+ fix_vary(r);
+ }
+
+ /*
+ * Now remove any ETag response header field if earlier processing
+ * says so (such as a 'FileETag None' directive).
+ */
+ if (apr_table_get(r->notes, "no-etag") != NULL) {
+ apr_table_unset(r->headers_out, "ETag");
+ }
+
+ /* Validate r->status_line; if it was reset to NULL (or never set) fall
+ * back to the server's own status line for r->status. */
+ status = validate_status_line(r);
+ if (!r->status_line) {
+ r->status_line = ap_get_status_line(r->status);
+ }
+ else if (status != APR_SUCCESS) {
+ /* Status line is OK but our own reason phrase
+ * would be preferred if defined
+ */
+ const char *tmp = ap_get_status_line(r->status);
+ if (!strncmp(tmp, r->status_line, 3)) {
+ r->status_line = tmp;
+ }
+ }
+
+ /* a chunked response carries no Content-Length */
+ if (r->chunked) {
+ apr_table_unset(r->headers_out, "Content-Length");
+ }
+
+ ctype = ap_make_content_type(r, r->content_type);
+ if (ctype) {
+ apr_table_setn(r->headers_out, "Content-Type", ctype);
+ }
+
+ if (r->content_encoding) {
+ apr_table_setn(r->headers_out, "Content-Encoding",
+ r->content_encoding);
+ }
+
+ /* Merge the tokens of an existing Content-Language field into
+ * r->content_languages (case-insensitively, no duplicates) and write
+ * the combined list back as a single field. */
+ if (!apr_is_empty_array(r->content_languages)) {
+ int i;
+ char *token;
+ char **languages = (char **)(r->content_languages->elts);
+ const char *field = apr_table_get(r->headers_out, "Content-Language");
+
+ while (field && (token = ap_get_list_item(r->pool, &field)) != NULL) {
+ for (i = 0; i < r->content_languages->nelts; ++i) {
+ if (!strcasecmp(token, languages[i]))
+ break;
+ }
+ if (i == r->content_languages->nelts) {
+ *((char **) apr_array_push(r->content_languages)) = token;
+ }
+ }
+
+ field = apr_array_pstrcat(r->pool, r->content_languages, ',');
+ apr_table_setn(r->headers_out, "Content-Language", field);
+ }
+
+ /*
+ * Control cachability for non-cachable responses if not already set by
+ * some other part of the server configuration.
+ */
+ if (r->no_cache && !apr_table_get(r->headers_out, "Expires")) {
+ char *date = apr_palloc(r->pool, APR_RFC822_DATE_LEN);
+ ap_recent_rfc822_date(date, r->request_time);
+ apr_table_addn(r->headers_out, "Expires", date);
+ }
+
+ /* This is a hack, but I can't find anyway around it. The idea is that
+ * we don't want to send out 0 Content-Lengths if it is a head request.
+ * This happens when modules try to outsmart the server, and return
+ * if they see a HEAD request. Apache 1.3 handlers were supposed to
+ * just return in that situation, and the core handled the HEAD. In
+ * 2.0, if a handler returns, then the core sends an EOS bucket down
+ * the filter stack, and the content-length filter computes a C-L of
+ * zero and that gets put in the headers, and we end up sending a
+ * zero C-L to the client. We can't just remove the C-L filter,
+ * because well behaved 2.0 handlers will send their data down the stack,
+ * and we will compute a real C-L for the head request. RBB
+ */
+ if (r->header_only
+ && (clheader = apr_table_get(r->headers_out, "Content-Length"))
+ && !strcmp(clheader, "0")) {
+ apr_table_unset(r->headers_out, "Content-Length");
+ }
+
+ /* headers that will actually be handed to the h2_response */
+ apr_table_t *headers = apr_table_make(r->pool, 10);
+
+ set_basic_http_header(r, headers);
+ if (r->status == HTTP_NOT_MODIFIED) {
+ /* 304: copy only the fields named in this list */
+ apr_table_do((int (*)(void *, const char *, const char *)) copy_header,
+ (void *) headers, r->headers_out,
+ "ETag",
+ "Content-Location",
+ "Expires",
+ "Cache-Control",
+ "Vary",
+ "Warning",
+ "WWW-Authenticate",
+ "Proxy-Authenticate",
+ "Set-Cookie",
+ "Set-Cookie2",
+ NULL);
+ }
+ else {
+ /* every other status: copy all of headers_out */
+ apr_table_do((int (*)(void *, const char *, const char *)) copy_header,
+ (void *) headers, r->headers_out, NULL);
+ }
+
+ return h2_response_rcreate(from_h1->stream_id, r, headers, r->pool);
+}
+
+/**
+ * Output filter that creates the h2_response from the request_rec
+ * (via create_response()) the first time data passes through, then
+ * removes itself and passes the brigade on.
+ *
+ * Special cases handled before that: for header_only requests with a
+ * response already present the brigade is emptied; an EOC bucket makes
+ * the filter remove itself and pass the brigade on untouched; an error
+ * bucket empties the brigade and hands its status to ap_die().
+ *
+ * @param f   this filter; f->ctx is the h2_task_env of the task
+ * @param bb  brigade travelling down the output chain
+ * @return OK / the result of ap_pass_brigade(), AP_FILTER_ERROR after an
+ *         error bucket, APR_ENOMEM when no response could be created
+ */
+apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+{
+ h2_task_env *env = f->ctx;
+ h2_from_h1 *from_h1 = env->output? env->output->from_h1 : NULL;
+ request_rec *r = f->r;
+ apr_bucket *b;
+ ap_bucket_error *eb = NULL;
+
+ /* NOTE(review): from_h1 is dereferenced unconditionally below; if
+ * AP_DEBUG_ASSERT compiles to nothing in non-debug builds, a missing
+ * env->output would crash here - confirm it is always set. */
+ AP_DEBUG_ASSERT(from_h1 != NULL);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_from_h1(%d): output_filter called", from_h1->stream_id);
+
+ if (r->header_only && env->output && from_h1->response) {
+ /* throw away any data after we have compiled the response */
+ apr_brigade_cleanup(bb);
+ return OK;
+ }
+
+ /* scan for the first error bucket and for an EOC bucket */
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ if (AP_BUCKET_IS_ERROR(b) && !eb) {
+ eb = b->data;
+ continue;
+ }
+ /*
+ * If we see an EOC bucket it is a signal that we should get out
+ * of the way doing nothing.
+ */
+ if (AP_BUCKET_IS_EOC(b)) {
+ ap_remove_output_filter(f);
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+ "h2_from_h1(%d): eoc bucket passed",
+ from_h1->stream_id);
+ return ap_pass_brigade(f->next, bb);
+ }
+ }
+
+ if (eb) {
+ /* read the status before the cleanup releases the error bucket */
+ int st = eb->status;
+ apr_brigade_cleanup(bb);
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+ "h2_from_h1(%d): err bucket status=%d",
+ from_h1->stream_id, st);
+ ap_die(st, r);
+ return AP_FILTER_ERROR;
+ }
+
+ from_h1->response = create_response(from_h1, r);
+ if (from_h1->response == NULL) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+ "h2_from_h1(%d): unable to create response",
+ from_h1->stream_id);
+ return APR_ENOMEM;
+ }
+
+ if (r->header_only) {
+ /* the filter stays installed here, so later brigades are dropped
+ * by the header_only check at the top */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_from_h1(%d): header_only, cleanup output brigade",
+ from_h1->stream_id);
+ apr_brigade_cleanup(bb);
+ return OK;
+ }
+
+ r->sent_bodyct = 1; /* Whatever follows is real body stuff... */
+
+ ap_remove_output_filter(f);
+ /* the brigade length is only computed when trace1 logging is on */
+ if (APLOGctrace1(f->c)) {
+ apr_off_t len = 0;
+ apr_brigade_length(bb, 0, &len);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_from_h1(%d): removed header filter, passing brigade "
+ "len=%ld", from_h1->stream_id, (long)len);
+ }
+ return ap_pass_brigade(f->next, bb);
+}
+
+/**
+ * Force the request to the given status and build the response for it
+ * straight from the request_rec. The new response replaces whatever
+ * from_h1->response pointed to before.
+ *
+ * @param from_h1  converter receiving the response
+ * @param status   HTTP status stored in r->status
+ * @param r        request the response is created from
+ */
+void h2_from_h1_die(h2_from_h1 *from_h1, int status, request_rec *r)
+{
+    h2_response *failure;
+
+    r->status = status;
+    failure = create_response(from_h1, r);
+    from_h1->response = failure;
+}
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,84 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_from_h1__
+#define __mod_h2__h2_from_h1__
+
+/**
+ * h2_from_h1 parses a HTTP/1.1 response into
+ * - response status
+ * - a list of header values
+ * - a series of bytes that represent the response body alone, without
+ * any meta data, such as inserted by chunked transfer encoding.
+ *
+ * All data is allocated from the stream memory pool.
+ *
+ * Again, see comments in h2_request: ideally we would take the headers
+ * and status from the httpd structures instead of parsing them here, but
+ * we need to have all handlers and filters involved in request/response
+ * processing, so this seems to be the way for now.
+ */
+
+/**
+ * States of the HTTP/1 response conversion, in the order they are
+ * normally passed through.
+ */
+typedef enum {
+ H2_RESP_ST_STATUS_LINE, /* parsing http/1 status line */
+ H2_RESP_ST_HEADERS, /* parsing http/1 response headers */
+ H2_RESP_ST_BODY, /* transferring response body */
+ H2_RESP_ST_DONE /* complete response converted */
+} h2_from_h1_state_t;
+
+/* only used through pointers here, so a forward declaration suffices */
+struct h2_response;
+
+typedef struct h2_from_h1 h2_from_h1;
+
+/**
+ * Conversion state for the response of one stream.
+ */
+struct h2_from_h1 {
+ int stream_id; /* id of the stream this response belongs to */
+ h2_from_h1_state_t state; /* current conversion state */
+ apr_pool_t *pool; /* pool used for all allocations */
+ apr_bucket_brigade *bb; /* scratch brigade for reading lines, may be NULL */
+
+ apr_size_t content_length; /* taken from the response once created */
+ int chunked; /* taken from the request's chunked flag */
+
+ const char *status; /* response status code as string */
+ apr_array_header_t *hlines; /* collected header lines (char *) */
+
+ struct h2_response *response; /* NULL until the response was created */
+};
+
+
+/**
+ * Signature of a callback taking the converter, its previous state and a
+ * caller supplied context.
+ */
+typedef void h2_from_h1_state_change_cb(struct h2_from_h1 *resp,
+ h2_from_h1_state_t prevstate,
+ void *cb_ctx);
+
+/** Create a converter for the given stream, allocated from pool. */
+h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool);
+
+/** Destroy the attached response, if any, and drop the scratch brigade. */
+apr_status_t h2_from_h1_destroy(h2_from_h1 *response);
+
+/* NOTE(review): no definition of this function appears in the
+ * h2_from_h1.c that accompanies this header - confirm it is implemented
+ * elsewhere or drop the declaration. */
+void h2_from_h1_set_state_change_cb(h2_from_h1 *from_h1,
+ h2_from_h1_state_change_cb *callback,
+ void *cb_ctx);
+
+/** Parse a serialized HTTP/1 response head from bb; body data is passed
+ * on to the next filter. */
+apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1,
+ ap_filter_t* f, apr_bucket_brigade* bb);
+
+/** Get the response, NULL if none has been created yet. */
+struct h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1);
+
+/** Set the request status and create the response from the request. */
+void h2_from_h1_die(h2_from_h1 *from_h1, int status, request_rec *r);
+
+/** Get the current conversion state. */
+h2_from_h1_state_t h2_from_h1_get_state(h2_from_h1 *from_h1);
+
+/** Output filter that creates the response directly from the request_rec
+ * instead of parsing a serialized HTTP/1 head. */
+apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb);
+
+#endif /* defined(__mod_h2__h2_from_h1__) */
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,222 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+
+#include <apr_strings.h>
+#include <apr_optional.h>
+#include <apr_optional_hooks.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_config.h>
+#include <http_connection.h>
+#include <http_protocol.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+
+#include "h2_stream.h"
+#include "h2_task.h"
+#include "h2_config.h"
+#include "h2_ctx.h"
+#include "h2_conn.h"
+#include "h2_alpn.h"
+#include "h2_h2.h"
+
+/* Protocol identifiers offered for ALPN; the _len variable holds the
+ * number of entries. */
+const char *h2_alpn_protos[] = {
+ "h2", "h2-16", "h2-14"
+};
+apr_size_t h2_alpn_protos_len = (sizeof(h2_alpn_protos)
+ / sizeof(h2_alpn_protos[0]));
+
+/* Protocol identifiers accepted for HTTP/1 Upgrade; the _len variable
+ * holds the number of entries. */
+const char *h2_upgrade_protos[] = {
+ "h2c", "h2c-16", "h2c-14",
+};
+apr_size_t h2_upgrade_protos_len = (sizeof(h2_upgrade_protos)
+ / sizeof(h2_upgrade_protos[0]));
+
+/* The 24 byte HTTP/2 connection preface; h2_h2_process_conn() compares
+ * the first 24 bytes of a connection against it. */
+const char *H2_MAGIC_TOKEN = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
+
+/*******************************************************************************
+ * The optional mod_ssl functions we need.
+ */
+APR_DECLARE_OPTIONAL_FN(int, ssl_engine_disable, (conn_rec*));
+APR_DECLARE_OPTIONAL_FN(int, ssl_is_https, (conn_rec*));
+
+/* Filled in by h2_h2_init(); the callers in this file treat a NULL
+ * pointer as "function not available". */
+static int (*opt_ssl_engine_disable)(conn_rec*);
+static int (*opt_ssl_is_https)(conn_rec*);
+/*******************************************************************************
+ * Hooks for processing incoming connections:
+ * - pre_conn_before_tls switches SSL off for stream connections
+ * - process_conn take over connection in case of h2
+ * NOTE(review): no pre_conn_before_tls hook is declared in this file -
+ * presumably it lives elsewhere; confirm or update this list.
+ */
+static int h2_h2_process_conn(conn_rec* c);
+static int h2_h2_remove_timeout(conn_rec* c);
+static int h2_h2_post_read_req(request_rec *r);
+
+
+/*******************************************************************************
+ * One-off initialisation: look up the optional mod_ssl functions.
+ * A missing ssl_is_https only produces a warning.
+ * Always returns APR_SUCCESS; `pool` is unused.
+ */
+apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s)
+{
+    (void)pool;
+
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "h2_h2, child_init");
+
+    opt_ssl_is_https = APR_RETRIEVE_OPTIONAL_FN(ssl_is_https);
+    opt_ssl_engine_disable = APR_RETRIEVE_OPTIONAL_FN(ssl_engine_disable);
+
+    if (opt_ssl_is_https == NULL) {
+        ap_log_error(APLOG_MARK, APLOG_WARNING, 0, s,
+                     "mod_ssl does not seem to be enabled");
+    }
+
+    return APR_SUCCESS;
+}
+
+/**
+ * Tell whether the connection uses TLS, as reported by mod_ssl's
+ * ssl_is_https. Yields 0 when that function is not available.
+ *
+ * @return 1 for a TLS connection, 0 otherwise
+ */
+int h2_h2_is_tls(conn_rec *c)
+{
+    if (!opt_ssl_is_https) {
+        return 0;
+    }
+    return opt_ssl_is_https(c) != 0;
+}
+
+/**
+ * Ask mod_ssl to disable its engine for this connection.
+ *
+ * @return the result of ssl_engine_disable, or 0 when that optional
+ *         function is not available
+ */
+int h2_tls_disable(conn_rec *c)
+{
+    return opt_ssl_engine_disable ? opt_ssl_engine_disable(c) : 0;
+}
+
+/*******************************************************************************
+ * Register various hooks
+ */
+/* predecessor list for the hook that must be ordered after mod_reqtimeout */
+static const char *const mod_reqtimeout[] = { "reqtimeout.c", NULL};
+
+void h2_h2_register_hooks(void)
+{
+ /* When the connection processing actually starts, we might want to
+ * take over, if h2* was selected by ALPN on a TLS connection.
+ */
+ ap_hook_process_connection(h2_h2_process_conn,
+ NULL, NULL, APR_HOOK_FIRST);
+ /* Perform connection cleanup before the actual processing happens.
+ * Registered with reqtimeout.c as predecessor, since the hook removes
+ * the "reqtimeout" input filter on master h2 connections.
+ */
+ ap_hook_process_connection(h2_h2_remove_timeout,
+ mod_reqtimeout, NULL, APR_HOOK_LAST);
+
+ /* installs the response output filters on task connections */
+ ap_hook_post_read_request(h2_h2_post_read_req, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+/**
+ * process_connection hook that takes the "reqtimeout" input filter off
+ * master h2 connections.
+ *
+ * Task connections are deliberately left alone. The filter used to be
+ * removed there as well, but the timeouts seen then may have been a side
+ * effect of other things, and since a task basically is a HTTP/1.1
+ * request/response, mod_reqtimeout should ideally do its work on it. So
+ * the filter stays for now, until unexpected timeouts on tasks show up
+ * again.
+ *
+ * @return always DECLINED, so that other hooks still run
+ */
+int h2_h2_remove_timeout(conn_rec* c)
+{
+    h2_ctx *ctx = h2_ctx_get(c);
+
+    if (!h2_ctx_is_task(ctx) && h2_ctx_is_active(ctx)) {
+        /* cleanup on master h2 connections */
+        ap_remove_input_filter_byhandle(c->input_filters, "reqtimeout");
+    }
+
+    return DECLINED;
+}
+
+/**
+ * process_connection hook that lets HTTP/2 take over a connection.
+ *
+ * Task (stream pseudo) connections are declined right away. Otherwise a
+ * pending protocol negotiation is nudged with a one byte speculative
+ * read, and, if the protocol is still unknown and H2Direct is enabled,
+ * the first 24 bytes are peeked at and compared with the HTTP/2 preface.
+ * When h2 ends up active, the connection is handed to h2_conn_main().
+ *
+ * @param c  the connection to inspect
+ * @return the result of h2_conn_main() when h2 is active, else DECLINED
+ */
+int h2_h2_process_conn(conn_rec* c)
+{
+    /* length of H2_MAGIC_TOKEN */
+    const apr_size_t magic_len = 24;
+    h2_ctx *ctx = h2_ctx_get(c);
+    h2_config *cfg = h2_config_get(c);
+    apr_bucket_brigade* temp;
+
+    if (h2_ctx_is_task(ctx)) {
+        /* out stream pseudo connection */
+        return DECLINED;
+    }
+
+    /* Protocol negotiation, if started, may need some speculative reading
+     * to get triggered. The outcome of the read itself is of no interest.
+     */
+    if (h2_ctx_pnego_is_ongoing(ctx)) {
+        temp = apr_brigade_create(c->pool, c->bucket_alloc);
+        ap_get_brigade(c->input_filters, temp,
+                       AP_MODE_SPECULATIVE, APR_BLOCK_READ, 1);
+        apr_brigade_destroy(temp);
+    }
+
+    /* If we still do not know the protocol and H2Direct is enabled, check
+     * if we receive the magic PRIamble. A client sending this on connection
+     * start should know what it is doing.
+     */
+    if (!h2_ctx_pnego_is_done(ctx) && h2_config_geti(cfg, H2_CONF_DIRECT)) {
+        apr_status_t status;
+        temp = apr_brigade_create(c->pool, c->bucket_alloc);
+        status = ap_get_brigade(c->input_filters, temp,
+                                AP_MODE_SPECULATIVE, APR_BLOCK_READ,
+                                magic_len);
+        if (status == APR_SUCCESS) {
+            char *s = NULL;
+            apr_size_t slen = 0;
+
+            /* pflatten leaves s/slen untouched when it fails, so only
+             * look at them after it reported success.
+             */
+            status = apr_brigade_pflatten(temp, &s, &slen, c->pool);
+            if (status == APR_SUCCESS && s != NULL
+                && slen == magic_len
+                && !memcmp(H2_MAGIC_TOKEN, s, magic_len)) {
+                h2_ctx_pnego_set_done(ctx, "h2");
+            }
+        }
+        apr_brigade_destroy(temp);
+    }
+
+    /* If "h2" was selected as protocol (by whatever mechanism), take over
+     * the connection.
+     */
+    if (h2_ctx_is_active(ctx)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+                      "h2_h2, connection, h2 active");
+
+        return h2_conn_main(c);
+    }
+
+    return DECLINED;
+}
+
+/**
+ * post_read_request hook that installs the response conversion filter on
+ * requests running on a h2 task connection.
+ *
+ * With serialized headers the "H1_TO_H2_RESP" filter is added. Otherwise
+ * the core "HTTP_HEADER" filter is removed and "H2_RESPONSE" takes its
+ * place, collecting status and headers itself. Requests without a task
+ * are left untouched.
+ *
+ * @return always DECLINED
+ */
+static int h2_h2_post_read_req(request_rec *r)
+{
+    h2_ctx *ctx = h2_ctx_rget(r);
+    struct h2_task_env *task = h2_ctx_get_task(ctx);
+
+    if (!task) {
+        return DECLINED;
+    }
+
+    /* h2_task connection for a stream, not for h2c */
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+                  "adding h1_to_h2_resp output filter");
+
+    if (!task->serialize_headers) {
+        /* swap the core filter that formats HTTP/1 response headers for
+         * our own one */
+        ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
+        ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
+    }
+    else {
+        ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
+    }
+
+    return DECLINED;
+}
+
+
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,59 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_h2__
+#define __mod_h2__h2_h2__
+
+/**
+ * List of ALPN protocol identifiers that we support in ALPN/NPN
+ * negotiations. h2_alpn_protos_len is the number of entries.
+ */
+extern const char *h2_alpn_protos[];
+extern apr_size_t h2_alpn_protos_len;
+
+/**
+ * List of protocol identifiers that we support in HTTP/1 Upgrade:
+ * negotiations. h2_upgrade_protos_len is the number of entries.
+ */
+extern const char *h2_upgrade_protos[];
+extern apr_size_t h2_upgrade_protos_len;
+
+/**
+ * The magic PRIamble of RFC 7540 that is always sent when starting
+ * a h2 communication.
+ */
+extern const char *H2_MAGIC_TOKEN;
+
+/*
+ * One time, post config initialization.
+ */
+apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s);
+
+/* Is the connection a TLS connection?
+ */
+int h2_h2_is_tls(conn_rec *c);
+
+/* Disable SSL for this connection, can only be invoked in a pre-
+ * connection hook before mod_ssl.
+ * @return != 0 iff disable worked
+ */
+int h2_tls_disable(conn_rec *c);
+
+/* Register apache hooks for h2 protocol
+ */
+void h2_h2_register_hooks(void);
+
+
+#endif /* defined(__mod_h2__h2_h2__) */
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,157 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+#include <http_connection.h>
+
+#include "h2_private.h"
+#include "h2_io.h"
+#include "h2_response.h"
+#include "h2_util.h"
+
+/**
+ * Allocate a zeroed h2_io from pool and set it up for the given stream id:
+ * no input brigade yet, a fresh output brigade and an empty response record.
+ * Returns NULL when the h2_io itself could not be allocated.
+ */
+h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
+{
+    h2_io *created = apr_pcalloc(pool, sizeof(*created));
+
+    if (created == NULL) {
+        return NULL;
+    }
+    created->id = id;
+    created->bbin = NULL;   /* created lazily by h2_io_in_write() */
+    created->bbout = apr_brigade_create(pool, bucket_alloc);
+    created->response = apr_pcalloc(pool, sizeof(h2_response));
+    return created;
+}
+
+/* Hand the io's response record, if one exists, to h2_response_cleanup(). */
+static void h2_io_cleanup(h2_io *io)
+{
+    if (io->response == NULL) {
+        return;
+    }
+    h2_response_cleanup(io->response);
+}
+
+/* Public teardown entry point; all work is delegated to h2_io_cleanup(). */
+void h2_io_destroy(h2_io *io)
+{
+ h2_io_cleanup(io);
+}
+
+/* Returns 1 when the input is flagged as ended, or when an input brigade
+ * exists and h2_util_has_eos() reports an EOS in it; 0 otherwise. */
+int h2_io_in_has_eos_for(h2_io *io)
+{
+    if (io->eos_in) {
+        return 1;
+    }
+    if (io->bbin == NULL) {
+        return 0;
+    }
+    return h2_util_has_eos(io->bbin, 0) ? 1 : 0;
+}
+
+/* Returns whatever h2_util_bb_has_data_or_eos() reports for the output
+ * brigade of this io. */
+int h2_io_out_has_data(h2_io *io)
+{
+ return h2_util_bb_has_data_or_eos(io->bbout);
+}
+
+/* Length of the data currently queued in the output brigade as computed by
+ * apr_brigade_length() without reading; 0 when there is no output brigade
+ * or the computed length is not positive. */
+apr_size_t h2_io_out_length(h2_io *io)
+{
+    apr_off_t queued = 0;
+
+    if (!io->bbout) {
+        return 0;
+    }
+    apr_brigade_length(io->bbout, 0, &queued);
+    if (queued <= 0) {
+        return 0;
+    }
+    return queued;
+}
+
+/* Move queued input into bb via h2_util_move() and account the moved bytes
+ * in io->input_consumed.
+ * Returns APR_EOF when nothing is queued and the input is closed,
+ * APR_EAGAIN when nothing is queued (or the move left the tail of bb
+ * unchanged), otherwise the status of h2_util_move(). */
+apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
+                           apr_size_t maxlen)
+{
+    apr_off_t before = 0;
+    apr_off_t after = 0;
+    apr_bucket *tail_before;
+    apr_bucket *tail_after;
+    apr_status_t rv;
+
+    if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) {
+        if (io->eos_in) {
+            return APR_EOF;
+        }
+        return APR_EAGAIN;
+    }
+
+    apr_brigade_length(bb, 1, &before);
+    tail_before = APR_BRIGADE_LAST(bb);
+    rv = h2_util_move(bb, io->bbin, maxlen, 0, NULL, "h2_io_in_read");
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* sample the tail before measuring, exactly as the length is taken
+     * afterwards */
+    tail_after = APR_BRIGADE_LAST(bb);
+    apr_brigade_length(bb, 1, &after);
+    if (tail_after == tail_before) {
+        /* no bucket was appended to bb */
+        return APR_EAGAIN;
+    }
+    io->input_consumed += (after - before);
+    return APR_SUCCESS;
+}
+
+/* Queue the buckets of bb as input for this io.
+ * Returns APR_EOF when the input was already closed. Otherwise records
+ * whether bb carries an EOS, creates the input brigade on first use (from
+ * the pool and bucket allocator of the output brigade) and moves bb's
+ * content into it. An empty bb yields APR_SUCCESS. */
+apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
+{
+    if (io->eos_in) {
+        return APR_EOF;
+    }
+
+    io->eos_in = h2_util_has_eos(bb, 0);
+    if (APR_BRIGADE_EMPTY(bb)) {
+        return APR_SUCCESS;
+    }
+
+    if (io->bbin == NULL) {
+        io->bbin = apr_brigade_create(io->bbout->p,
+                                      io->bbout->bucket_alloc);
+    }
+    return h2_util_move(io->bbin, bb, 0, 0, NULL, "h2_io_in_write");
+}
+
+/* Mark the input as ended. When an input brigade exists, an EOS bucket is
+ * appended to it as well. Always returns APR_SUCCESS. */
+apr_status_t h2_io_in_close(h2_io *io)
+{
+    if (io->bbin) {
+        apr_bucket *eos = apr_bucket_eos_create(io->bbin->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(io->bbin, eos);
+    }
+    io->eos_in = 1;
+    return APR_SUCCESS;
+}
+
+/* Read output into buffer through h2_util_bb_read(); with a NULL buffer
+ * only the available amount is queried through h2_util_bb_avail(). */
+apr_status_t h2_io_out_read(h2_io *io, char *buffer,
+                            apr_size_t *plen, int *peos)
+{
+    if (buffer != NULL) {
+        return h2_util_bb_read(io->bbout, buffer, plen, peos);
+    }
+    /* no destination given: just checking length available */
+    return h2_util_bb_avail(io->bbout, plen, peos);
+}
+
+/* Callback flavour of h2_io_out_read(): output is passed to cb through
+ * h2_util_bb_readx(); with a NULL cb only the available amount is queried
+ * through h2_util_bb_avail(). */
+apr_status_t h2_io_out_readx(h2_io *io,
+                             h2_io_data_cb *cb, void *ctx,
+                             apr_size_t *plen, int *peos)
+{
+    if (cb != NULL) {
+        return h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
+    }
+    /* no consumer given: just checking length available */
+    return h2_util_bb_avail(io->bbout, plen, peos);
+}
+
+/* Move content of bb into the output brigade via h2_util_move(), passing
+ * maxlen through, and return its status. */
+apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
+ apr_size_t maxlen)
+{
+ return h2_util_move(io->bbout, bb, maxlen, 0, NULL, "h2_io_out_write");
+}
+
+
+/* Close the output by appending an EOS bucket to the output brigade.
+ * Always returns APR_SUCCESS. */
+apr_status_t h2_io_out_close(h2_io *io)
+{
+    apr_bucket *eos = apr_bucket_eos_create(io->bbout->bucket_alloc);
+
+    APR_BRIGADE_INSERT_TAIL(io->bbout, eos);
+    return APR_SUCCESS;
+}
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,127 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_io__
+#define __mod_h2__h2_io__
+
+struct h2_response;
+struct apr_thread_cond_t;
+struct h2_task;
+
+
+typedef apr_status_t h2_io_data_cb(void *ctx,
+ const char *data, apr_size_t len);
+
+
+typedef struct h2_io h2_io;
+
+/* Per-stream I/O state: the queued input and output brigades plus the
+ * response record for one stream id. */
+struct h2_io {
+ int id; /* stream identifier */
+ apr_bucket_brigade *bbin; /* input data for stream */
+ int eos_in; /* != 0 once input was closed or an EOS was written */
+ int task_done; /* set to 1 by h2_mplx_task_done() */
+
+ apr_size_t input_consumed; /* how many bytes have been read */
+ struct apr_thread_cond_t *input_arrived; /* block on reading */
+
+ apr_bucket_brigade *bbout; /* output data from stream */
+ struct apr_thread_cond_t *output_drained; /* block on writing */
+
+ struct h2_response *response;/* submittable response created */
+};
+
+/*******************************************************************************
+ * Object lifecycle and information.
+ ******************************************************************************/
+
+/**
+ * Creates a new h2_io for the given stream id.
+ */
+h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc);
+
+/**
+ * Frees any resources held by the h2_io instance.
+ */
+void h2_io_destroy(h2_io *io);
+
+/**
+ * The input data is completely queued. Blocked reads will return immediately
+ * and give either data or EOF.
+ */
+int h2_io_in_has_eos_for(h2_io *io);
+/**
+ * Output data is available.
+ */
+int h2_io_out_has_data(h2_io *io);
+
+/*******************************************************************************
+ * Input handling of streams.
+ ******************************************************************************/
+/**
+ * Reads the next bucket from the input. Returns APR_EAGAIN if none
+ * is currently available, APR_EOF if end of input has been reached.
+ */
+apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
+ apr_size_t maxlen);
+
+/**
+ * Appends given bucket to the input.
+ */
+apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
+
+/**
+ * Closes the input. After existing data has been read, APR_EOF will
+ * be returned.
+ */
+apr_status_t h2_io_in_close(h2_io *io);
+
+/*******************************************************************************
+ * Output handling of streams.
+ ******************************************************************************/
+
+/**
+ * Read a bucket from the output head. Return APR_EAGAIN if none is available,
+ * APR_EOF if none available and output has been closed.
+ * May be called with buffer == NULL in order to find out how much data
+ * is available.
+ * @param io the h2_io to read output from
+ * @param buffer the buffer to copy the data to, may be NULL
+ * @param plen the requested max len, set to amount of data on return
+ * @param peos != 0 iff the end of stream has been reached
+ */
+apr_status_t h2_io_out_read(h2_io *io, char *buffer,
+ apr_size_t *plen, int *peos);
+
+apr_status_t h2_io_out_readx(h2_io *io,
+ h2_io_data_cb *cb, void *ctx,
+ apr_size_t *plen, int *peos);
+
+apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
+ apr_size_t maxlen);
+
+/**
+ * Closes the output. After existing data has been read, APR_EOF will
+ * be returned.
+ */
+apr_status_t h2_io_out_close(h2_io *io);
+
+/**
+ * Gives the overall length of the data that is currently queued for
+ * output.
+ */
+apr_size_t h2_io_out_length(h2_io *io);
+
+
+#endif /* defined(__mod_h2__h2_io__) */
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,169 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+
+#include <apr_strings.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_connection.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+#include "h2_io.h"
+#include "h2_io_set.h"
+
+/* Element i of an apr array of h2_io pointers. The index and the whole
+ * expansion are parenthesized so expression arguments expand safely. */
+#define h2_io_IDX(list, i) (((h2_io**)(list)->elts)[(i)])
+
+/* Set of h2_io pointers, stored in an apr array that is kept sorted by
+ * stream id so lookups can use bsearch (see h2_io_set_get). */
+struct h2_io_set {
+ apr_array_header_t *list;
+};
+
+/* Allocate an empty set from pool with room for 100 entries up front.
+ * Returns NULL when either the set or its array cannot be allocated. */
+h2_io_set *h2_io_set_create(apr_pool_t *pool)
+{
+    h2_io_set *created = apr_pcalloc(pool, sizeof(h2_io_set));
+
+    if (created == NULL) {
+        return NULL;
+    }
+    created->list = apr_array_make(pool, 100, sizeof(h2_io*));
+    return created->list ? created : NULL;
+}
+
+/* Destroy every io in the set and leave the set empty. This is the same
+ * operation as h2_io_set_destroy_all(), so it simply delegates to it. */
+void h2_io_set_destroy(h2_io_set *sp)
+{
+    h2_io_set_destroy_all(sp);
+}
+
+/* qsort/bsearch comparator over array slots holding h2_io pointers;
+ * orders by ascending stream id.
+ * The ids are compared instead of subtracted, so the result cannot
+ * overflow, and the const qualification of the arguments is kept. */
+static int h2_stream_id_cmp(const void *s1, const void *s2)
+{
+    const h2_io *const *pio1 = s1;
+    const h2_io *const *pio2 = s2;
+    int id1 = (*pio1)->id;
+    int id2 = (*pio2)->id;
+
+    return (id1 > id2) - (id1 < id2);
+}
+
+/* Look up the io registered for stream_id; NULL when there is none.
+ * The array is kept sorted by id, so a binary search with a stack
+ * allocated key (only its id matters to the comparator) is sufficient. */
+h2_io *h2_io_set_get(h2_io_set *sp, int stream_id)
+{
+    h2_io probe = { 0 };
+    h2_io *probe_ptr = &probe;
+    h2_io **slot;
+
+    probe.id = stream_id;
+    slot = bsearch(&probe_ptr, sp->list->elts, sp->list->nelts,
+                   sp->list->elt_size, h2_stream_id_cmp);
+    if (slot == NULL) {
+        return NULL;
+    }
+    return *slot;
+}
+
+/* Return the io that should be served next, NULL for an empty set.
+ * Priorities are not compared yet: the first non-NULL entry in array
+ * order wins. */
+h2_io *h2_io_set_get_highest_prio(h2_io_set *set)
+{
+    int idx;
+
+    for (idx = 0; idx < set->list->nelts; ++idx) {
+        h2_io *candidate = h2_io_IDX(set->list, idx);
+        if (candidate) {
+            /* TODO: keep scanning once io priorities can be compared */
+            return candidate;
+        }
+    }
+    return NULL;
+}
+
+/* Re-sort the array by ascending stream id (see h2_stream_id_cmp). */
+static void h2_io_set_sort(h2_io_set *sp)
+{
+ qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size,
+ h2_stream_id_cmp);
+}
+
+/* Add io to the set unless an io with the same id is already present.
+ * Always returns APR_SUCCESS. */
+apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io)
+{
+    int tail;
+
+    if (h2_io_set_get(sp, io->id) != NULL) {
+        return APR_SUCCESS;
+    }
+
+    APR_ARRAY_PUSH(sp->list, h2_io*) = io;
+    /* Normally, streams get added in ascending order of id. We keep the
+     * array sorted, so we only need to check whether the newly appended
+     * stream has a lower id than its predecessor. If it does not,
+     * sorting is not necessary.
+     */
+    tail = sp->list->nelts - 1;
+    if (tail <= 0) {
+        return APR_SUCCESS;
+    }
+    if (h2_io_IDX(sp->list, tail)->id < h2_io_IDX(sp->list, tail-1)->id) {
+        h2_io_set_sort(sp);
+    }
+    return APR_SUCCESS;
+}
+
+/* Remove io (matched by pointer identity) from the set.
+ * Returns the removed io, or NULL when it was not a member. */
+h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io)
+{
+    int pos;
+
+    for (pos = 0; pos < sp->list->nelts; ++pos) {
+        h2_io **slots = (h2_io**)sp->list->elts;
+
+        if (slots[pos] != io) {
+            continue;
+        }
+        --sp->list->nelts;
+        if (pos < sp->list->nelts) {
+            /* Close the hole by shifting the entries above it down
+             * by one slot.
+             */
+            memmove(slots + pos, slots + pos + 1,
+                    (sp->list->nelts - pos) * sizeof(h2_io*));
+        }
+        return io;
+    }
+    return NULL;
+}
+
+/* Call h2_io_destroy() on every member, then empty the set. */
+void h2_io_set_destroy_all(h2_io_set *sp)
+{
+    int idx = 0;
+
+    while (idx < sp->list->nelts) {
+        h2_io_destroy(h2_io_IDX(sp->list, idx));
+        ++idx;
+    }
+    sp->list->nelts = 0;
+}
+
+/* Empty the set without destroying the ios it referenced. */
+void h2_io_set_remove_all(h2_io_set *sp)
+{
+ sp->list->nelts = 0;
+}
+
+/* Returns != 0 iff the set has no members. */
+int h2_io_set_is_empty(h2_io_set *sp)
+{
+ AP_DEBUG_ASSERT(sp);
+ return sp->list->nelts == 0;
+}
+
+/* Call iter(ctx, io) for each member in array order; iteration stops as
+ * soon as the callback returns 0. */
+void h2_io_set_iter(h2_io_set *sp,
+                    h2_io_set_iter_fn *iter, void *ctx)
+{
+    int idx;
+
+    for (idx = 0; idx < sp->list->nelts; ++idx) {
+        if (iter(ctx, h2_io_IDX(sp->list, idx)) == 0) {
+            return;
+        }
+    }
+}
+
+/* Number of ios currently in the set. */
+apr_size_t h2_io_set_size(h2_io_set *sp)
+{
+ return sp->list->nelts;
+}
+
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,47 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_io_set__
+#define __mod_h2__h2_io_set__
+
+struct h2_io;
+
+/**
+ * A set of h2_io instances. Allows lookup by stream id
+ * and other criteria.
+ */
+typedef struct h2_io_set h2_io_set;
+
+h2_io_set *h2_io_set_create(apr_pool_t *pool);
+
+void h2_io_set_destroy(h2_io_set *set);
+
+apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
+h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
+h2_io *h2_io_set_get_highest_prio(h2_io_set *set);
+h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
+
+void h2_io_set_remove_all(h2_io_set *set);
+void h2_io_set_destroy_all(h2_io_set *set);
+int h2_io_set_is_empty(h2_io_set *set);
+apr_size_t h2_io_set_size(h2_io_set *set);
+
+
+typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io);
+
+void h2_io_set_iter(h2_io_set *set,
+ h2_io_set_iter_fn *iter, void *ctx);
+
+#endif /* defined(__mod_h2__h2_io_set__) */
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,788 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+
+#include <apr_atomic.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+#include <apr_strings.h>
+#include <apr_time.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+#include "h2_config.h"
+#include "h2_conn.h"
+#include "h2_io.h"
+#include "h2_io_set.h"
+#include "h2_response.h"
+#include "h2_mplx.h"
+#include "h2_stream.h"
+#include "h2_stream_set.h"
+#include "h2_task.h"
+#include "h2_task_input.h"
+#include "h2_task_output.h"
+#include "h2_task_queue.h"
+#include "h2_workers.h"
+
+
+/* Returns 1 and stores APR_ECONNABORTED in *pstatus when the mplx has been
+ * aborted; returns 0 and leaves *pstatus untouched otherwise. */
+static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
+    AP_DEBUG_ASSERT(m);
+    if (!m->aborted) {
+        return 0;
+    }
+    *pstatus = APR_ECONNABORTED;
+    return 1;
+}
+
+static void have_out_data_for(h2_mplx *m, int stream_id);
+
+/* Tear down a multiplexer: flag it aborted, destroy the task queue and both
+ * io sets, then the mutex, and finally the mplx pool. Pointers to destroyed
+ * members are reset to NULL (except m->pool).
+ * NOTE(review): an io that sits in both ready_ios and stream_ios is passed
+ * to h2_io_destroy() twice here -- confirm h2_response_cleanup() tolerates
+ * repeated calls. */
+static void h2_mplx_destroy(h2_mplx *m)
+{
+ AP_DEBUG_ASSERT(m);
+ m->aborted = 1;
+ if (m->q) {
+ h2_tq_destroy(m->q);
+ m->q = NULL;
+ }
+ if (m->ready_ios) {
+ h2_io_set_destroy(m->ready_ios);
+ m->ready_ios = NULL;
+ }
+ if (m->stream_ios) {
+ h2_io_set_destroy(m->stream_ios);
+ m->stream_ios = NULL;
+ }
+
+ if (m->lock) {
+ apr_thread_mutex_destroy(m->lock);
+ m->lock = NULL;
+ }
+
+ /* the pool goes last; the mutex above was created from it */
+ if (m->pool) {
+ apr_pool_destroy(m->pool);
+ }
+}
+
+/**
+ * A h2_mplx needs to be thread-safe *and* it will be called by
+ * the h2_session thread *and* the h2_worker threads. Therefore:
+ * - calls are protected by a mutex lock, m->lock
+ * - the pool needs its own allocator, since apr_allocator_t are
+ * not re-entrant. The separate allocator works without a
+ * separate lock since we already protect h2_mplx itself.
+ * Since HTTP/2 connections can be expected to live longer than
+ * their HTTP/1 cousins, the separate allocator seems to work better
+ * than protecting a shared h2_session one with an own lock.
+ */
+/* Create the multiplexer for connection c.
+ * The h2_mplx struct is allocated from parent; everything it owns lives in
+ * a sub-pool of parent that uses its own, freshly created allocator.
+ * Returns NULL on any failure. The allocator is destroyed explicitly as
+ * long as no pool owns it yet; afterwards it is released together with
+ * m->pool.
+ */
+h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
+{
+    apr_status_t status = APR_SUCCESS;
+    h2_config *conf = h2_config_get(c);
+    AP_DEBUG_ASSERT(conf);
+
+    apr_allocator_t *allocator = NULL;
+    status = apr_allocator_create(&allocator);
+    if (status != APR_SUCCESS) {
+        return NULL;
+    }
+
+    h2_mplx *m = apr_pcalloc(parent, sizeof(h2_mplx));
+    if (!m) {
+        /* nobody owns the allocator yet, release it ourselves */
+        apr_allocator_destroy(allocator);
+        return NULL;
+    }
+
+    m->id = c->id;
+    APR_RING_ELEM_INIT(m, link);
+    apr_atomic_set32(&m->refs, 1);
+    m->c = c;
+    status = apr_pool_create_ex(&m->pool, parent, NULL, allocator);
+    if (status != APR_SUCCESS || !m->pool) {
+        /* still unowned: without this the allocator would leak */
+        apr_allocator_destroy(allocator);
+        return NULL;
+    }
+    /* from here on the allocator is released together with m->pool */
+    apr_allocator_owner_set(allocator, m->pool);
+
+    status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
+                                     m->pool);
+    if (status != APR_SUCCESS) {
+        h2_mplx_destroy(m);
+        return NULL;
+    }
+
+    m->bucket_alloc = apr_bucket_alloc_create(m->pool);
+
+    m->q = h2_tq_create(m->id, m->pool);
+    m->stream_ios = h2_io_set_create(m->pool);
+    m->ready_ios = h2_io_set_create(m->pool);
+    m->closed = h2_stream_set_create(m->pool);
+    m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+    m->workers = workers;
+    return m;
+}
+
+#define REF_COUNT_ATOMIC 1
+
+/* Atomically increment the reference count of m. */
+static void reference(h2_mplx *m)
+{
+ apr_atomic_inc32(&m->refs);
+}
+
+/* Atomically decrement the reference count of m. When it drops to zero and
+ * a join is pending (m->join_wait set), the joining thread is signalled. */
+static void release(h2_mplx *m)
+{
+    if (apr_atomic_dec32(&m->refs) == 0 && m->join_wait) {
+        apr_thread_cond_signal(m->join_wait);
+    }
+}
+
+/* Take a reference on m. With REF_COUNT_ATOMIC set the counter is updated
+ * without locking; otherwise the update happens under m->lock and is
+ * skipped when the lock cannot be taken. */
+void h2_mplx_reference(h2_mplx *m)
+{
+    if (REF_COUNT_ATOMIC) {
+        reference(m);
+        return;
+    }
+    if (apr_thread_mutex_lock(m->lock) == APR_SUCCESS) {
+        reference(m);
+        apr_thread_mutex_unlock(m->lock);
+    }
+}
+/* Give up a reference on m. With REF_COUNT_ATOMIC set the counter is
+ * updated without locking; otherwise the update happens under m->lock and
+ * is skipped when the lock cannot be taken. */
+void h2_mplx_release(h2_mplx *m)
+{
+    if (REF_COUNT_ATOMIC) {
+        release(m);
+        return;
+    }
+    if (apr_thread_mutex_lock(m->lock) == APR_SUCCESS) {
+        release(m);
+        apr_thread_mutex_unlock(m->lock);
+    }
+}
+
+/* Announce this mplx to the h2_workers hub. */
+static void workers_register(h2_mplx *m) {
+ /* Initially, there was ref count increase for this as well, but
+ * this is not needed, even harmful.
+ * h2_workers is only a hub for all the h2_worker instances.
+ * At the end-of-life of this h2_mplx, we always unregister at
+ * the workers. The thing to manage are all the h2_worker instances
+ * out there. Those may hold a reference to this h2_mplx and we cannot
+ * call them to unregister.
+ *
+ * Therefore: ref counting for h2_workers is not needed, ref counting
+ * for h2_worker using this is critical.
+ */
+ h2_workers_register(m->workers, m);
+}
+
+/* Withdraw this mplx from the h2_workers hub. */
+static void workers_unregister(h2_mplx *m) {
+ h2_workers_unregister(m->workers, m);
+}
+
+/* Unregister from the workers, drop the caller's reference and wait on
+ * `wait` until no references remain, then destroy the mplx.
+ * The wait is bounded: at most 6 timed waits of 10 seconds each; after that
+ * a warning is logged and the mplx is destroyed even though references
+ * remain.
+ * Returns the status of taking m->lock; when locking fails the mplx is
+ * not destroyed. */
+apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+{
+ workers_unregister(m);
+
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ int attempts = 0;
+
+ release(m);
+ while (apr_atomic_read32(&m->refs) > 0) {
+ /* lets release() signal us when the count reaches zero */
+ m->join_wait = wait;
+ ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG),
+ 0, m->c,
+ "h2_mplx(%ld): release_join, refs=%d, waiting...",
+ m->id, m->refs);
+ apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
+ if (++attempts >= 6) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): join attempts exhausted, refs=%d",
+ m->id, m->refs);
+ break;
+ }
+ }
+ /* join_wait is only set when we actually had to wait */
+ if (m->join_wait) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ "h2_mplx(%ld): release_join -> destroy", m->id);
+ }
+ m->join_wait = NULL;
+ /* unlock first: h2_mplx_destroy() destroys the mutex itself */
+ apr_thread_mutex_unlock(m->lock);
+ h2_mplx_destroy(m);
+ }
+ return status;
+}
+
+/* Abort the mplx: under the lock, set the aborted flag and destroy all
+ * stream ios; afterwards (also when locking failed) unregister from the
+ * workers. */
+void h2_mplx_abort(h2_mplx *m)
+{
+ AP_DEBUG_ASSERT(m);
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ m->aborted = 1;
+ h2_io_set_destroy_all(m->stream_ios);
+ apr_thread_mutex_unlock(m->lock);
+ }
+ workers_unregister(m);
+}
+
+
+/* Create a new stream for stream_id together with its h2_io.
+ * The stream pool is the spare pool if one is parked on the mplx, otherwise
+ * a new sub-pool of m->pool. Unless an io for the id is already registered,
+ * one is created from the stream pool and added to stream_ios.
+ * Returns the new stream in state H2_STREAM_ST_OPEN, or NULL when the mplx
+ * is aborted, the lock cannot be taken or the stream cannot be created.
+ * A failed stream or io allocation is no longer dereferenced.
+ */
+h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
+{
+    h2_stream *stream = NULL;
+
+    if (m->aborted) {
+        return NULL;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        apr_pool_t *stream_pool = m->spare_pool;
+
+        if (!stream_pool) {
+            apr_pool_create(&stream_pool, m->pool);
+        }
+        else {
+            m->spare_pool = NULL;
+        }
+
+        stream = h2_stream_create(stream_id, stream_pool, m);
+        if (stream) {
+            stream->state = H2_STREAM_ST_OPEN;
+
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            if (!io) {
+                io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
+                if (io) {
+                    /* h2_io_set_add() reads io->id, so never pass NULL */
+                    h2_io_set_add(m->stream_ios, io);
+                }
+            }
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return stream;
+}
+
+/* Release a stream and, if given, its io. Called with m->lock held by
+ * h2_mplx_cleanup_stream() and h2_mplx_task_done().
+ *
+ * The stream's pool is detached first, so it survives h2_stream_destroy(),
+ * and is cleared only after stream and io have been destroyed: both were
+ * created from that pool in h2_mplx_open_io(). The cleared pool is parked
+ * as m->spare_pool for the next stream, replacing (and destroying) any
+ * previous spare.
+ *
+ * The io is removed from ready_ios as well as stream_ios, so that
+ * h2_mplx_next_submit() cannot pick up a destroyed io later on.
+ */
+static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
+{
+    apr_pool_t *pool = h2_stream_detach_pool(stream);
+
+    h2_stream_destroy(stream);
+    if (io) {
+        h2_io_set_remove(m->stream_ios, io);
+        /* no-op unless a response was opened and not yet submitted */
+        h2_io_set_remove(m->ready_ios, io);
+        h2_io_destroy(io);
+    }
+
+    if (pool) {
+        apr_pool_clear(pool);
+        if (m->spare_pool) {
+            apr_pool_destroy(m->spare_pool);
+        }
+        m->spare_pool = pool;
+    }
+}
+
+/* Called when the connection side is done with a stream. If the stream has
+ * no io or its task has already finished, it is destroyed right away;
+ * otherwise it is parked in m->closed until h2_mplx_task_done() is called
+ * for it. Returns the status of taking m->lock. */
+apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
+{
+ AP_DEBUG_ASSERT(m);
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
+ if (!io || io->task_done) {
+ /* No more io or task already done -> cleanup immediately */
+ stream_destroy(m, stream, io);
+ }
+ else {
+ /* Add stream to closed set for cleanup when task is done */
+ h2_stream_set_add(m->closed, stream);
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Called when the task for stream_id has finished. A stream already parked
+ * in m->closed is destroyed now; otherwise the io (if any) is flagged with
+ * task_done so h2_mplx_cleanup_stream() can destroy the stream at once.
+ * Does nothing when m->lock cannot be taken. */
+void h2_mplx_task_done(h2_mplx *m, int stream_id)
+{
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): task(%d) done", m->id, stream_id);
+ if (stream) {
+ /* stream was already closed by main connection and is in
+ * zombie state. Now that the task is done with it, we
+ * can free its resources. */
+ h2_stream_set_remove(m->closed, stream);
+ stream_destroy(m, stream, io);
+ }
+ else if (io) {
+ /* main connection has not finished stream. Mark task as done
+ * so that eventual cleanup can start immediately. */
+ io->task_done = 1;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+}
+
+/* Read queued input of stream_id into bb.
+ * With block == APR_BLOCK_READ the call waits on iowait (m->lock is handed
+ * to apr_thread_cond_wait for the duration) and retries until
+ * h2_io_in_read() stops returning APR_EAGAIN or the mplx is aborted.
+ * Returns APR_ECONNABORTED when aborted, APR_EOF when the stream has no
+ * io, otherwise the status of the last h2_io_in_read().
+ * NOTE(review): iowait is handed to apr_thread_cond_wait() unchecked;
+ * presumably callers always supply one for blocking reads -- confirm. */
+apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
+ int stream_id, apr_bucket_brigade *bb,
+ struct apr_thread_cond_t *iowait)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ /* published so in_write/in_close know whom to signal */
+ io->input_arrived = iowait;
+ status = h2_io_in_read(io, bb, 0);
+ while (status == APR_EAGAIN
+ && !is_aborted(m, &status)
+ && block == APR_BLOCK_READ) {
+ apr_thread_cond_wait(io->input_arrived, m->lock);
+ status = h2_io_in_read(io, bb, 0);
+ }
+ io->input_arrived = NULL;
+ }
+ else {
+ status = APR_EOF;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Queue bb as input for stream_id and signal a reader waiting on the io's
+ * input_arrived condition, if one is set.
+ * Returns APR_ECONNABORTED when aborted, APR_EOF when the stream has no
+ * io, otherwise the status of h2_io_in_write(). */
+apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
+ apr_bucket_brigade *bb)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ status = h2_io_in_write(io, bb);
+ if (io->input_arrived) {
+ apr_thread_cond_signal(io->input_arrived);
+ }
+ }
+ else {
+ status = APR_EOF;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Close the input of stream_id and signal a reader waiting on the io's
+ * input_arrived condition, if one is set.
+ * Returns APR_ECONNABORTED when aborted or when the stream has no io,
+ * otherwise the status of h2_io_in_close(). */
+apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ status = h2_io_in_close(io);
+ if (io->input_arrived) {
+ apr_thread_cond_signal(io->input_arrived);
+ }
+ }
+ else {
+ status = APR_ECONNABORTED;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Iteration context for update_window(). */
+typedef struct {
+ h2_mplx_consumed_cb *cb; /* invoked per io with consumed input */
+ void *cb_ctx; /* passed through to cb as first argument */
+ int streams_updated; /* number of ios cb was invoked for */
+} update_ctx;
+
+/* h2_io_set_iter() callback: report the input consumed on io since the
+ * last report through the context's callback and reset the counter.
+ * Always returns 1 so that the iteration visits every io. */
+static int update_window(void *ctx, h2_io *io)
+{
+    update_ctx *uctx;
+
+    if (io->input_consumed == 0) {
+        return 1;
+    }
+    uctx = (update_ctx*)ctx;
+    uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
+    io->input_consumed = 0;
+    ++uctx->streams_updated;
+    return 1;
+}
+
+/* Invoke cb for every stream io that has consumed input since the last
+ * call (see update_window).
+ * Returns APR_SUCCESS when at least one stream was reported, APR_EAGAIN
+ * when none was, APR_ECONNABORTED when the mplx is aborted, or the error
+ * from taking m->lock. */
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
+ h2_mplx_consumed_cb *cb, void *cb_ctx)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ update_ctx ctx = { cb, cb_ctx, 0 };
+ status = APR_EAGAIN;
+ h2_io_set_iter(m->stream_ios, update_window, &ctx);
+
+ if (ctx.streams_updated) {
+ status = APR_SUCCESS;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Read output of stream_id via h2_io_out_read(). After a successful read a
+ * writer waiting on the io's output_drained condition is signalled.
+ * Returns APR_ECONNABORTED when aborted or when the stream has no io. */
+apr_status_t h2_mplx_out_read(h2_mplx *m, int stream_id,
+ char *buffer, apr_size_t *plen, int *peos)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ status = h2_io_out_read(io, buffer, plen, peos);
+ if (status == APR_SUCCESS && io->output_drained) {
+ apr_thread_cond_signal(io->output_drained);
+ }
+ }
+ else {
+ status = APR_ECONNABORTED;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Callback flavour of h2_mplx_out_read(): output of stream_id is read via
+ * h2_io_out_readx(). After a successful read a writer waiting on the io's
+ * output_drained condition is signalled.
+ * Returns APR_ECONNABORTED when aborted or when the stream has no io. */
+apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
+ h2_io_data_cb *cb, void *ctx,
+ apr_size_t *plen, int *peos)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ status = h2_io_out_readx(io, cb, ctx, plen, peos);
+ if (status == APR_SUCCESS && io->output_drained) {
+ apr_thread_cond_signal(io->output_drained);
+ }
+ }
+ else {
+ status = APR_ECONNABORTED;
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+/* Take the next io with an opened response off ready_ios and attach its
+ * response and output brigade to the matching stream in `streams`.
+ * Returns that stream, or NULL when the mplx is aborted, nothing is ready,
+ * or no stream matches the response (a warning is logged in that case and
+ * the io stays removed from ready_ios). */
+h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return NULL;
+ }
+ h2_stream *stream = NULL;
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
+ if (io) {
+ h2_response *response = io->response;
+ h2_io_set_remove(m->ready_ios, io);
+
+ stream = h2_stream_set_get(streams, response->stream_id);
+ if (stream) {
+ h2_stream_set_response(stream, response, io->bbout);
+ /* wake a writer blocked in out_write(), if any */
+ if (io->output_drained) {
+ apr_thread_cond_signal(io->output_drained);
+ }
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
+ "h2_mplx(%ld): stream for response %d",
+ m->id, response->stream_id);
+ }
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return stream;
+}
+
+/* Append bb to the output of io, blocking on iowait while the queued
+ * output is at or above m->stream_max_mem. The callers in this file invoke
+ * it with m->lock held; the lock is handed to apr_thread_cond_wait while
+ * blocking. Without an iowait the function never blocks.
+ * bb is cleaned up before returning, whatever the outcome.
+ * Returns the last status, APR_ECONNABORTED when the mplx got aborted. */
+static apr_status_t out_write(h2_mplx *m, h2_io *io,
+ ap_filter_t* f, apr_bucket_brigade *bb,
+ struct apr_thread_cond_t *iowait)
+{
+ apr_status_t status = APR_SUCCESS;
+ /* We check the memory footprint queued for this stream_id
+ * and block if it exceeds our configured limit.
+ * We will not split buckets to enforce the limit to the last
+ * byte. After all, the bucket is already in memory.
+ */
+ while (!APR_BRIGADE_EMPTY(bb)
+ && (status == APR_SUCCESS)
+ && !is_aborted(m, &status)) {
+
+ status = h2_io_out_write(io, bb, m->stream_max_mem);
+
+ /* Wait for data to drain until there is room again */
+ while (!APR_BRIGADE_EMPTY(bb)
+ && iowait
+ && status == APR_SUCCESS
+ && (m->stream_max_mem <= h2_io_out_length(io))
+ && !is_aborted(m, &status)) {
+ /* published so readers know whom to signal */
+ io->output_drained = iowait;
+ if (f) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_mplx(%ld-%d): waiting for out drain",
+ m->id, io->id);
+ }
+ apr_thread_cond_wait(io->output_drained, m->lock);
+ io->output_drained = NULL;
+ }
+ }
+ apr_brigade_cleanup(bb);
+ return status;
+}
+
+/* Open the response for stream_id: copy `response` into the io's response
+ * record, add the io to ready_ios, write bb (if given) via out_write() and
+ * signal a thread waiting for new output. The callers in this file invoke
+ * it with m->lock held.
+ * Returns APR_ECONNABORTED when the stream has no io, otherwise
+ * APR_SUCCESS or the status of out_write(). */
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
+ ap_filter_t* f, apr_bucket_brigade *bb,
+ struct apr_thread_cond_t *iowait)
+{
+ apr_status_t status = APR_SUCCESS;
+
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ if (f) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
+ "h2_mplx(%ld-%d): open response: %s",
+ m->id, stream_id, response->headers->status);
+ }
+
+ h2_response_copy(io->response, response);
+ h2_io_set_add(m->ready_ios, io);
+ if (bb) {
+ status = out_write(m, io, f, bb, iowait);
+ }
+ have_out_data_for(m, stream_id);
+ }
+ else {
+ status = APR_ECONNABORTED;
+ }
+ return status;
+}
+
+/* Open the response for stream_id and queue the first output in bb (see
+ * out_open). May block on iowait while the stream's output is full.
+ * Returns APR_ECONNABORTED when the mplx is or becomes aborted, the error
+ * from taking m->lock, or the status of out_open().
+ *
+ * The lock is released on every path: the original returned with m->lock
+ * still held when the mplx was aborted during the call, which blocks any
+ * later lock attempt such as the one in h2_mplx_release_and_join().
+ */
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
+                              ap_filter_t* f, apr_bucket_brigade *bb,
+                              struct apr_thread_cond_t *iowait)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        status = out_open(m, stream_id, response, f, bb, iowait);
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        /* h2_mplx_destroy() resets m->lock to NULL */
+        if (m->lock) {
+            apr_thread_mutex_unlock(m->lock);
+        }
+    }
+    return status;
+}
+
+
+/* Append bb to the output of stream_id (see out_write) and signal a thread
+ * waiting for new output. May block on iowait while the stream's output is
+ * full.
+ * Returns APR_ECONNABORTED when the mplx is or becomes aborted or the
+ * stream has no io, the error from taking m->lock, or the status of
+ * out_write().
+ *
+ * The lock is released on every path: the original returned with m->lock
+ * still held when the mplx was aborted during the write.
+ */
+apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
+                               ap_filter_t* f, apr_bucket_brigade *bb,
+                               struct apr_thread_cond_t *iowait)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        if (!m->aborted) {
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            if (io) {
+                status = out_write(m, io, f, bb, iowait);
+                have_out_data_for(m, stream_id);
+                if (m->aborted) {
+                    status = APR_ECONNABORTED;
+                }
+            }
+            else {
+                status = APR_ECONNABORTED;
+            }
+        }
+
+        /* h2_mplx_destroy() resets m->lock to NULL */
+        if (m->lock) {
+            apr_thread_mutex_unlock(m->lock);
+        }
+    }
+    return status;
+}
+
+/* Close the output of stream_id. If no response has been opened yet, a
+ * "500" response is opened first so the stream can be reset properly.
+ * Returns APR_ECONNABORTED when the mplx is aborted on entry or the stream
+ * has no io, the error from taking m->lock, otherwise the status of
+ * h2_io_out_close() (APR_SUCCESS also when the mplx got aborted meanwhile).
+ *
+ * The lock is released on every path: the original returned with m->lock
+ * still held when the mplx was aborted during the call.
+ */
+apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        if (!m->aborted) {
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            if (io) {
+                if (!io->response->headers) {
+                    /* In case a close comes before a response was created,
+                     * insert an error one so that our streams can properly
+                     * reset.
+                     */
+                    h2_response *r = h2_response_create(stream_id,
+                                                        "500", NULL, m->pool);
+                    status = out_open(m, stream_id, r, NULL, NULL, NULL);
+                }
+                status = h2_io_out_close(io);
+                have_out_data_for(m, stream_id);
+                if (m->aborted) {
+                    /* if we were the last output, the whole session might
+                     * have gone down in the meantime.
+                     */
+                    status = APR_SUCCESS;
+                }
+            }
+            else {
+                status = APR_ECONNABORTED;
+            }
+        }
+        /* h2_mplx_destroy() resets m->lock to NULL */
+        if (m->lock) {
+            apr_thread_mutex_unlock(m->lock);
+        }
+    }
+    return status;
+}
+
+/* Locked wrapper around h2_io_in_has_eos_for() for stream_id. Returns 0
+ * when the mplx is aborted, the lock cannot be taken or the stream has no
+ * io. */
+int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return 0;
+ }
+ int has_eos = 0;
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ has_eos = h2_io_in_has_eos_for(io);
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return has_eos;
+}
+
+/* Locked wrapper around h2_io_out_has_data() for stream_id. Returns 0 when
+ * the mplx is aborted, the lock cannot be taken or the stream has no io. */
+int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return 0;
+ }
+ int has_data = 0;
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io) {
+ has_data = h2_io_out_has_data(io);
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return has_data;
+}
+
+/* Wait up to `timeout` on iowait for a task to add output. While waiting,
+ * iowait is published as m->added_output so that have_out_data_for() can
+ * signal it.
+ * Returns APR_ECONNABORTED when the mplx is aborted on entry, the error
+ * from taking m->lock, otherwise the status of the timed wait.
+ * The trace message lost its stray closing parenthesis.
+ */
+apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
+                                 apr_thread_cond_t *iowait)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        m->added_output = iowait;
+        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                      "h2_mplx(%ld): trywait on data for %f ms",
+                      m->id, timeout/1000.0);
+        m->added_output = NULL;
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+/* Signal the thread waiting in h2_mplx_out_trywait(), if any, that output
+ * was added. The stream id is currently unused. */
+static void have_out_data_for(h2_mplx *m, int stream_id)
+{
+ (void)stream_id;
+ AP_DEBUG_ASSERT(m);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
+}
+
+/* Append task to the mplx task queue and register the mplx with the
+ * workers. Registration happens even when taking m->lock failed.
+ * Returns APR_ECONNABORTED when aborted, otherwise the status of taking
+ * m->lock. */
+apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
+{
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ apr_status_t status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ /* TODO: needs to sort queue by priority */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx: do task(%s)", task->id);
+ h2_tq_append(m->q, task);
+ apr_thread_mutex_unlock(m->lock);
+ }
+ workers_register(m);
+ return status;
+}
+
+/* Take the first task off the queue and mark it as started.
+ * @param has_more set to != 0 iff more tasks remain queued; it is now set
+ *                 to 0 on every path that returns without inspecting the
+ *                 queue, so callers never read an unset value
+ * @return the task, or NULL when the mplx is aborted, the lock cannot be
+ *         taken or the queue is empty
+ */
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+{
+    h2_task *task = NULL;
+    AP_DEBUG_ASSERT(m);
+    *has_more = 0;
+    if (m->aborted) {
+        return NULL;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        task = h2_tq_pop_first(m->q);
+        if (task) {
+            h2_task_set_started(task);
+        }
+        *has_more = !h2_tq_empty(m->q);
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return task;
+}
+
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,322 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_mplx__
+#define __mod_h2__h2_mplx__
+
+/**
+ * The stream multiplexer. It pushes buckets from the connection
+ * thread to the stream task threads and vice versa. It's thread-safe
+ * to use.
+ *
+ * There is one h2_mplx instance for each h2_session, which sits on top
+ * of a particular httpd conn_rec. Input goes from the connection to
+ * the stream tasks. Output goes from the stream tasks to the connection,
+ * e.g. the client.
+ *
+ * For each stream, there can be at most "H2StreamMaxMemSize" output bytes
+ * queued in the multiplexer. If a task thread tries to write more
+ * data, it is blocked until space becomes available.
+ *
+ * Writing input is never blocked. In order to use flow control on the input,
+ * the mplx can be polled for input data consumption.
+ */
+
+/* Forward declarations. */
+struct apr_pool_t;
+struct apr_thread_mutex_t;
+struct apr_thread_cond_t;
+struct h2_config;
+struct h2_response;
+struct h2_task;
+struct h2_stream;
+struct h2_io_set;
+struct h2_workers;
+struct h2_stream_set;
+struct h2_task_queue;
+
+#include "h2_io.h"
+
+typedef struct h2_mplx h2_mplx;
+
+/**
+ * State of one stream multiplexer.
+ */
+struct h2_mplx {
+ /* identifier printed in log messages as "h2_mplx(%ld)" */
+ long id;
+ /* ring linkage used by the H2_MPLX_LIST_* / H2_MPLX_NEXT/PREV/REMOVE macros */
+ APR_RING_ENTRY(h2_mplx) link;
+ /* reference counter, see h2_mplx_reference() / h2_mplx_release() */
+ volatile apr_uint32_t refs;
+ /* the httpd connection this mplx sits on; also used as log context */
+ conn_rec *c;
+ apr_pool_t *pool;
+ apr_bucket_alloc_t *bucket_alloc;
+
+ /* task queue: filled by h2_mplx_do_task(), drained by h2_mplx_pop_task() */
+ struct h2_task_queue *q;
+ struct h2_io_set *stream_ios;
+ struct h2_io_set *ready_ios;
+
+ /* mutex held around queue access and the wait in h2_mplx_out_trywait() */
+ apr_thread_mutex_t *lock;
+ /* condition of the thread currently inside h2_mplx_out_trywait(), NULL
+  * otherwise; signalled by have_out_data_for() */
+ struct apr_thread_cond_t *added_output;
+ /* NOTE(review): presumably the condition handed to
+  * h2_mplx_release_and_join() - confirm in h2_mplx.c */
+ struct apr_thread_cond_t *join_wait;
+
+ /* when non-zero, entry points return APR_ECONNABORTED (or NULL) at once */
+ int aborted;
+ /* NOTE(review): presumably the per-stream output limit described in the
+  * file comment ("H2StreamMaxMemSize") - confirm in h2_mplx.c */
+ apr_size_t stream_max_mem;
+
+ apr_pool_t *spare_pool; /* spare pool, ready for next stream */
+ struct h2_stream_set *closed; /* streams closed, but task ongoing */
+ struct h2_workers *workers;
+};
+
+/*******************************************************************************
+ * Object lifecycle and information.
+ ******************************************************************************/
+
+/**
+ * Create the multiplexer for the given HTTP2 session.
+ * Implicitly has reference count 1.
+ */
+h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master,
+ struct h2_workers *workers);
+
+/**
+ * Increase the reference counter of this mplx.
+ */
+void h2_mplx_reference(h2_mplx *m);
+
+/**
+ * Decreases the reference counter of this mplx.
+ */
+void h2_mplx_release(h2_mplx *m);
+/**
+ * Decreases the reference counter of this mplx and waits for it
+ * to reach 0, destroying the mplx afterwards.
+ * This is to be called from the thread that created the mplx in
+ * the first place.
+ * @param m the mplx to be released and destroyed
+ * @param wait condition var to wait on for ref counter == 0
+ */
+apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
+
+/**
+ * Aborts the multiplexer. It will answer all future invocation with
+ * APR_ECONNABORTED, leading to early termination of ongoing tasks.
+ */
+void h2_mplx_abort(h2_mplx *mplx);
+
+void h2_mplx_task_done(h2_mplx *m, int stream_id);
+
+/*******************************************************************************
+ * IO lifetime of streams.
+ ******************************************************************************/
+/**
+ * Prepares the multiplexer to handle in-/output on the given stream id.
+ */
+struct h2_stream *h2_mplx_open_io(h2_mplx *mplx, int stream_id);
+
+/**
+ * Ends cleanup of a stream in sync with execution thread.
+ */
+apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, struct h2_stream *stream);
+
+/* Return != 0 iff the multiplexer has data for the given stream.
+ */
+int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
+
+/**
+ * Waits on output data from any stream in this session to become available.
+ * Returns APR_TIMEUP if no data arrived in the given time.
+ */
+apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
+ struct apr_thread_cond_t *iowait);
+
+/*******************************************************************************
+ * Stream processing.
+ ******************************************************************************/
+
+/**
+ * Perform the task on the given stream.
+ */
+apr_status_t h2_mplx_do_task(h2_mplx *mplx, struct h2_task *task);
+
+struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
+
+/*******************************************************************************
+ * Input handling of streams.
+ ******************************************************************************/
+
+/**
+ * Reads buckets for the given stream_id. Will return APR_EAGAIN when
+ * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
+ * when the end of the stream input has been reached.
+ * The condition passed in will be used for blocking/signalling and will
+ * be protected by the mplx's own mutex.
+ */
+apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
+ int stream_id, apr_bucket_brigade *bb,
+ struct apr_thread_cond_t *iowait);
+
+/**
+ * Appends data to the input of the given stream. Storage of input data is
+ * not subject to flow control.
+ */
+apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id,
+ apr_bucket_brigade *bb);
+
+/**
+ * Closes the input for the given stream_id.
+ */
+apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
+
+/**
+ * Returns != 0 iff the input for the given stream has been closed. There
+ * could still be data queued, but it can be read without blocking.
+ */
+int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
+
+/**
+ * Callback invoked for every stream that had input data read since
+ * the last invocation.
+ */
+typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_size_t consumed);
+
+/**
+ * Invoke the callback for all streams that had bytes read since the last
+ * call to this function. If no stream had input data consumed, the callback
+ * is not invoked.
+ * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
+ * happened.
+ */
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
+ h2_mplx_consumed_cb *cb, void *ctx);
+
+/*******************************************************************************
+ * Output handling of streams.
+ ******************************************************************************/
+
+/**
+ * Get a stream whose response is ready for submit. Will set response and
+ * any out data available in stream.
+ * @param m the mplxer to get a response from
+ * @param bb the brigade to place any existing response body data into
+ */
+struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
+ struct h2_stream_set *streams);
+
+/**
+ * Reads output data from the given stream. Will never block, but
+ * return APR_EAGAIN until data arrives or the stream is closed.
+ */
+apr_status_t h2_mplx_out_read(h2_mplx *mplx, int stream_id,
+ char *buffer, apr_size_t *plen, int *peos);
+
+apr_status_t h2_mplx_out_readx(h2_mplx *mplx, int stream_id,
+ h2_io_data_cb *cb, void *ctx,
+ apr_size_t *plen, int *peos);
+
+/**
+ * Opens the output for the given stream with the specified response.
+ */
+apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
+ struct h2_response *response,
+ ap_filter_t* filter, apr_bucket_brigade *bb,
+ struct apr_thread_cond_t *iowait);
+
+/**
+ * Append the brigade to the stream output. Might block if amount
+ * of bytes buffered reaches configured max.
+ * @param stream_id the stream identifier
+ * @param filter the apache filter context of the data
+ * @param bb the bucket brigade to append
+ * @param iowait a conditional used for block/signalling in h2_mplx
+ */
+apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
+ ap_filter_t* filter, apr_bucket_brigade *bb,
+ struct apr_thread_cond_t *iowait);
+
+/**
+ * Closes the output stream. Readers of this stream will get all pending
+ * data and then only APR_EOF as result.
+ */
+apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
+
+/*******************************************************************************
+ * h2_mplx list Manipulation.
+ ******************************************************************************/
+
+/**
+ * The magic pointer value that indicates the head of a h2_mplx list
+ * @param b The mplx list
+ * @return The magic pointer value
+ */
+#define H2_MPLX_LIST_SENTINEL(b) APR_RING_SENTINEL((b), h2_mplx, link)
+
+/**
+ * Determine if the mplx list is empty
+ * @param b The list to check
+ * @return true or false
+ */
+#define H2_MPLX_LIST_EMPTY(b) APR_RING_EMPTY((b), h2_mplx, link)
+
+/**
+ * Return the first mplx in a list
+ * @param b The list to query
+ * @return The first mplx in the list
+ */
+#define H2_MPLX_LIST_FIRST(b) APR_RING_FIRST(b)
+
+/**
+ * Return the last mplx in a list
+ * @param b The list to query
+ * @return The last mplx in the list
+ */
+#define H2_MPLX_LIST_LAST(b) APR_RING_LAST(b)
+
+/**
+ * Insert a single mplx at the front of a list
+ * @param b The list to add to
+ * @param e The mplx to insert
+ */
+#define H2_MPLX_LIST_INSERT_HEAD(b, e)                        \
+    do {                                                      \
+        /* evaluate (e) exactly once */                       \
+        h2_mplx *h2__elt = (e);                               \
+        APR_RING_INSERT_HEAD((b), h2__elt, h2_mplx, link);    \
+    } while (0)
+
+/**
+ * Insert a single mplx at the end of a list
+ * @param b The list to add to
+ * @param e The mplx to insert
+ */
+#define H2_MPLX_LIST_INSERT_TAIL(b, e)                        \
+    do {                                                      \
+        /* evaluate (e) exactly once */                       \
+        h2_mplx *h2__elt = (e);                               \
+        APR_RING_INSERT_TAIL((b), h2__elt, h2_mplx, link);    \
+    } while (0)
+
+/**
+ * Get the next mplx in the list
+ * @param e The current mplx
+ * @return The next mplx
+ */
+#define H2_MPLX_NEXT(e) APR_RING_NEXT((e), link)
+/**
+ * Get the previous mplx in the list
+ * @param e The current mplx
+ * @return The previous mplx
+ */
+#define H2_MPLX_PREV(e) APR_RING_PREV((e), link)
+
+/**
+ * Remove a mplx from its list
+ * @param e The mplx to remove
+ */
+#define H2_MPLX_REMOVE(e) APR_RING_REMOVE((e), link)
+
+
+#endif /* defined(__mod_h2__h2_mplx__) */
Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h?rev=1688474&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,36 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef mod_h2_h2_private_h
+#define mod_h2_h2_private_h
+
+#include <nghttp2/nghttp2.h>
+
+extern module AP_MODULE_DECLARE_DATA h2_module;
+
+APLOG_USE_MODULE(h2);
+
+
+/* Colon-prefixed header names, each paired with its length in characters
+ * (terminating NUL not counted). Keep every *_LEN in sync with its string.
+ */
+#define H2_HEADER_METHOD ":method"
+#define H2_HEADER_METHOD_LEN 7
+#define H2_HEADER_SCHEME ":scheme"
+#define H2_HEADER_SCHEME_LEN 7
+#define H2_HEADER_AUTH ":authority"
+#define H2_HEADER_AUTH_LEN 10
+#define H2_HEADER_PATH ":path"
+#define H2_HEADER_PATH_LEN 5
+/* line terminator sequence: carriage return followed by line feed */
+#define H2_CRLF "\r\n"
+
+#endif
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic