[prev in list] [next in list] [prev in thread] [next in thread] 

List:       apache-cvs
Subject:    svn commit: r1688474 [5/21] - in /httpd/httpd/trunk/modules/http2: ./ m4/ mod-h2.xcodeproj/ mod-h2.x
From:       jim () apache ! org
Date:       2015-06-30 15:26:19
Message-ID: 20150630152622.34FB7AC08E0 () hades ! apache ! org
[Download RAW message or body]

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,616 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stdio.h>
+
+#include <apr_lib.h>
+#include <apr_strings.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+#include <http_connection.h>
+#include <http_protocol.h>
+#include <http_request.h>
+#include <util_time.h>
+
+#include "h2_private.h"
+#include "h2_response.h"
+#include "h2_from_h1.h"
+#include "h2_task.h"
+#include "h2_task_output.h"
+#include "h2_util.h"
+
+
+static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state);
+
+/**
+ * Allocate a zero-initialized h2_from_h1 converter from pool and prepare
+ * it to read the HTTP/1 status line of the given stream.
+ *
+ * @param stream_id id of the stream the converter belongs to
+ * @param pool      pool used for the converter and its header line array
+ * @return the new converter, or NULL if the allocation yielded NULL
+ */
+h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool)
+{
+    h2_from_h1 *conv = apr_pcalloc(pool, sizeof(*conv));
+
+    if (conv == NULL) {
+        return NULL;
+    }
+    conv->pool = pool;
+    conv->stream_id = stream_id;
+    conv->hlines = apr_array_make(pool, 10, sizeof(char *));
+    conv->state = H2_RESP_ST_STATUS_LINE;
+    return conv;
+}
+
+/**
+ * Destroy the response held by the converter, if there is one, and drop
+ * the reference to the scratch brigade.
+ *
+ * @return always APR_SUCCESS
+ */
+apr_status_t h2_from_h1_destroy(h2_from_h1 *from_h1)
+{
+    h2_response *held = from_h1->response;
+
+    if (held != NULL) {
+        h2_response_destroy(held);
+        from_h1->response = NULL;
+    }
+    from_h1->bb = NULL;
+    return APR_SUCCESS;
+}
+
+/** Report the current conversion state of the converter. */
+h2_from_h1_state_t h2_from_h1_get_state(h2_from_h1 *from_h1)
+{
+    h2_from_h1_state_t current = from_h1->state;
+
+    return current;
+}
+
+/** Move the converter to the given state; a no-op if already there. */
+static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state)
+{
+    if (from_h1->state == state) {
+        return;
+    }
+    from_h1->state = state;
+}
+
+/**
+ * Give access to the response currently stored in the converter.
+ *
+ * @return the stored response pointer, which is NULL when none is stored
+ */
+h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1)
+{
+    h2_response *current = from_h1->response;
+
+    return current;
+}
+
+/**
+ * Create the h2_response for this stream from the status and header lines
+ * collected so far, then advance the converter state.
+ *
+ * On success the converter records the response's content length and the
+ * request's chunked flag, and switches to H2_RESP_ST_BODY when a body is
+ * expected (chunked or content length > 0), otherwise to H2_RESP_ST_DONE.
+ *
+ * @param from_h1 the converter holding status and header lines
+ * @param r       the request; supplies the chunked flag and the connection
+ *                used for logging
+ * @return APR_SUCCESS, or APR_EINVAL if no response could be created
+ */
+static apr_status_t make_h2_headers(h2_from_h1 *from_h1, request_rec *r)
+{
+    from_h1->response = h2_response_create(from_h1->stream_id, 
+                                       from_h1->status, from_h1->hlines,
+                                       from_h1->pool);
+    if (from_h1->response == NULL) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EINVAL, r->connection,
+                      "h2_from_h1(%d): unable to create resp_head",
+                      from_h1->stream_id);
+        return APR_EINVAL;
+    }
+    from_h1->content_length = from_h1->response->content_length;
+    from_h1->chunked = r->chunked;
+    
+    /* content_length is an apr_size_t: print it with its own format
+     * instead of truncating it to int, which misreports large bodies. */
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+                  "h2_from_h1(%d): converted headers, content-length: %"
+                  APR_SIZE_T_FMT ", chunked=%d",
+                  from_h1->stream_id, from_h1->content_length, 
+                  (int)from_h1->chunked);
+    
+    set_state(from_h1, ((from_h1->chunked || from_h1->content_length > 0)?
+                        H2_RESP_ST_BODY : H2_RESP_ST_DONE));
+    /* We are ready to be sent to the client */
+    return APR_SUCCESS;
+}
+
+/**
+ * Record one HTTP/1 response header line in from_h1->hlines.
+ *
+ * A line starting with a space or tab is a continuation of the previous
+ * header: its leading whitespace is stripped and the remainder is appended,
+ * separated by a single space, to the most recently stored line. Any other
+ * line is stored as a new entry. Stored strings are copies allocated from
+ * from_h1->pool.
+ *
+ * @param from_h1 the converter collecting header lines
+ * @param f       unused
+ * @param line    the NUL-terminated header line
+ * @return APR_SUCCESS, or APR_EINVAL for a continuation line that has no
+ *         preceding header line
+ */
+static apr_status_t parse_header(h2_from_h1 *from_h1, ap_filter_t* f, 
+                                 char *line) {
+    (void)f;
+    
+    if (line[0] == ' ' || line[0] == '\t') {
+        char **plast;
+        const char *merged;
+        
+        /* continuation line from the header before this */
+        while (line[0] == ' ' || line[0] == '\t') {
+            ++line;
+        }
+        
+        plast = apr_array_pop(from_h1->hlines);
+        if (plast == NULL) {
+            /* not well formed */
+            return APR_EINVAL;
+        }
+        /* Build the merged line before pushing, so the popped slot is
+         * read before it is handed out again. */
+        merged = apr_psprintf(from_h1->pool, "%s %s", *plast, line);
+        APR_ARRAY_PUSH(from_h1->hlines, const char*) = merged;
+    }
+    else {
+        /* new header line */
+        APR_ARRAY_PUSH(from_h1->hlines, const char*) = 
+            apr_pstrdup(from_h1->pool, line);
+    }
+    return APR_SUCCESS;
+}
+
+/**
+ * Split the next line off bb and copy it, NUL-terminated, into line.
+ *
+ * The converter's scratch brigade is created on first use and emptied
+ * before each read. At most len - 1 bytes are copied; a trailing CRLF is
+ * stripped from the copy.
+ *
+ * @param from_h1 the converter owning the scratch brigade
+ * @param bb      brigade to take the line from
+ * @param f       filter, used for the bucket allocator and logging
+ * @param line    destination buffer
+ * @param len     size of the destination buffer in bytes
+ * @return APR_SUCCESS or the error from splitting/flattening the brigade
+ */
+static apr_status_t get_line(h2_from_h1 *from_h1, apr_bucket_brigade *bb,
+                             ap_filter_t* f, char *line, apr_size_t len)
+{
+    apr_status_t rv;
+
+    if (from_h1->bb) {
+        apr_brigade_cleanup(from_h1->bb);
+    }
+    else {
+        from_h1->bb = apr_brigade_create(from_h1->pool, f->c->bucket_alloc);
+    }
+
+    rv = apr_brigade_split_line(from_h1->bb, bb, APR_BLOCK_READ,
+                                HUGE_STRING_LEN);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* keep one byte for the terminating NUL */
+    len -= 1;
+    rv = apr_brigade_flatten(from_h1->bb, line, &len);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* the line is assumed to contain no embedded NUL bytes */
+    line[len] = '\0';
+    if (len >= 2 && strcmp(H2_CRLF, &line[len - 2]) == 0) {
+        len -= 2;
+        line[len] = '\0';
+    }
+
+    apr_brigade_cleanup(from_h1->bb);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                  "h2_from_h1(%d): read line: %s",
+                  from_h1->stream_id, line);
+    return rv;
+}
+
+/**
+ * Consume a serialized HTTP/1 response from bb: take the status from the
+ * request, collect the header lines, build the h2_response once the empty
+ * line ending the headers is seen, and pass body data down the chain.
+ *
+ * Once the converter is in H2_RESP_ST_BODY or H2_RESP_ST_DONE the brigade
+ * is passed on unchanged. For chunked responses the "CHUNK" output filter
+ * is removed first and, when that succeeds, this filter removes itself.
+ *
+ * @param from_h1 the converter for this stream
+ * @param f       the output filter this runs in
+ * @param bb      brigade holding (part of) the serialized response
+ * @return APR_SUCCESS, an error from reading or parsing a line, the error
+ *         from building the response, or the result of ap_pass_brigade()
+ */
+apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, ap_filter_t* f,
+                                      apr_bucket_brigade* bb)
+{
+    apr_status_t status = APR_SUCCESS;
+    char line[HUGE_STRING_LEN];
+    
+    if ((from_h1->state == H2_RESP_ST_BODY) 
+        || (from_h1->state == H2_RESP_ST_DONE)) {
+        if (from_h1->chunked) {
+            /* The httpd core HTTP_HEADER filter has or will install the 
+             * "CHUNK" output transcode filter, which appears further down 
+             * the filter chain. We do not want it for HTTP/2.
+             * Once we successfully deinstalled it, this filter has no
+             * further function and we remove it.
+             */
+            status = ap_remove_output_filter_byhandle(f->r->output_filters, 
+                                                      "CHUNK");
+            if (status == APR_SUCCESS) {
+                ap_remove_output_filter(f);
+            }
+        }
+        
+        return ap_pass_brigade(f->next, bb);
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                  "h2_from_h1(%d): read_response", from_h1->stream_id);
+    
+    while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+        
+        switch (from_h1->state) {
+                
+            case H2_RESP_ST_STATUS_LINE:
+            case H2_RESP_ST_HEADERS:
+                status = get_line(from_h1, bb, f, line, sizeof(line));
+                if (status != APR_SUCCESS) {
+                    return status;
+                }
+                if (from_h1->state == H2_RESP_ST_STATUS_LINE) {
+                    /* instead of parsing, just take it directly */
+                    from_h1->status = apr_psprintf(from_h1->pool, 
+                                                   "%d", f->r->status);
+                    from_h1->state = H2_RESP_ST_HEADERS;
+                }
+                else if (line[0] == '\0') {
+                    /* end of headers, create the h2_response and
+                     * pass the rest of the brigade down the filter
+                     * chain.
+                     */
+                    status = make_h2_headers(from_h1, f->r);
+                    if (from_h1->bb) {
+                        apr_brigade_destroy(from_h1->bb);
+                        from_h1->bb = NULL;
+                    }
+                    if (status != APR_SUCCESS) {
+                        /* No response could be built: report that rather
+                         * than hiding it behind the result of passing the
+                         * body on. */
+                        return status;
+                    }
+                    if (!APR_BRIGADE_EMPTY(bb)) {
+                        return ap_pass_brigade(f->next, bb);
+                    }
+                }
+                else {
+                    status = parse_header(from_h1, f, line);
+                }
+                break;
+                
+            default:
+                return ap_pass_brigade(f->next, bb);
+        }
+        
+    }
+    
+    return status;
+}
+
+/* apr_table_do callback: split one field value into its comma/whitespace
+ * separated tokens and append every token that is not yet present
+ * (compared case-insensitively) to the array passed as d. Written for
+ * merging multiple Vary fields, but usable for any token-list field.
+ * The tokens point into a private copy of val allocated from the array's
+ * pool. Always returns 1 so that the table iteration continues.
+ */
+static int uniq_field_values(void *d, const char *key, const char *val)
+{
+    apr_array_header_t *values = (apr_array_header_t *)d;
+    char *cursor;
+    
+    (void)key;
+    
+    cursor = apr_pstrdup(values->pool, val);
+    
+    for (;;) {
+        char *token;
+        char **known;
+        int idx;
+        int found = 0;
+        
+        /* skip separators in front of the next token */
+        while (*cursor == ',' || apr_isspace(*cursor)) {
+            ++cursor;
+        }
+        if (*cursor == '\0') {
+            break;
+        }
+        
+        /* find the token's end and terminate it in place */
+        token = cursor;
+        while (*cursor != '\0' && *cursor != ',' && !apr_isspace(*cursor)) {
+            ++cursor;
+        }
+        if (*cursor != '\0') {
+            *cursor++ = '\0';
+        }
+        
+        /* append the token unless an equal one is already stored */
+        known = (char **) values->elts;
+        for (idx = 0; idx < values->nelts; ++idx) {
+            if (known[idx] && strcasecmp(known[idx], token) == 0) {
+                found = 1;
+                break;
+            }
+        }
+        if (!found) {
+            *(char **)apr_array_push(values) = token;
+        }
+        
+        if (*cursor == '\0') {
+            break;
+        }
+    }
+    
+    return 1;
+}
+
+/*
+ * Some clients cannot cope with several Vary fields or with repeated
+ * tokens inside one. Collapse all Vary fields of r->headers_out into a
+ * single field that lists each token once.
+ */
+static void fix_vary(request_rec *r)
+{
+    apr_array_header_t *tokens = apr_array_make(r->pool, 5, sizeof(char *));
+    
+    /* Gather the distinct tokens of every Vary field in headers_out. */
+    apr_table_do((int (*)(void *, const char *, const char *))uniq_field_values,
+                 (void *) tokens, r->headers_out, "Vary", NULL);
+    
+    if (tokens->nelts <= 0) {
+        /* no Vary field present, nothing to replace */
+        return;
+    }
+    
+    apr_table_setn(r->headers_out, "Vary",
+                   apr_array_pstrcat(r->pool, tokens, ','));
+}
+
+/* Check that r->status_line starts with exactly the three digit code in
+ * r->status, followed by whitespace if anything follows at all. A filter
+ * may have changed r->status after a handler set the status line.
+ *
+ * A bad status line is reset to NULL. A bare three digit status line gets
+ * the trailing space RFC2616 mandates appended.
+ *
+ * Returns APR_SUCCESS only for a well-formed status line longer than three
+ * characters; every other case, including the repaired three character
+ * one, returns APR_EGENERAL.
+ */
+static apr_status_t validate_status_line(request_rec *r)
+{
+    const char *sl = r->status_line;
+    char *stop;
+    apr_size_t n;
+    
+    if (sl == NULL) {
+        return APR_EGENERAL;
+    }
+    
+    n = strlen(sl);
+    if (n < 3
+        || apr_strtoi64(sl, &stop, 10) != r->status
+        || stop != sl + 3
+        || (n > 3 && !apr_isspace(sl[3]))) {
+        r->status_line = NULL;
+        return APR_EGENERAL;
+    }
+    
+    if (n > 3) {
+        return APR_SUCCESS;
+    }
+    
+    /* Exactly three characters: only the numeric status, add the space. */
+    r->status_line = apr_pstrcat(r->pool, sl, " ", NULL);
+    return APR_EGENERAL;
+}
+
+/**
+ * Put the Date and Server fields into headers and remove them from
+ * r->headers_out.
+ *
+ * For proxied requests a Date or Server value already present in
+ * r->headers_out is kept. Otherwise Date is generated from
+ * r->request_time and Server is the server banner, if that is non-empty.
+ */
+static void set_basic_http_header(request_rec *r, apr_table_t *headers)
+{
+    const char *banner = ap_get_server_banner();
+    const char *upstream_date = NULL;
+    const char *server_value = NULL;
+    const char *date_value;
+    
+    if (r->proxyreq != PROXYREQ_NONE) {
+        /* keep what the proxied backend set */
+        upstream_date = apr_table_get(r->headers_out, "Date");
+        server_value = apr_table_get(r->headers_out, "Server");
+    }
+    
+    if (upstream_date) {
+        date_value = upstream_date;
+    }
+    else {
+        /* no usable Date from a backend: create our own */
+        char *stamp = apr_palloc(r->pool, APR_RFC822_DATE_LEN);
+        ap_recent_rfc822_date(stamp, r->request_time);
+        date_value = stamp;
+    }
+    
+    apr_table_setn(headers, "Date", date_value);
+    apr_table_unset(r->headers_out, "Date");
+    
+    if (server_value == NULL && *banner) {
+        server_value = banner;
+    }
+    if (server_value != NULL) {
+        apr_table_setn(headers, "Server", server_value);
+        apr_table_unset(r->headers_out, "Server");
+    }
+}
+
+/**
+ * apr_table_do callback: add the name/value pair, without copying the
+ * strings, to the table passed as ctx. Returns 1 to continue iterating.
+ */
+static int copy_header(void *ctx, const char *name, const char *value)
+{
+    apr_table_addn((apr_table_t *)ctx, name, value);
+    return 1;
+}
+
+/**
+ * Finalize the response header fields of r and create the h2_response for
+ * this stream from them.
+ *
+ * Side effects on r visible in this function: r->headers_out may be
+ * replaced by an overlay with r->err_headers_out and has fields set and
+ * unset; r->status_line, r->proto_num, r->subprocess_env and
+ * r->content_languages may be modified.
+ *
+ * @param from_h1 the converter; only its stream_id is read here
+ * @param r       the request whose response headers are converted
+ * @return the result of h2_response_rcreate() for the collected headers
+ */
+static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
+{
+    apr_status_t status = APR_SUCCESS;
+    const char *clheader;
+    const char *ctype;
+    /*
+     * Now that we are ready to send a response, we need to combine the two
+     * header field tables into a single table.  If we don't do this, our
+     * later attempts to set or unset a given fieldname might be bypassed.
+     */
+    if (!apr_is_empty_table(r->err_headers_out)) {
+        r->headers_out = apr_table_overlay(r->pool, r->err_headers_out,
+                                           r->headers_out);
+    }
+    
+    /*
+     * Remove the 'Vary' header field if the client can't handle it.
+     * Since this will have nasty effects on HTTP/1.1 caches, force
+     * the response into HTTP/1.0 mode.
+     */
+    if (apr_table_get(r->subprocess_env, "force-no-vary") != NULL) {
+        apr_table_unset(r->headers_out, "Vary");
+        r->proto_num = HTTP_VERSION(1,0);
+        apr_table_setn(r->subprocess_env, "force-response-1.0", "1");
+    }
+    else {
+        fix_vary(r);
+    }
+    
+    /*
+     * Now remove any ETag response header field if earlier processing
+     * says so (such as a 'FileETag None' directive).
+     */
+    if (apr_table_get(r->notes, "no-etag") != NULL) {
+        apr_table_unset(r->headers_out, "ETag");
+    }
+    
+    /* determine the protocol and whether we should use keepalives. */
+    status = validate_status_line(r);
+    if (!r->status_line) {
+        /* missing, or zapped by validate_status_line(): use the default */
+        r->status_line = ap_get_status_line(r->status);
+    } 
+    else if (status != APR_SUCCESS) {
+        /* Status line is OK but our own reason phrase
+         * would be preferred if defined
+         */
+        const char *tmp = ap_get_status_line(r->status);
+        if (!strncmp(tmp, r->status_line, 3)) {
+            r->status_line = tmp;
+        }
+    }
+    
+    if (r->chunked) {
+        apr_table_unset(r->headers_out, "Content-Length");
+    }
+    
+    ctype = ap_make_content_type(r, r->content_type);
+    if (ctype) {
+        apr_table_setn(r->headers_out, "Content-Type", ctype);
+    }
+    
+    if (r->content_encoding) {
+        apr_table_setn(r->headers_out, "Content-Encoding",
+                       r->content_encoding);
+    }
+    
+    /* Merge the tokens of an existing Content-Language field into
+     * r->content_languages, skipping case-insensitive duplicates, and set
+     * the field to the combined list.
+     */
+    if (!apr_is_empty_array(r->content_languages)) {
+        int i;
+        char *token;
+        char **languages = (char **)(r->content_languages->elts);
+        const char *field = apr_table_get(r->headers_out, "Content-Language");
+        
+        while (field && (token = ap_get_list_item(r->pool, &field)) != NULL) {
+            for (i = 0; i < r->content_languages->nelts; ++i) {
+                if (!strcasecmp(token, languages[i]))
+                    break;
+            }
+            if (i == r->content_languages->nelts) {
+                *((char **) apr_array_push(r->content_languages)) = token;
+            }
+        }
+        
+        field = apr_array_pstrcat(r->pool, r->content_languages, ',');
+        apr_table_setn(r->headers_out, "Content-Language", field);
+    }
+    
+    /*
+     * Control cachability for non-cachable responses if not already set by
+     * some other part of the server configuration.
+     */
+    if (r->no_cache && !apr_table_get(r->headers_out, "Expires")) {
+        char *date = apr_palloc(r->pool, APR_RFC822_DATE_LEN);
+        ap_recent_rfc822_date(date, r->request_time);
+        apr_table_addn(r->headers_out, "Expires", date);
+    }
+    
+    /* This is a hack, but I can't find anyway around it.  The idea is that
+     * we don't want to send out 0 Content-Lengths if it is a head request.
+     * This happens when modules try to outsmart the server, and return
+     * if they see a HEAD request.  Apache 1.3 handlers were supposed to
+     * just return in that situation, and the core handled the HEAD.  In
+     * 2.0, if a handler returns, then the core sends an EOS bucket down
+     * the filter stack, and the content-length filter computes a C-L of
+     * zero and that gets put in the headers, and we end up sending a
+     * zero C-L to the client.  We can't just remove the C-L filter,
+     * because well behaved 2.0 handlers will send their data down the stack,
+     * and we will compute a real C-L for the head request. RBB
+     */
+    if (r->header_only
+        && (clheader = apr_table_get(r->headers_out, "Content-Length"))
+        && !strcmp(clheader, "0")) {
+        apr_table_unset(r->headers_out, "Content-Length");
+    }
+    
+    /* Collect the final header set in a fresh table: Date and Server
+     * first, then the fields copied from r->headers_out.
+     */
+    apr_table_t *headers = apr_table_make(r->pool, 10);
+    
+    set_basic_http_header(r, headers);
+    if (r->status == HTTP_NOT_MODIFIED) {
+        /* for 304 responses only the fields named here are copied */
+        apr_table_do((int (*)(void *, const char *, const char *)) copy_header,
+                     (void *) headers, r->headers_out,
+                     "ETag",
+                     "Content-Location",
+                     "Expires",
+                     "Cache-Control",
+                     "Vary",
+                     "Warning",
+                     "WWW-Authenticate",
+                     "Proxy-Authenticate",
+                     "Set-Cookie",
+                     "Set-Cookie2",
+                     NULL);
+    }
+    else {
+        /* no field names given: every entry of headers_out is copied */
+        apr_table_do((int (*)(void *, const char *, const char *)) copy_header,
+                     (void *) headers, r->headers_out, NULL);
+    }
+    
+    return h2_response_rcreate(from_h1->stream_id, r, headers, r->pool);
+}
+
+/**
+ * Output filter that turns the response headers of f->r into an
+ * h2_response (see create_response()) and stores it in the converter of
+ * the task found in f->ctx.
+ *
+ * Behaviour, in the order it is checked:
+ * - header_only request with a response already stored: the brigade is
+ *   emptied and OK returned.
+ * - an EOC bucket in the brigade: the filter removes itself and passes the
+ *   brigade on untouched.
+ * - an error bucket in the brigade: the brigade is emptied, ap_die() is
+ *   called with the status of the first error bucket and AP_FILTER_ERROR
+ *   is returned.
+ * - otherwise the response is created; for header_only requests the
+ *   brigade is then emptied, else the filter removes itself and passes
+ *   the brigade on.
+ *
+ * NOTE(review): from_h1 is only checked via AP_DEBUG_ASSERT and then
+ * dereferenced; presumably env->output is always set when this filter
+ * runs - confirm against the code installing the filter.
+ *
+ * @return OK, AP_FILTER_ERROR, APR_ENOMEM if no response could be created,
+ *         or the result of ap_pass_brigade()
+ */
+apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+{
+    h2_task_env *env = f->ctx;
+    h2_from_h1 *from_h1 = env->output? env->output->from_h1 : NULL;
+    request_rec *r = f->r;
+    apr_bucket *b;
+    ap_bucket_error *eb = NULL;
+
+    AP_DEBUG_ASSERT(from_h1 != NULL);
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                  "h2_from_h1(%d): output_filter called", from_h1->stream_id);
+    
+    if (r->header_only && env->output && from_h1->response) {
+        /* throw away any data after we have compiled the response */
+        apr_brigade_cleanup(bb);
+        return OK;
+    }
+    
+    /* Scan the whole brigade: remember the first error bucket, but let an
+     * EOC bucket anywhere in it take precedence.
+     */
+    for (b = APR_BRIGADE_FIRST(bb);
+         b != APR_BRIGADE_SENTINEL(bb);
+         b = APR_BUCKET_NEXT(b))
+    {
+        if (AP_BUCKET_IS_ERROR(b) && !eb) {
+            eb = b->data;
+            continue;
+        }
+        /*
+         * If we see an EOC bucket it is a signal that we should get out
+         * of the way doing nothing.
+         */
+        if (AP_BUCKET_IS_EOC(b)) {
+            ap_remove_output_filter(f);
+            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+                          "h2_from_h1(%d): eoc bucket passed", 
+                          from_h1->stream_id);
+            return ap_pass_brigade(f->next, bb);
+        }
+    }
+    
+    if (eb) {
+        /* copy the status first: emptying the brigade releases its buckets */
+        int st = eb->status;
+        apr_brigade_cleanup(bb);
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+                      "h2_from_h1(%d): err bucket status=%d", 
+                      from_h1->stream_id, st);
+        ap_die(st, r);
+        return AP_FILTER_ERROR;
+    }
+    
+    from_h1->response = create_response(from_h1, r);
+    if (from_h1->response == NULL) {
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+                      "h2_from_h1(%d): unable to create response", 
+                      from_h1->stream_id);
+        return APR_ENOMEM;
+    }
+    
+    if (r->header_only) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                      "h2_from_h1(%d): header_only, cleanup output brigade", 
+                      from_h1->stream_id);
+        apr_brigade_cleanup(bb);
+        return OK;
+    }
+    
+    r->sent_bodyct = 1;         /* Whatever follows is real body stuff... */
+    
+    /* the headers are done; body data no longer needs this filter */
+    ap_remove_output_filter(f);
+    if (APLOGctrace1(f->c)) {
+        apr_off_t len = 0;
+        apr_brigade_length(bb, 0, &len);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                      "h2_from_h1(%d): removed header filter, passing brigade "
+                      "len=%ld", from_h1->stream_id, (long)len);
+    }
+    return ap_pass_brigade(f->next, bb);
+}
+
+/**
+ * Set the request status to the given value and store a response created
+ * from the request's current headers in the converter.
+ */
+void h2_from_h1_die(h2_from_h1 *from_h1, int status, request_rec *r)
+{
+    h2_response *replacement;
+
+    r->status = status;
+    replacement = create_response(from_h1, r);
+    from_h1->response = replacement;
+}

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_from_h1.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,84 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_from_h1__
+#define __mod_h2__h2_from_h1__
+
+/**
+ * h2_from_h1 parses a HTTP/1.1 response into
+ * - response status
+ * - a list of header values
+ * - a series of bytes that represent the response body alone, without
+ *   any meta data, such as inserted by chunked transfer encoding.
+ *
+ * All data is allocated from the stream memory pool. 
+ *
+ * Again, see comments in h2_request: ideally we would take the headers
+ * and status from the httpd structures instead of parsing them here, but
+ * we need to have all handlers and filters involved in request/response
+ * processing, so this seems to be the way for now.
+ */
+
+/**
+ * Progress of converting one HTTP/1 response. A converter created by
+ * h2_from_h1_create() starts in H2_RESP_ST_STATUS_LINE.
+ */
+typedef enum {
+    H2_RESP_ST_STATUS_LINE, /* parsing http/1 status line */
+    H2_RESP_ST_HEADERS,     /* parsing http/1 response headers */
+    H2_RESP_ST_BODY,        /* transferring response body */
+    H2_RESP_ST_DONE         /* complete response converted */
+} h2_from_h1_state_t;
+
+/* Only pointers to the response are needed here. */
+struct h2_response;
+
+typedef struct h2_from_h1 h2_from_h1;
+
+/**
+ * State of the HTTP/1 to HTTP/2 response conversion for one stream.
+ */
+struct h2_from_h1 {
+    /* id of the stream this converter works for */
+    int stream_id;
+    /* current conversion state */
+    h2_from_h1_state_t state;
+    /* pool the converter and its data are allocated from */
+    apr_pool_t *pool;
+    /* scratch brigade used while reading header lines; may be NULL */
+    apr_bucket_brigade *bb;
+    
+    /* content length taken from the created response */
+    apr_size_t content_length;
+    /* copy of the request's chunked flag */
+    int chunked;
+    
+    /* response status as a decimal string */
+    const char *status;
+    /* collected header lines, an array of char* */
+    apr_array_header_t *hlines;
+    
+    /* the converted response, NULL until it has been created */
+    struct h2_response *response;
+};
+
+
+/**
+ * Callback type for state changes of a converter; receives the converter,
+ * its previous state and the context registered with the callback.
+ */
+typedef void h2_from_h1_state_change_cb(struct h2_from_h1 *resp,
+                                         h2_from_h1_state_t prevstate,
+                                         void *cb_ctx);
+
+/** Create a converter for the given stream, allocated from pool. */
+h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool);
+
+/** Destroy the response held by the converter, if any. */
+apr_status_t h2_from_h1_destroy(h2_from_h1 *response);
+
+/**
+ * Register a state change callback.
+ * NOTE(review): no definition of this function appears in the
+ * h2_from_h1.c added alongside this header - confirm it is implemented
+ * elsewhere before calling it.
+ */
+void h2_from_h1_set_state_change_cb(h2_from_h1 *from_h1,
+                                     h2_from_h1_state_change_cb *callback,
+                                     void *cb_ctx);
+
+/**
+ * Read a serialized HTTP/1 response from bb, collecting status and
+ * headers, and pass body data on to the next filter after f.
+ */
+apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1,
+                                      ap_filter_t* f, apr_bucket_brigade* bb);
+
+/** Get the converted response; NULL if there is none. */
+struct h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1);
+
+/** Set the request status and create a response from the request. */
+void h2_from_h1_die(h2_from_h1 *from_h1, int status, request_rec *r);
+
+/** Get the current conversion state. */
+h2_from_h1_state_t h2_from_h1_get_state(h2_from_h1 *from_h1);
+
+/**
+ * Output filter that creates the h2_response directly from the request's
+ * header tables.
+ */
+apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb);
+
+#endif /* defined(__mod_h2__h2_from_h1__) */

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,222 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+
+#include <apr_strings.h>
+#include <apr_optional.h>
+#include <apr_optional_hooks.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_config.h>
+#include <http_connection.h>
+#include <http_protocol.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+
+#include "h2_stream.h"
+#include "h2_task.h"
+#include "h2_config.h"
+#include "h2_ctx.h"
+#include "h2_conn.h"
+#include "h2_alpn.h"
+#include "h2_h2.h"
+
+/* Protocol identifiers for ALPN negotiation, with their count. */
+const char *h2_alpn_protos[] = {
+    "h2", "h2-16", "h2-14"
+};
+apr_size_t h2_alpn_protos_len = (sizeof(h2_alpn_protos)
+                                 / sizeof(h2_alpn_protos[0]));
+
+/* Protocol identifiers for the HTTP/1 Upgrade mechanism, with their count. */
+const char *h2_upgrade_protos[] = {
+    "h2c", "h2c-16", "h2c-14",
+};
+apr_size_t h2_upgrade_protos_len = (sizeof(h2_upgrade_protos)
+                                    / sizeof(h2_upgrade_protos[0]));
+
+/* The 24 byte preface a client sends at the start of a h2 connection;
+ * h2_h2_process_conn() compares incoming data against it. */
+const char *H2_MAGIC_TOKEN = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
+
+/*******************************************************************************
+ * The optional mod_ssl functions we need.
+ * The function pointers are filled in by h2_h2_init() and stay NULL when
+ * the optional function is not available.
+ */
+APR_DECLARE_OPTIONAL_FN(int, ssl_engine_disable, (conn_rec*));
+APR_DECLARE_OPTIONAL_FN(int, ssl_is_https, (conn_rec*));
+
+static int (*opt_ssl_engine_disable)(conn_rec*);
+static int (*opt_ssl_is_https)(conn_rec*);
+/*******************************************************************************
+ * Hooks for processing incoming connections:
+ * - pre_conn_before_tls switches SSL off for stream connections
+ * - process_conn take over connection in case of h2
+ */
+static int h2_h2_process_conn(conn_rec* c);
+static int h2_h2_remove_timeout(conn_rec* c);
+static int h2_h2_post_read_req(request_rec *r);
+
+
+/*******************************************************************************
+ * One-time initialization: look up the optional mod_ssl functions and warn
+ * if ssl_is_https is not available. pool is unused. Always returns
+ * APR_SUCCESS.
+ */
+apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s)
+{
+    (void)pool;
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "h2_h2, child_init");
+    
+    opt_ssl_is_https = APR_RETRIEVE_OPTIONAL_FN(ssl_is_https);
+    opt_ssl_engine_disable = APR_RETRIEVE_OPTIONAL_FN(ssl_engine_disable);
+    
+    if (opt_ssl_is_https == NULL) {
+        ap_log_error(APLOG_MARK, APLOG_WARNING, 0, s,
+                     "mod_ssl does not seem to be enabled");
+    }
+    
+    return APR_SUCCESS;
+}
+
+/**
+ * Tell whether mod_ssl reports the connection as https.
+ *
+ * @return 1 if ssl_is_https is available and returns non-zero for c,
+ *         otherwise 0
+ */
+int h2_h2_is_tls(conn_rec *c)
+{
+    if (!opt_ssl_is_https) {
+        return 0;
+    }
+    return opt_ssl_is_https(c) ? 1 : 0;
+}
+
+/**
+ * Ask mod_ssl to disable SSL on the connection.
+ *
+ * @return the result of ssl_engine_disable, or 0 when that optional
+ *         function is not available
+ */
+int h2_tls_disable(conn_rec *c)
+{
+    return opt_ssl_engine_disable ? opt_ssl_engine_disable(c) : 0;
+}
+
+/*******************************************************************************
+ * Register various hooks
+ */
+/* modules listed as predecessors of our h2_h2_remove_timeout hook */
+static const char *const mod_reqtimeout[] = { "reqtimeout.c", NULL};
+
+/**
+ * Register the connection and request hooks of this file with httpd.
+ */
+void h2_h2_register_hooks(void)
+{
+    /* When the connection processing actually starts, we might want to
+     * take over, if h2* was selected by ALPN on a TLS connection.
+     */
+    ap_hook_process_connection(h2_h2_process_conn, 
+                               NULL, NULL, APR_HOOK_FIRST);
+    /* Perform connection cleanup before the actual processing happens.
+     */
+    ap_hook_process_connection(h2_h2_remove_timeout, 
+                               mod_reqtimeout, NULL, APR_HOOK_LAST);
+    
+    /* installs our response output filter on task requests */
+    ap_hook_post_read_request(h2_h2_post_read_req, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+/**
+ * process_connection hook: remove the "reqtimeout" input filter from
+ * master h2 connections. Always returns DECLINED so that connection
+ * processing continues.
+ */
+int h2_h2_remove_timeout(conn_rec* c)
+{
+    h2_ctx *ctx = h2_ctx_get(c);
+    
+    /* Task connections deliberately keep the filter. It was once removed
+     * there as well, but the timeouts seen on tasks might have been a side
+     * effect of other things. A task basically is a HTTP/1.1
+     * request/response, which is what mod_reqtimeout is made for, so the
+     * filter stays until unexpected timeouts on tasks show up again.
+     */
+    if (!h2_ctx_is_task(ctx) && h2_ctx_is_active(ctx)) {
+        /* cleanup on master h2 connections */
+        ap_remove_input_filter_byhandle(c->input_filters, "reqtimeout");
+    }
+    
+    return DECLINED;
+}
+
+/**
+ * process_connection hook: take over the connection when h2 has been
+ * selected for it.
+ *
+ * Task (stream) connections are declined right away. If protocol
+ * negotiation is ongoing, one byte is read speculatively to trigger it.
+ * If the protocol is still undecided and H2Direct is configured, the first
+ * 24 bytes are read speculatively and compared with H2_MAGIC_TOKEN; on a
+ * match "h2" is selected.
+ *
+ * @return the result of h2_conn_main() if h2 is active on the connection,
+ *         otherwise DECLINED
+ */
+int h2_h2_process_conn(conn_rec* c)
+{
+    h2_ctx *ctx = h2_ctx_get(c);
+    h2_config *cfg = h2_config_get(c);
+    apr_bucket_brigade* temp;
+
+    if (h2_ctx_is_task(ctx)) {
+        /* out stream pseudo connection */
+        return DECLINED;
+    }
+
+    /* Protocol negotiation, if started, may need some speculative reading
+     * to get triggered.
+     */
+    if (h2_ctx_pnego_is_ongoing(ctx)) {
+        temp = apr_brigade_create(c->pool, c->bucket_alloc);
+        ap_get_brigade(c->input_filters, temp,
+                       AP_MODE_SPECULATIVE, APR_BLOCK_READ, 1);
+        apr_brigade_destroy(temp);
+    }
+
+    /* If we still do not know the protocol and H2Direct is enabled, check
+     * if we receive the magic PRIamble. A client sending this on connection
+     * start should know what it is doing.
+     */
+    if (!h2_ctx_pnego_is_done(ctx) && h2_config_geti(cfg, H2_CONF_DIRECT)) {
+        apr_status_t status;
+        temp = apr_brigade_create(c->pool, c->bucket_alloc);
+        status = ap_get_brigade(c->input_filters, temp,
+                                AP_MODE_SPECULATIVE, APR_BLOCK_READ, 24);
+        if (status == APR_SUCCESS) {
+            char *s = NULL;
+            apr_size_t slen = 0;
+            
+            /* Only look at the data if flattening worked; otherwise s and
+             * slen do not describe a valid buffer.
+             */
+            status = apr_brigade_pflatten(temp, &s, &slen, c->pool);
+            if (status == APR_SUCCESS && s != NULL
+                && (slen == 24) && !memcmp(H2_MAGIC_TOKEN, s, 24)) {
+                h2_ctx_pnego_set_done(ctx, "h2");
+            }
+        }
+        apr_brigade_destroy(temp);
+    }
+
+    /* If "h2" was selected as protocol (by whatever mechanism), take over
+     * the connection.
+     */
+    if (h2_ctx_is_active(ctx)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+                      "h2_h2, connection, h2 active");
+        
+        return h2_conn_main(c);
+    }
+    
+    return DECLINED;
+}
+
+/**
+ * post_read_request hook: on requests that belong to a h2 task, install
+ * the output filter that converts the response. Always returns DECLINED.
+ */
+static int h2_h2_post_read_req(request_rec *r)
+{
+    h2_ctx *ctx = h2_ctx_rget(r);
+    struct h2_task_env *task_env = h2_ctx_get_task(ctx);
+    
+    if (task_env == NULL) {
+        /* no task on this request, nothing to install */
+        return DECLINED;
+    }
+    
+    /* h2_task connection for a stream, not for h2c */
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+                  "adding h1_to_h2_resp output filter");
+    if (!task_env->serialize_headers) {
+        /* replace the core http filter that formats response headers
+         * in HTTP/1 with our own that collects status and headers */
+        ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
+        ap_add_output_filter("H2_RESPONSE", task_env, r, r->connection);
+    }
+    else {
+        ap_add_output_filter("H1_TO_H2_RESP", task_env, r, r->connection);
+    }
+    return DECLINED;
+}
+
+

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_h2.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,59 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_h2__
+#define __mod_h2__h2_h2__
+
+/**
+ * List of ALPN protocol identifiers that we support in ALPN/NPN 
+ * negotiations.
+ */
+extern const char *h2_alpn_protos[];
+extern apr_size_t h2_alpn_protos_len;
+
+/**
+ * List of ALPN protocol identifiers that we support in HTTP/1 Upgrade:
+ * negotiations.
+ */
+extern const char *h2_upgrade_protos[];
+extern apr_size_t h2_upgrade_protos_len;
+
+/**
+ * The magic PRIamble of RFC 7540 that is always sent when starting
+ * a h2 communication.
+ */
+extern const char *H2_MAGIC_TOKEN;
+
+/*
+ * One-time, post-config initialization.
+ */
+apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s);
+
+/* Is the connection a TLS connection?
+ */
+int h2_h2_is_tls(conn_rec *c);
+
+/* Disable SSL for this connection, can only be invoked in a pre-
+ * connection hook before mod_ssl.
+ * @return != 0 iff disable worked
+ */
+int h2_tls_disable(conn_rec *c);
+
+/* Register apache hooks for h2 protocol
+ */
+void h2_h2_register_hooks(void);
+
+
+#endif /* defined(__mod_h2__h2_h2__) */

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,157 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+#include <http_connection.h>
+
+#include "h2_private.h"
+#include "h2_io.h"
+#include "h2_response.h"
+#include "h2_util.h"
+
+h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
+{
+    h2_io *io = apr_pcalloc(pool, sizeof(*io));
+    if (io) {
+        io->id = id;
+        io->bbin = NULL;
+        io->bbout = apr_brigade_create(pool, bucket_alloc);
+        io->response = apr_pcalloc(pool, sizeof(h2_response));
+    }
+    return io;
+}
+
+static void h2_io_cleanup(h2_io *io)
+{
+    if (io->response) {
+        h2_response_cleanup(io->response);
+    }
+}
+
+void h2_io_destroy(h2_io *io)
+{
+    h2_io_cleanup(io);
+}
+
+int h2_io_in_has_eos_for(h2_io *io)
+{
+    return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, 0));
+}
+
+int h2_io_out_has_data(h2_io *io)
+{
+    return h2_util_bb_has_data_or_eos(io->bbout);
+}
+
+apr_size_t h2_io_out_length(h2_io *io)
+{
+    if (io->bbout) {
+        apr_off_t len = 0;
+        apr_brigade_length(io->bbout, 0, &len);
+        return (len > 0)? len : 0;
+    }
+    return 0;
+}
+
+apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, 
+                           apr_size_t maxlen)
+{
+    apr_off_t start_len = 0;
+
+    if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) {
+        return io->eos_in? APR_EOF : APR_EAGAIN;
+    }
+    
+    apr_brigade_length(bb, 1, &start_len);
+    apr_bucket *last = APR_BRIGADE_LAST(bb);
+    apr_status_t status = h2_util_move(bb, io->bbin, maxlen, 0, 
+                                       NULL, "h2_io_in_read");
+    if (status == APR_SUCCESS) {
+        apr_bucket *nlast = APR_BRIGADE_LAST(bb);
+        apr_off_t end_len = 0;
+        apr_brigade_length(bb, 1, &end_len);
+        if (last == nlast) {
+            return APR_EAGAIN;
+        }
+        io->input_consumed += (end_len - start_len);
+    }
+    return status;
+}
+
+apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
+{
+    if (io->eos_in) {
+        return APR_EOF;
+    }
+    io->eos_in = h2_util_has_eos(bb, 0);
+    if (!APR_BRIGADE_EMPTY(bb)) {
+        if (!io->bbin) {
+            io->bbin = apr_brigade_create(io->bbout->p, 
+                                          io->bbout->bucket_alloc);
+        }
+        return h2_util_move(io->bbin, bb, 0, 0, NULL, "h2_io_in_write");
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t h2_io_in_close(h2_io *io)
+{
+    if (io->bbin) {
+        APR_BRIGADE_INSERT_TAIL(io->bbin, 
+                                apr_bucket_eos_create(io->bbin->bucket_alloc));
+    }
+    io->eos_in = 1;
+    return APR_SUCCESS;
+}
+
+apr_status_t h2_io_out_read(h2_io *io, char *buffer, 
+                            apr_size_t *plen, int *peos)
+{
+    if (buffer == NULL) {
+        /* just checking length available */
+        return h2_util_bb_avail(io->bbout, plen, peos);
+    }
+    
+    return h2_util_bb_read(io->bbout, buffer, plen, peos);
+}
+
+apr_status_t h2_io_out_readx(h2_io *io,  
+                             h2_io_data_cb *cb, void *ctx, 
+                             apr_size_t *plen, int *peos)
+{
+    if (cb == NULL) {
+        /* just checking length available */
+        return h2_util_bb_avail(io->bbout, plen, peos);
+    }
+    return h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
+}
+
+apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, 
+                             apr_size_t maxlen)
+{
+    return h2_util_move(io->bbout, bb, maxlen, 0, NULL, "h2_io_out_write");
+}
+
+
+apr_status_t h2_io_out_close(h2_io *io)
+{
+    APR_BRIGADE_INSERT_TAIL(io->bbout, 
+                            apr_bucket_eos_create(io->bbout->bucket_alloc));
+    return APR_SUCCESS;
+}

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,127 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_io__
+#define __mod_h2__h2_io__
+
+struct h2_response;
+struct apr_thread_cond_t;
+struct h2_task;
+
+
+typedef apr_status_t h2_io_data_cb(void *ctx, 
+                                   const char *data, apr_size_t len);
+
+
+typedef struct h2_io h2_io;
+
+struct h2_io {
+    int id;                      /* stream identifier */
+    apr_bucket_brigade *bbin;    /* input data for stream */
+    int eos_in;
+    int task_done;
+    
+    apr_size_t input_consumed;   /* how many bytes have been read */
+    struct apr_thread_cond_t *input_arrived; /* block on reading */
+    
+    apr_bucket_brigade *bbout;   /* output data from stream */
+    struct apr_thread_cond_t *output_drained; /* block on writing */
+    
+    struct h2_response *response;/* submittable response created */
+};
+
+/*******************************************************************************
+ * Object lifecycle and information.
+ ******************************************************************************/
+
+/**
+ * Creates a new h2_io for the given stream id. 
+ */
+h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc);
+
+/**
+ * Frees any resources held by the h2_io instance.
+ */
+void h2_io_destroy(h2_io *io);
+
+/**
+ * The input data is completely queued. Blocked reads will return immediately
+ * and give either data or EOF.
+ */
+int h2_io_in_has_eos_for(h2_io *io);
+/**
+ * Output data is available.
+ */
+int h2_io_out_has_data(h2_io *io);
+
+/*******************************************************************************
+ * Input handling of streams.
+ ******************************************************************************/
+/**
+ * Reads the next bucket from the input. Returns APR_EAGAIN if none
+ * is currently available, APR_EOF if end of input has been reached.
+ */
+apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, 
+                           apr_size_t maxlen);
+
+/**
+ * Appends given bucket to the input.
+ */
+apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
+
+/**
+ * Closes the input. After existing data has been read, APR_EOF will
+ * be returned.
+ */
+apr_status_t h2_io_in_close(h2_io *io);
+
+/*******************************************************************************
+ * Output handling of streams.
+ ******************************************************************************/
+
+/**
+ * Read a bucket from the output head. Return APR_EAGAIN if none is available,
+ * APR_EOF if none available and output has been closed. 
+ * May be called with buffer == NULL in order to find out how much data
+ * is available.
+ * @param io the h2_io to read output from
+ * @param buffer the buffer to copy the data to, may be NULL
+ * @param plen the requested max len, set to amount of data on return
+ * @param peos != 0 iff the end of stream has been reached
+ */
+apr_status_t h2_io_out_read(h2_io *io, char *buffer, 
+                            apr_size_t *plen, int *peos);
+
+apr_status_t h2_io_out_readx(h2_io *io,  
+                             h2_io_data_cb *cb, void *ctx, 
+                             apr_size_t *plen, int *peos);
+
+apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, 
+                             apr_size_t maxlen);
+
+/**
+ * Closes the output. After existing data has been read, APR_EOF will
+ * be returned.
+ */
+apr_status_t h2_io_out_close(h2_io *io);
+
+/**
+ * Gives the overall length of the data that is currently queued for
+ * output.
+ */
+apr_size_t h2_io_out_length(h2_io *io);
+
+
+#endif /* defined(__mod_h2__h2_io__) */

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,169 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+
+#include <apr_strings.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_connection.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+#include "h2_io.h"
+#include "h2_io_set.h"
+
+#define h2_io_IDX(list, i) ((h2_io**)(list)->elts)[i]
+
+struct h2_io_set {
+    apr_array_header_t *list;
+};
+
+h2_io_set *h2_io_set_create(apr_pool_t *pool)
+{
+    h2_io_set *sp = apr_pcalloc(pool, sizeof(h2_io_set));
+    if (sp) {
+        sp->list = apr_array_make(pool, 100, sizeof(h2_io*));
+        if (!sp->list) {
+            return NULL;
+        }
+    }
+    return sp;
+}
+
+void h2_io_set_destroy(h2_io_set *sp)
+{
+    for (int i = 0; i < sp->list->nelts; ++i) {
+        h2_io *io = h2_io_IDX(sp->list, i);
+        h2_io_destroy(io);
+    }
+    sp->list->nelts = 0;
+}
+
+static int h2_stream_id_cmp(const void *s1, const void *s2)
+{
+    h2_io **pio1 = (h2_io **)s1;
+    h2_io **pio2 = (h2_io **)s2;
+    return (*pio1)->id - (*pio2)->id;
+}
+
+h2_io *h2_io_set_get(h2_io_set *sp, int stream_id)
+{
+    /* we keep the array sorted by id, so lookup can be done
+     * by bsearch.
+     */
+    h2_io key = { stream_id, NULL, 0, 0, 0, NULL, NULL, NULL, NULL };
+    h2_io *pkey = &key;
+    h2_io **ps = bsearch(&pkey, sp->list->elts, sp->list->nelts, 
+                         sp->list->elt_size, h2_stream_id_cmp);
+    return ps? *ps : NULL;
+}
+
+h2_io *h2_io_set_get_highest_prio(h2_io_set *set)
+{
+    h2_io *highest = NULL;
+    for (int i = 0; i < set->list->nelts; ++i) {
+        h2_io *io = h2_io_IDX(set->list, i);
+        if (!highest /*|| io-prio even higher */ ) {
+            highest = io;
+        }
+    }
+    return highest;
+}
+
+static void h2_io_set_sort(h2_io_set *sp)
+{
+    qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size, 
+          h2_stream_id_cmp);
+}
+
+apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io)
+{
+    h2_io *existing = h2_io_set_get(sp, io->id);
+    if (!existing) {
+        APR_ARRAY_PUSH(sp->list, h2_io*) = io;
+        /* Normally, streams get added in ascending order of id. We
+         * keep the array sorted, so we just need to check if the newly
+         * appended stream has a lower id than the previous last one. If
+         * not, sorting is not necessary.
+         */
+        int last = sp->list->nelts - 1;
+        if (last > 0 
+            && (h2_io_IDX(sp->list, last)->id 
+                < h2_io_IDX(sp->list, last-1)->id)) {
+                h2_io_set_sort(sp);
+            }
+    }
+    return APR_SUCCESS;
+}
+
+h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io)
+{
+    for (int i = 0; i < sp->list->nelts; ++i) {
+        h2_io *e = h2_io_IDX(sp->list, i);
+        if (e == io) {
+            --sp->list->nelts;
+            int n = sp->list->nelts - i;
+            if (n > 0) {
+                /* Close the hole in the array by moving the upper
+                 * parts down one step.
+                 */
+                h2_io **selts = (h2_io**)sp->list->elts;
+                memmove(selts+i, selts+i+1, n * sizeof(h2_io*));
+            }
+            return e;
+        }
+    }
+    return NULL;
+}
+
+void h2_io_set_destroy_all(h2_io_set *sp)
+{
+    for (int i = 0; i < sp->list->nelts; ++i) {
+        h2_io *io = h2_io_IDX(sp->list, i);
+        h2_io_destroy(io);
+    }
+    sp->list->nelts = 0;
+}
+
+void h2_io_set_remove_all(h2_io_set *sp)
+{
+    sp->list->nelts = 0;
+}
+
+int h2_io_set_is_empty(h2_io_set *sp)
+{
+    AP_DEBUG_ASSERT(sp);
+    return sp->list->nelts == 0;
+}
+
+void h2_io_set_iter(h2_io_set *sp,
+                        h2_io_set_iter_fn *iter, void *ctx)
+{
+    for (int i = 0; i < sp->list->nelts; ++i) {
+        h2_io *s = h2_io_IDX(sp->list, i);
+        if (!iter(ctx, s)) {
+            break;
+        }
+    }
+}
+
+apr_size_t h2_io_set_size(h2_io_set *sp)
+{
+    return sp->list->nelts;
+}
+

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_io_set.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,47 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_io_set__
+#define __mod_h2__h2_io_set__
+
+struct h2_io;
+
+/**
+ * A set of h2_io instances. Allows lookup by stream id
+ * and other criteria.
+ */
+typedef struct h2_io_set h2_io_set;
+
+h2_io_set *h2_io_set_create(apr_pool_t *pool);
+
+void h2_io_set_destroy(h2_io_set *set);
+/* Element access; spelled 'struct h2_io' so the forward decl above suffices. */
+apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
+struct h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
+struct h2_io *h2_io_set_get_highest_prio(h2_io_set *set);
+struct h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
+
+void h2_io_set_remove_all(h2_io_set *set);
+void h2_io_set_destroy_all(h2_io_set *set);
+int h2_io_set_is_empty(h2_io_set *set);
+apr_size_t h2_io_set_size(h2_io_set *set);
+
+
+typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io);
+
+void h2_io_set_iter(h2_io_set *set,
+                           h2_io_set_iter_fn *iter, void *ctx);
+
+#endif /* defined(__mod_h2__h2_io_set__) */

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.c Tue Jun 30 15:26:16 2015
@@ -0,0 +1,788 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+
+#include <apr_atomic.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+#include <apr_strings.h>
+#include <apr_time.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+#include "h2_config.h"
+#include "h2_conn.h"
+#include "h2_io.h"
+#include "h2_io_set.h"
+#include "h2_response.h"
+#include "h2_mplx.h"
+#include "h2_stream.h"
+#include "h2_stream_set.h"
+#include "h2_task.h"
+#include "h2_task_input.h"
+#include "h2_task_output.h"
+#include "h2_task_queue.h"
+#include "h2_workers.h"
+
+
+static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        *pstatus = APR_ECONNABORTED;
+        return 1;
+    }
+    return 0;
+}
+
+static void have_out_data_for(h2_mplx *m, int stream_id);
+
+static void h2_mplx_destroy(h2_mplx *m)
+{
+    AP_DEBUG_ASSERT(m);
+    m->aborted = 1;
+    if (m->q) {
+        h2_tq_destroy(m->q);
+        m->q = NULL;
+    }
+    if (m->ready_ios) {
+        h2_io_set_destroy(m->ready_ios);
+        m->ready_ios = NULL;
+    }
+    if (m->stream_ios) {
+        h2_io_set_destroy(m->stream_ios);
+        m->stream_ios = NULL;
+    }
+    
+    if (m->lock) {
+        apr_thread_mutex_destroy(m->lock);
+        m->lock = NULL;
+    }
+    
+    if (m->pool) {
+        apr_pool_destroy(m->pool);
+    }
+}
+
+/**
+ * Creates the h2_mplx for connection c, allocated from parent; returns NULL
+ * if the allocator, the pool or the lock cannot be created.
+ * A h2_mplx needs to be thread-safe *and* it will be called by
+ * the h2_session thread *and* the h2_worker threads. Therefore:
+ * - calls are protected by a mutex lock, m->lock
+ * - the pool needs its own allocator, since apr_allocator_t are 
+ *   not re-entrant. The separate allocator works without a 
+ *   separate lock since we already protect h2_mplx itself.
+ *   Since HTTP/2 connections can be expected to live longer than
+ *   their HTTP/1 cousins, the separate allocator seems to work better
+ *   than protecting a shared h2_session one with an own lock.
+ */
+h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
+{
+    apr_status_t status = APR_SUCCESS;
+    h2_config *conf = h2_config_get(c);
+    AP_DEBUG_ASSERT(conf);
+    apr_allocator_t *allocator = NULL;
+    status = apr_allocator_create(&allocator);
+    if (status != APR_SUCCESS) {
+        return NULL;
+    }
+
+    h2_mplx *m = apr_pcalloc(parent, sizeof(h2_mplx));
+    if (m) {
+        m->id = c->id;
+        APR_RING_ELEM_INIT(m, link);
+        apr_atomic_set32(&m->refs, 1);
+        m->c = c;
+        status = apr_pool_create_ex(&m->pool, parent, NULL, allocator);
+        if (status != APR_SUCCESS || !m->pool) {
+            /* allocator has no owner pool yet, so release it here */
+            apr_allocator_destroy(allocator);
+            return NULL;
+        }
+        apr_allocator_owner_set(allocator, m->pool);
+        status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
+                                         m->pool);
+        if (status != APR_SUCCESS) {
+            h2_mplx_destroy(m);
+            return NULL;
+        }
+        m->bucket_alloc = apr_bucket_alloc_create(m->pool);
+        m->q = h2_tq_create(m->id, m->pool);
+        m->stream_ios = h2_io_set_create(m->pool);
+        m->ready_ios = h2_io_set_create(m->pool);
+        m->closed = h2_stream_set_create(m->pool);
+        m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+        m->workers = workers;
+    }
+    return m;
+}
+
+#define REF_COUNT_ATOMIC    1
+
+static void reference(h2_mplx *m)
+{
+    apr_atomic_inc32(&m->refs);
+}
+
+static void release(h2_mplx *m)
+{
+    if (!apr_atomic_dec32(&m->refs)) {
+        if (m->join_wait) {
+            apr_thread_cond_signal(m->join_wait);
+        }
+    }
+}
+
+void h2_mplx_reference(h2_mplx *m)
+{
+    if (REF_COUNT_ATOMIC) {
+        reference(m);
+    }
+    else {
+        apr_status_t status = apr_thread_mutex_lock(m->lock);
+        if (APR_SUCCESS == status) {
+            reference(m);
+            apr_thread_mutex_unlock(m->lock);
+        }
+    }
+}
+void h2_mplx_release(h2_mplx *m)
+{
+    if (REF_COUNT_ATOMIC) {
+        release(m);
+    }
+    else {
+        apr_status_t status = apr_thread_mutex_lock(m->lock);
+        if (APR_SUCCESS == status) {
+            release(m);
+            apr_thread_mutex_unlock(m->lock);
+        }
+    }
+}
+
+static void workers_register(h2_mplx *m) {
+    /* Initially, there was ref count increase for this as well, but
+     * this is not needed, even harmful.
+     * h2_workers is only a hub for all the h2_worker instances.
+     * At the end-of-life of this h2_mplx, we always unregister at
+     * the workers. The thing to manage are all the h2_worker instances
+     * out there. Those may hold a reference to this h2_mplx and we cannot
+     * call them to unregister.
+     * 
+     * Therefore: ref counting for h2_workers is not needed, ref counting
+     * for h2_worker using this is critical.
+     */
+    h2_workers_register(m->workers, m);
+}
+
+static void workers_unregister(h2_mplx *m) {
+    h2_workers_unregister(m->workers, m);
+}
+
+apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+{
+    workers_unregister(m);
+
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        int attempts = 0;
+        
+        release(m);
+        while (apr_atomic_read32(&m->refs) > 0) {
+            m->join_wait = wait;
+            ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG), 
+                          0, m->c,
+                          "h2_mplx(%ld): release_join, refs=%d, waiting...", 
+                          m->id, m->refs);
+            apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
+            if (++attempts >= 6) {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): join attempts exhausted, refs=%d", 
+                              m->id, m->refs);
+                break;
+            }
+        }
+        if (m->join_wait) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                          "h2_mplx(%ld): release_join -> destroy", m->id);
+        }
+        m->join_wait = NULL;
+        apr_thread_mutex_unlock(m->lock);
+        h2_mplx_destroy(m);
+    }
+    return status;
+}
+
+void h2_mplx_abort(h2_mplx *m)
+{
+    AP_DEBUG_ASSERT(m);
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        m->aborted = 1;
+        h2_io_set_destroy_all(m->stream_ios);
+        apr_thread_mutex_unlock(m->lock);
+    }
+    workers_unregister(m);
+}
+
+
+h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
+{
+    h2_stream *stream = NULL;
+
+    if (m->aborted) {
+        return NULL;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        apr_pool_t *stream_pool = m->spare_pool;
+        
+        if (!stream_pool) {
+            apr_pool_create(&stream_pool, m->pool);
+        }
+        else {
+            m->spare_pool = NULL;
+        }
+        
+        stream = h2_stream_create(stream_id, stream_pool, m);
+        stream->state = H2_STREAM_ST_OPEN;
+        
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (!io) {
+            io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
+            h2_io_set_add(m->stream_ios, io);
+        }
+        status = io? APR_SUCCESS : APR_ENOMEM;
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return stream;
+}
+
+/* Destroys stream and io before recycling their (presumed) pool as spare. */
+static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
+{
+    apr_pool_t *pool = h2_stream_detach_pool(stream);
+    h2_stream_destroy(stream);
+    if (io) {
+        h2_io_set_remove(m->stream_ios, io);
+        h2_io_set_remove(m->ready_ios, io); /* drop any stale ready entry */
+        h2_io_destroy(io);
+    }
+    if (pool) {
+        apr_pool_clear(pool); /* last: h2_mplx_open_io allocates both from it */
+        if (m->spare_pool) { apr_pool_destroy(m->spare_pool); }
+        m->spare_pool = pool;
+    }
+}
+
+apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
+{
+    AP_DEBUG_ASSERT(m);
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
+        if (!io || io->task_done) {
+            /* No more io or task already done -> cleanup immediately */
+            stream_destroy(m, stream, io);
+        }
+        else {
+            /* Add stream to closed set for cleanup when task is done */
+            h2_stream_set_add(m->closed, stream);
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+void h2_mplx_task_done(h2_mplx *m, int stream_id)
+{
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                      "h2_mplx(%ld): task(%d) done", m->id, stream_id);
+        if (stream) {
+            /* stream was already closed by main connection and is in 
+             * zombie state. Now that the task is done with it, we
+             * can free its resources. */
+            h2_stream_set_remove(m->closed, stream);
+            stream_destroy(m, stream, io);
+        }
+        else if (io) {
+            /* main connection has not finished stream. Mark task as done
+             * so that eventual cleanup can start immediately. */
+            io->task_done = 1;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+}
+
+apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
+                             int stream_id, apr_bucket_brigade *bb,
+                             struct apr_thread_cond_t *iowait)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io) {
+            io->input_arrived = iowait;
+            status = h2_io_in_read(io, bb, 0);
+            while (status == APR_EAGAIN 
+                   && !is_aborted(m, &status)
+                   && block == APR_BLOCK_READ) {
+                apr_thread_cond_wait(io->input_arrived, m->lock);
+                status = h2_io_in_read(io, bb, 0);
+            }
+            io->input_arrived = NULL;
+        }
+        else {
+            status = APR_EOF;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
+                              apr_bucket_brigade *bb)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io) {
+            status = h2_io_in_write(io, bb);
+            if (io->input_arrived) {
+                apr_thread_cond_signal(io->input_arrived);
+            }
+        }
+        else {
+            status = APR_EOF;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io) {
+            status = h2_io_in_close(io);
+            if (io->input_arrived) {
+                apr_thread_cond_signal(io->input_arrived);
+            }
+        }
+        else {
+            status = APR_ECONNABORTED;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+typedef struct {
+    h2_mplx_consumed_cb *cb;
+    void *cb_ctx;
+    int streams_updated;
+} update_ctx;
+
+static int update_window(void *ctx, h2_io *io)
+{
+    if (io->input_consumed) {
+        update_ctx *uctx = (update_ctx*)ctx;
+        uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
+        io->input_consumed = 0;
+        ++uctx->streams_updated;
+    }
+    return 1;
+}
+
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m, 
+                                       h2_mplx_consumed_cb *cb, void *cb_ctx)
+{
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    apr_status_t status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        update_ctx ctx = { cb, cb_ctx, 0 };
+        status = APR_EAGAIN;
+        h2_io_set_iter(m->stream_ios, update_window, &ctx);
+        
+        if (ctx.streams_updated) {
+            status = APR_SUCCESS;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+/*
+ * Read output of the given stream via h2_io_out_read() while holding the
+ * mplx lock. After a successful read, signal the io's output_drained
+ * condition, if set (out_write() waits on it when the output is full).
+ * Returns APR_ECONNABORTED when the mplx is aborted or no io is registered
+ * for the stream; otherwise the lock failure status or the read result.
+ */
+apr_status_t h2_mplx_out_read(h2_mplx *m, int stream_id, 
+                              char *buffer, apr_size_t *plen, int *peos)
+{
+    apr_status_t rv;
+    h2_io *io;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    rv = apr_thread_mutex_lock(m->lock);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    io = h2_io_set_get(m->stream_ios, stream_id);
+    if (io == NULL) {
+        rv = APR_ECONNABORTED;
+    }
+    else {
+        rv = h2_io_out_read(io, buffer, plen, peos);
+        if (rv == APR_SUCCESS && io->output_drained) {
+            apr_thread_cond_signal(io->output_drained);
+        }
+    }
+    apr_thread_mutex_unlock(m->lock);
+    return rv;
+}
+
+/*
+ * Callback flavour of h2_mplx_out_read(): forwards cb/ctx to
+ * h2_io_out_readx() for the given stream while holding the mplx lock.
+ * After a successful read, signal the io's output_drained condition, if
+ * set.
+ * Returns APR_ECONNABORTED when the mplx is aborted or no io is registered
+ * for the stream; otherwise the lock failure status or the read result.
+ */
+apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, 
+                               h2_io_data_cb *cb, void *ctx, 
+                               apr_size_t *plen, int *peos)
+{
+    apr_status_t rv;
+    h2_io *io;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    rv = apr_thread_mutex_lock(m->lock);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    io = h2_io_set_get(m->stream_ios, stream_id);
+    if (io == NULL) {
+        rv = APR_ECONNABORTED;
+    }
+    else {
+        rv = h2_io_out_readx(io, cb, ctx, plen, peos);
+        if (rv == APR_SUCCESS && io->output_drained) {
+            apr_thread_cond_signal(io->output_drained);
+        }
+    }
+    apr_thread_mutex_unlock(m->lock);
+    return rv;
+}
+
+/*
+ * Take the highest priority io out of the ready set and hand its response
+ * and output brigade to the matching stream in `streams`.
+ * Returns that stream, or NULL when the mplx is aborted, the lock cannot
+ * be taken, no io is ready, or no stream matches the response's stream id
+ * (the latter is logged as a warning; the io is removed from the ready
+ * set in either case).
+ */
+h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
+{
+    h2_stream *found = NULL;
+    h2_response *resp;
+    h2_io *io;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return NULL;
+    }
+    if (apr_thread_mutex_lock(m->lock) != APR_SUCCESS) {
+        return NULL;
+    }
+
+    io = h2_io_set_get_highest_prio(m->ready_ios);
+    if (io != NULL) {
+        resp = io->response;
+        h2_io_set_remove(m->ready_ios, io);
+
+        found = h2_stream_set_get(streams, resp->stream_id);
+        if (found == NULL) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
+                          "h2_mplx(%ld): stream for response %d",
+                          m->id, resp->stream_id);
+        }
+        else {
+            h2_stream_set_response(found, resp, io->bbout);
+            if (io->output_drained) {
+                apr_thread_cond_signal(io->output_drained);
+            }
+        }
+    }
+    apr_thread_mutex_unlock(m->lock);
+    return found;
+}
+
+/*
+ * Move the contents of bb into the output of io, blocking on `iowait`
+ * while the io holds at least m->stream_max_mem bytes.
+ *
+ * The wait uses m->lock as the condition's mutex; the callers in this file
+ * lock m->lock before calling. `f` is only used for trace logging and may
+ * be NULL. When `iowait` is NULL the function never blocks.
+ * bb is cleaned up before returning, whatever the outcome.
+ *
+ * Returns the last status of h2_io_out_write(), or whatever is_aborted()
+ * stored into status when it reported an abort.
+ */
+static apr_status_t out_write(h2_mplx *m, h2_io *io, 
+                              ap_filter_t* f, apr_bucket_brigade *bb,
+                              struct apr_thread_cond_t *iowait)
+{
+    apr_status_t status = APR_SUCCESS;
+    /* We check the memory footprint queued for this stream_id
+     * and block if it exceeds our configured limit.
+     * We will not split buckets to enforce the limit to the last
+     * byte. After all, the bucket is already in memory.
+     */
+    while (!APR_BRIGADE_EMPTY(bb) 
+           && (status == APR_SUCCESS)
+           && !is_aborted(m, &status)) {
+        
+        status = h2_io_out_write(io, bb, m->stream_max_mem);
+        
+        /* Wait for data to drain until there is room again */
+        /* NOTE(review): after waking up, h2_io_out_length(io) is evaluated
+         * before is_aborted(); if an abort can invalidate io while we wait,
+         * this reads a stale io -- confirm against h2_mplx_abort. */
+        while (!APR_BRIGADE_EMPTY(bb) 
+               && iowait
+               && status == APR_SUCCESS
+               && (m->stream_max_mem <= h2_io_out_length(io))
+               && !is_aborted(m, &status)) {
+            /* Publish the condition so readers can signal us; see
+             * h2_mplx_out_read()/h2_mplx_next_submit(). */
+            io->output_drained = iowait;
+            if (f) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+                              "h2_mplx(%ld-%d): waiting for out drain", 
+                              m->id, io->id);
+            }
+            apr_thread_cond_wait(io->output_drained, m->lock);
+            io->output_drained = NULL;
+        }
+    }
+    apr_brigade_cleanup(bb);
+    return status;
+}
+
+/*
+ * Attach `response` to the io of stream_id, put the io into the ready set
+ * and, if bb is given, write bb to the io's output via out_write().
+ * Signals waiters for added output afterwards.
+ * `f` is only used for debug logging and may be NULL.
+ * Returns APR_ECONNABORTED if no io is registered for the stream, else
+ * APR_SUCCESS or the result of out_write().
+ */
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
+                             ap_filter_t* f, apr_bucket_brigade *bb,
+                             struct apr_thread_cond_t *iowait)
+{
+    apr_status_t rv = APR_SUCCESS;
+    h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+
+    if (io == NULL) {
+        return APR_ECONNABORTED;
+    }
+
+    if (f) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
+                      "h2_mplx(%ld-%d): open response: %s",
+                      m->id, stream_id, response->headers->status);
+    }
+
+    h2_response_copy(io->response, response);
+    h2_io_set_add(m->ready_ios, io);
+    if (bb) {
+        rv = out_write(m, io, f, bb, iowait);
+    }
+    have_out_data_for(m, stream_id);
+    return rv;
+}
+
+/*
+ * Open the output of stream_id with the given response, optionally
+ * writing the first body data from bb (see out_open()).
+ *
+ * @param m        the multiplexer
+ * @param stream_id the stream whose output is opened
+ * @param response the response to attach to the stream's io
+ * @param f        filter used for logging, may be NULL
+ * @param bb       initial body data, may be NULL
+ * @param iowait   condition to block on while the output is full
+ * @return APR_ECONNABORTED if the mplx is (or became) aborted or the
+ *         stream has no io, the lock failure status, or the result of
+ *         out_open().
+ */
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
+                              ap_filter_t* f, apr_bucket_brigade *bb,
+                              struct apr_thread_cond_t *iowait)
+{
+    apr_status_t status;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        status = out_open(m, stream_id, response, f, bb, iowait);
+        if (m->aborted) {
+            /* The abort may have happened while out_open() was blocked
+             * waiting for the output to drain. Report it, but fall through
+             * to the unlock below: returning here would leave m->lock held
+             * forever.
+             */
+            status = APR_ECONNABORTED;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+
+/*
+ * Append bb to the output of stream_id. May block on `iowait` while the
+ * stream's buffered output is at the configured maximum (see out_write()).
+ *
+ * @param m        the multiplexer
+ * @param stream_id the stream to write to
+ * @param f        filter used for logging, may be NULL
+ * @param bb       the data to append
+ * @param iowait   condition to block on while the output is full
+ * @return APR_ECONNABORTED if the mplx is (or became) aborted or the
+ *         stream has no io, the lock failure status, or the result of
+ *         out_write().
+ */
+apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
+                               ap_filter_t* f, apr_bucket_brigade *bb,
+                               struct apr_thread_cond_t *iowait)
+{
+    apr_status_t status;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        if (!m->aborted) {
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            if (io) {
+                status = out_write(m, io, f, bb, iowait);
+                have_out_data_for(m, stream_id);
+                if (m->aborted) {
+                    /* Aborted while we were blocked in out_write(). Report
+                     * it, but do not return with m->lock still held.
+                     */
+                    status = APR_ECONNABORTED;
+                }
+            }
+            else {
+                status = APR_ECONNABORTED;
+            }
+        }
+
+        /* m->lock was just locked through this pointer, so it needs no
+         * NULL check before unlocking. */
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+/*
+ * Close the output of stream_id. If no response was opened on the stream
+ * yet, a "500" response is inserted first so that the stream can be reset
+ * properly.
+ *
+ * @param m         the multiplexer
+ * @param stream_id the stream whose output is closed
+ * @return APR_ECONNABORTED if the mplx was aborted before the call or the
+ *         stream has no io, APR_SUCCESS if the mplx is found aborted
+ *         once the lock is held, the lock failure status, or the result
+ *         of h2_io_out_close().
+ */
+apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
+{
+    apr_status_t status;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        if (!m->aborted) {
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            if (io) {
+                if (!io->response->headers) {
+                    /* In case a close comes before a response was created,
+                     * insert an error one so that our streams can properly
+                     * reset.
+                     */
+                    h2_response *r = h2_response_create(stream_id, 
+                                                        "500", NULL, m->pool);
+                    /* With no brigade to write and the io present, this
+                     * call only fails if the io lookup fails; its status
+                     * is superseded by the close below.
+                     */
+                    status = out_open(m, stream_id, r, NULL, NULL, NULL);
+                }
+                status = h2_io_out_close(io);
+                have_out_data_for(m, stream_id);
+                if (m->aborted) {
+                    /* if we were the last output, the whole session might
+                     * have gone down in the meantime. Still release the
+                     * lock below instead of returning with it held.
+                     */
+                    status = APR_SUCCESS;
+                }
+            }
+            else {
+                status = APR_ECONNABORTED;
+            }
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+/*
+ * Report h2_io_in_has_eos_for() for the io of stream_id, queried under
+ * the mplx lock.
+ * Returns 0 when the mplx is aborted, the lock cannot be taken, or no io
+ * is registered for the stream.
+ */
+int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
+{
+    int eos = 0;
+    h2_io *io;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return 0;
+    }
+    if (apr_thread_mutex_lock(m->lock) != APR_SUCCESS) {
+        return 0;
+    }
+
+    io = h2_io_set_get(m->stream_ios, stream_id);
+    if (io != NULL) {
+        eos = h2_io_in_has_eos_for(io);
+    }
+    apr_thread_mutex_unlock(m->lock);
+    return eos;
+}
+
+/*
+ * Report h2_io_out_has_data() for the io of stream_id, queried under the
+ * mplx lock.
+ * Returns 0 when the mplx is aborted, the lock cannot be taken, or no io
+ * is registered for the stream.
+ */
+int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
+{
+    int pending = 0;
+    h2_io *io;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return 0;
+    }
+    if (apr_thread_mutex_lock(m->lock) != APR_SUCCESS) {
+        return 0;
+    }
+
+    io = h2_io_set_get(m->stream_ios, stream_id);
+    if (io != NULL) {
+        pending = h2_io_out_has_data(io);
+    }
+    apr_thread_mutex_unlock(m->lock);
+    return pending;
+}
+
+/*
+ * Wait up to `timeout` for have_out_data_for() to signal new output.
+ * `iowait` is published as m->added_output for the duration of the wait
+ * and is used together with m->lock.
+ *
+ * @param m       the multiplexer
+ * @param timeout maximum time to wait
+ * @param iowait  the condition to wait on
+ * @return APR_ECONNABORTED if the mplx is aborted, the lock failure
+ *         status, or the result of apr_thread_cond_timedwait().
+ */
+apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
+                                 apr_thread_cond_t *iowait)
+{
+    apr_status_t status;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        m->added_output = iowait;
+        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+        /* Log the outcome of the wait along with the requested timeout. */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                      "h2_mplx(%ld): trywait on data for %f ms",
+                      m->id, timeout/1000.0);
+        m->added_output = NULL;
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+/*
+ * Signal the condition registered in m->added_output (set while
+ * h2_mplx_out_trywait() is waiting), if any. stream_id is unused.
+ */
+static void have_out_data_for(h2_mplx *m, int stream_id)
+{
+    (void)stream_id;
+    AP_DEBUG_ASSERT(m);
+    if (m->added_output == NULL) {
+        return;
+    }
+    apr_thread_cond_signal(m->added_output);
+}
+
+/*
+ * Append `task` to the mplx task queue under the mplx lock and call
+ * workers_register() for the mplx afterwards (also when taking the lock
+ * failed).
+ * Returns APR_ECONNABORTED if the mplx is aborted, else the status of
+ * taking the lock.
+ */
+apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
+{
+    apr_status_t rv;
+
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+
+    rv = apr_thread_mutex_lock(m->lock);
+    if (rv == APR_SUCCESS) {
+        /* TODO: needs to sort queue by priority */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx: do task(%s)", task->id);
+        h2_tq_append(m->q, task);
+        apr_thread_mutex_unlock(m->lock);
+    }
+
+    workers_register(m);
+    return rv;
+}
+
+/*
+ * Remove the first task from the mplx task queue and mark it as started.
+ *
+ * @param m        the multiplexer
+ * @param has_more set to non-zero if the queue still holds tasks after the
+ *                 pop, 0 otherwise (also 0 when the mplx is aborted or the
+ *                 lock could not be taken)
+ * @return the popped task, or NULL if none was obtained
+ */
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+{
+    h2_task *task = NULL;
+    apr_status_t status;
+
+    AP_DEBUG_ASSERT(m);
+    /* Give the out-parameter a defined value on every path, including a
+     * failed lock, so callers never read an uninitialized flag. */
+    *has_more = 0;
+    if (m->aborted) {
+        return NULL;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        task = h2_tq_pop_first(m->q);
+        if (task) {
+            h2_task_set_started(task);
+        }
+        *has_more = !h2_tq_empty(m->q);
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return task;
+}
+

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_mplx.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,322 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_mplx__
+#define __mod_h2__h2_mplx__
+
+/**
+ * The stream multiplexer. It pushes buckets from the connection
+ * thread to the stream task threads and vice versa. It's thread-safe
+ * to use.
+ *
+ * There is one h2_mplx instance for each h2_session, which sits on top
+ * of a particular httpd conn_rec. Input goes from the connection to
+ * the stream tasks. Output goes from the stream tasks to the connection,
+ * e.g. the client.
+ *
+ * For each stream, there can be at most "H2StreamMaxMemSize" output bytes
+ * queued in the multiplexer. If a task thread tries to write more
+ * data, it is blocked until space becomes available.
+ *
+ * Writing input is never blocked. In order to use flow control on the input,
+ * the mplx can be polled for input data consumption.
+ */
+
+struct apr_pool_t;
+struct apr_thread_mutex_t;
+struct apr_thread_cond_t;
+struct h2_config;
+struct h2_response;
+struct h2_task;
+struct h2_stream;
+struct h2_io_set;
+struct apr_thread_cond_t;
+struct h2_workers;
+struct h2_stream_set;
+struct h2_task_queue;
+
+#include "h2_io.h"
+
+typedef struct h2_mplx h2_mplx;
+
+/**
+ * State of one stream multiplexer. The accessor functions take `lock`
+ * before touching the io sets and the task queue.
+ */
+struct h2_mplx {
+    long id;                          /* identifier, used in log messages */
+    APR_RING_ENTRY(h2_mplx) link;     /* linkage for the H2_MPLX_LIST_* macros */
+    volatile apr_uint32_t refs;       /* reference counter, see h2_mplx_reference() */
+    conn_rec *c;                      /* connection, used for logging */
+    apr_pool_t *pool;
+    apr_bucket_alloc_t *bucket_alloc;
+
+    struct h2_task_queue *q;          /* tasks queued by h2_mplx_do_task() */
+    struct h2_io_set *stream_ios;     /* ios of all open streams, by stream id */
+    struct h2_io_set *ready_ios;      /* ios with a response ready for submit */
+    
+    apr_thread_mutex_t *lock;         /* guards the sets and queue above */
+    struct apr_thread_cond_t *added_output; /* set while h2_mplx_out_trywait() waits */
+    struct apr_thread_cond_t *join_wait;    /* presumably used by h2_mplx_release_and_join() -- confirm */
+    
+    int aborted;                      /* != 0: calls answer APR_ECONNABORTED */
+    apr_size_t stream_max_mem;        /* max output bytes buffered per stream */
+    
+    apr_pool_t *spare_pool;           /* spare pool, ready for next stream */
+    struct h2_stream_set *closed;     /* streams closed, but task ongoing */
+    struct h2_workers *workers;
+};
+
+/*******************************************************************************
+ * Object lifecycle and information.
+ ******************************************************************************/
+
+/**
+ * Create the multiplexer for the given HTTP2 session. 
+ * Implicitly has reference count 1.
+ */
+h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master, 
+                        struct h2_workers *workers);
+
+/**
+ * Increase the reference counter of this mplx.
+ */
+void h2_mplx_reference(h2_mplx *m);
+
+/**
+ * Decreases the reference counter of this mplx.
+ */
+void h2_mplx_release(h2_mplx *m);
+/**
+ * Decreases the reference counter of this mplx, waits for it
+ * to reach 0 and destroys the mplx afterwards.
+ * This is to be called from the thread that created the mplx in
+ * the first place.
+ * @param m the mplx to be released and destroyed
+ * @param wait condition var to wait on for ref counter == 0
+ */ 
+apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
+
+/**
+ * Aborts the multiplexer. It will answer all future invocation with
+ * APR_ECONNABORTED, leading to early termination of ongoing tasks.
+ */
+void h2_mplx_abort(h2_mplx *mplx);
+
+void h2_mplx_task_done(h2_mplx *m, int stream_id);
+
+/*******************************************************************************
+ * IO lifetime of streams.
+ ******************************************************************************/
+/**
+ * Prepares the multiplexer to handle in-/output on the given stream id.
+ */
+struct h2_stream *h2_mplx_open_io(h2_mplx *mplx, int stream_id);
+
+/**
+ * Ends cleanup of a stream in sync with execution thread.
+ */
+apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, struct h2_stream *stream);
+
+/* Return != 0 iff the multiplexer has data for the given stream. 
+ */
+int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
+
+/**
+ * Waits on output data from any stream in this session to become available. 
+ * Returns APR_TIMEUP if no data arrived in the given time.
+ */
+apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
+                                 struct apr_thread_cond_t *iowait);
+
+/*******************************************************************************
+ * Stream processing.
+ ******************************************************************************/
+
+/**
+ * Perform the task on the given stream.
+ */
+apr_status_t h2_mplx_do_task(h2_mplx *mplx, struct h2_task *task);
+
+/**
+ * Removes the first queued task from the multiplexer and marks it as
+ * started.
+ * @param mplx the multiplexer to take a task from
+ * @param has_more set to != 0 iff more tasks remain queued after the call
+ * @return the task, or NULL if the mplx is aborted or no task was obtained
+ */
+struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
+
+/*******************************************************************************
+ * Input handling of streams.
+ ******************************************************************************/
+
+/**
+ * Reads buckets for the given stream_id. Will return APR_EAGAIN when
+ * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
+ * when the end of the stream input has been reached.
+ * The condition passed in will be used for blocking/signalling and will
+ * be protected by the mplx's own mutex.
+ */
+apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
+                             int stream_id, apr_bucket_brigade *bb,
+                             struct apr_thread_cond_t *iowait);
+
+/**
+ * Appends data to the input of the given stream. Storage of input data is
+ * not subject to flow control.
+ */
+apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id, 
+                              apr_bucket_brigade *bb);
+
+/**
+ * Closes the input for the given stream_id.
+ */
+apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
+
+/**
+ * Returns != 0 iff the input for the given stream has been closed. There
+ * could still be data queued, but it can be read without blocking.
+ */
+int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
+
+/**
+ * Callback invoked for every stream that had input data read since
+ * the last invocation.
+ */
+typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_size_t consumed);
+
+/**
+ * Invoke the callback for all streams that had bytes read since the last
+ * call to this function. If no stream had input data consumed, the callback
+ * is not invoked.
+ * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
+ * happened.
+ */
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m, 
+                                       h2_mplx_consumed_cb *cb, void *ctx);
+
+/*******************************************************************************
+ * Output handling of streams.
+ ******************************************************************************/
+
+/**
+ * Get a stream whose response is ready for submit. Will set response and
+ * any out data available in stream. 
+ * @param m the mplxer to get a response from
+ * @param streams the set of streams in which the stream belonging to the
+ *                response is looked up
+ */
+struct h2_stream *h2_mplx_next_submit(h2_mplx *m, 
+                                      struct h2_stream_set *streams);
+
+/**
+ * Reads output data from the given stream. Will never block, but
+ * return APR_EAGAIN until data arrives or the stream is closed.
+ */
+apr_status_t h2_mplx_out_read(h2_mplx *mplx, int stream_id, 
+                              char *buffer, apr_size_t *plen, int *peos);
+
+apr_status_t h2_mplx_out_readx(h2_mplx *mplx, int stream_id, 
+                               h2_io_data_cb *cb, void *ctx, 
+                               apr_size_t *plen, int *peos);
+
+/**
+ * Opens the output for the given stream with the specified response.
+ */
+apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
+                              struct h2_response *response,
+                              ap_filter_t* filter, apr_bucket_brigade *bb,
+                              struct apr_thread_cond_t *iowait);
+
+/**
+ * Append the brigade to the stream output. Might block if amount
+ * of bytes buffered reaches configured max.
+ * @param stream_id the stream identifier
+ * @param filter the apache filter context of the data
+ * @param bb the bucket brigade to append
+ * @param iowait a conditional used for block/signalling in h2_mplx
+ */
+apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, 
+                               ap_filter_t* filter, apr_bucket_brigade *bb,
+                               struct apr_thread_cond_t *iowait);
+
+/**
+ * Closes the output stream. Readers of this stream will get all pending 
+ * data and then only APR_EOF as result. 
+ */
+apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
+
+/*******************************************************************************
+ * h2_mplx list Manipulation.
+ ******************************************************************************/
+
+/**
+ * The magic pointer value that indicates the head of a h2_mplx list
+ * @param  b The mplx list
+ * @return The magic pointer value
+ */
+#define H2_MPLX_LIST_SENTINEL(b)	APR_RING_SENTINEL((b), h2_mplx, link)
+
+/**
+ * Determine if the mplx list is empty
+ * @param b The list to check
+ * @return true or false
+ */
+#define H2_MPLX_LIST_EMPTY(b)	APR_RING_EMPTY((b), h2_mplx, link)
+
+/**
+ * Return the first mplx in a list
+ * @param b The list to query
+ * @return The first mplx in the list
+ */
+#define H2_MPLX_LIST_FIRST(b)	APR_RING_FIRST(b)
+
+/**
+ * Return the last mplx in a list
+ * @param b The list to query
+ * @return The last mplx in the list
+ */
+#define H2_MPLX_LIST_LAST(b)	APR_RING_LAST(b)
+
+/**
+ * Insert a single mplx at the front of a list.
+ * The element expression is first assigned to a local h2_mplx pointer, so
+ * it is evaluated once and must be convertible to h2_mplx*.
+ * @param b The list to add to
+ * @param e The mplx to insert
+ */
+#define H2_MPLX_LIST_INSERT_HEAD(b, e) do {				\
+h2_mplx *ap__b = (e);                                        \
+APR_RING_INSERT_HEAD((b), ap__b, h2_mplx, link);	\
+} while (0)
+
+/**
+ * Insert a single mplx at the end of a list.
+ * The element expression is first assigned to a local h2_mplx pointer, so
+ * it is evaluated once and must be convertible to h2_mplx*.
+ * @param b The list to add to
+ * @param e The mplx to insert
+ */
+#define H2_MPLX_LIST_INSERT_TAIL(b, e) do {				\
+h2_mplx *ap__b = (e);					\
+APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link);	\
+} while (0)
+
+/**
+ * Get the next mplx in the list
+ * @param e The current mplx
+ * @return The next mplx
+ */
+#define H2_MPLX_NEXT(e)	APR_RING_NEXT((e), link)
+/**
+ * Get the previous mplx in the list
+ * @param e The current mplx
+ * @return The previous mplx
+ */
+#define H2_MPLX_PREV(e)	APR_RING_PREV((e), link)
+
+/**
+ * Remove a mplx from its list
+ * @param e The mplx to remove
+ */
+#define H2_MPLX_REMOVE(e)	APR_RING_REMOVE((e), link)
+
+
+#endif /* defined(__mod_h2__h2_mplx__) */

Added: httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h?rev=1688474&view=auto
 ==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h (added)
+++ httpd/httpd/trunk/modules/http2/mod_h2/h2_private.h Tue Jun 30 15:26:16 2015
@@ -0,0 +1,36 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef mod_h2_h2_private_h
+#define mod_h2_h2_private_h
+
+#include <nghttp2/nghttp2.h>
+
+extern module AP_MODULE_DECLARE_DATA h2_module;
+
+APLOG_USE_MODULE(h2);
+
+
+/* Names of the HTTP/2 request pseudo-header fields. Each *_LEN constant is
+ * the length of the corresponding name without the terminating NUL.
+ */
+#define H2_HEADER_METHOD     ":method"
+#define H2_HEADER_METHOD_LEN 7
+#define H2_HEADER_SCHEME     ":scheme"
+#define H2_HEADER_SCHEME_LEN 7
+#define H2_HEADER_AUTH       ":authority"
+#define H2_HEADER_AUTH_LEN   10
+#define H2_HEADER_PATH       ":path"
+#define H2_HEADER_PATH_LEN   5
+/* Line terminator used in HTTP/1.x messages. */
+#define H2_CRLF             "\r\n"
+
+#endif


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic