[prev in list] [next in list] [prev in thread] [next in thread] 

List:       httpcomponents-commits
Subject:    [httpcomponents-client] 02/02: HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor
From:       olegk () apache ! org
Date:       2021-04-27 11:08:43
Message-ID: 20210427110842.5B9DB81640 () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit 09f50cd80c9cabc9f7532b6063fe67695f5484fa
Author: Oleg Kalnichevski <olegk@apache.org>
AuthorDate: Sun Apr 25 15:08:52 2021 +0200

    HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor
---
 .../async/ServiceUnavailableAsyncDecorator.java    | 138 +++++++++++++++
 .../testing/async/TestHttp1RequestReExecution.java | 187 +++++++++++++++++++++
 .../http/impl/async/AsyncHttpRequestRetryExec.java |  28 ++-
 3 files changed, 337 insertions(+), 16 deletions(-)

diff --git a/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java b/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java
new file mode 100644
index 0000000..16d883e
--- /dev/null
+++ b/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java
@@ -0,0 +1,138 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ServiceUnavailableAsyncDecorator implements AsyncServerExchangeHandler \
{ +
+    private final AsyncServerExchangeHandler exchangeHandler;
+    private final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver;
+    private final AtomicBoolean serviceUnavailable;
+
+    public ServiceUnavailableAsyncDecorator(final AsyncServerExchangeHandler \
exchangeHandler, +                                            final \
Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver) { +        \
this.exchangeHandler = Args.notNull(exchangeHandler, "Exchange handler"); +        \
this.serviceAvailabilityResolver = Args.notNull(serviceAvailabilityResolver, "Service \
availability resolver"); +        this.serviceUnavailable = new AtomicBoolean();
+    }
+
+    @Override
+    public void handleRequest(final HttpRequest request,
+                              final EntityDetails entityDetails,
+                              final ResponseChannel responseChannel,
+                              final HttpContext context) throws HttpException, \
IOException { +        final TimeValue retryAfter = \
serviceAvailabilityResolver.resolve(request); +        \
serviceUnavailable.set(TimeValue.isPositive(retryAfter)); +        if \
(serviceUnavailable.get()) { +            final HttpResponse response = new \
BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE); +            \
response.addHeader(HttpHeaders.RETRY_AFTER, Long.toString(retryAfter.toSeconds())); + \
final ProtocolVersion version = request.getVersion(); +            if (version != \
null && version.compareToVersion(HttpVersion.HTTP_2) < 0) { +                \
response.addHeader(HttpHeaders.CONNECTION, "Close"); +            }
+            responseChannel.sendResponse(response, null, context);
+        } else {
+            exchangeHandler.handleRequest(request, entityDetails, responseChannel, \
context); +        }
+    }
+
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws \
IOException { +        if (!serviceUnavailable.get()) {
+            exchangeHandler.updateCapacity(capacityChannel);
+        } else {
+            capacityChannel.update(Integer.MAX_VALUE);
+        }
+    }
+
+    @Override
+    public final void consume(final ByteBuffer src) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.consume(src);
+        }
+    }
+
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws \
HttpException, IOException { +        if (!serviceUnavailable.get()) {
+            exchangeHandler.streamEnd(trailers);
+        }
+    }
+
+    @Override
+    public int available() {
+        if (!serviceUnavailable.get()) {
+            return exchangeHandler.available();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.produce(channel);
+        }
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.failed(cause);
+        }
+    }
+
+    @Override
+    public void releaseResources() {
+        exchangeHandler.releaseResources();
+    }
+
+}
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java
new file mode 100644
index 0000000..88aeb84
--- /dev/null
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java
@@ -0,0 +1,187 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
 +import org.apache.hc.core5.function.Decorator;
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.util.TimeValue;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(Parameterized.class)
+public class TestHttp1RequestReExecution extends \
AbstractIntegrationTestBase<CloseableHttpAsyncClient> { +
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> protocolVersions() {
+        return Arrays.asList(new Object[][]{
+                { HttpVersion.HTTP_1_1 },
+                { HttpVersion.HTTP_2 }
+        });
+    }
+
+    private final HttpVersion version;
+
+    public TestHttp1RequestReExecution(final HttpVersion version) {
+        super(URIScheme.HTTP);
+        this.version = version;
+    }
+
+    HttpAsyncClientBuilder clientBuilder;
+    PoolingAsyncClientConnectionManager connManager;
+
+    @Rule
+    public ExternalResource connManagerResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            connManager = PoolingAsyncClientConnectionManagerBuilder.create()
+                    .build();
+        }
+
+        @Override
+        protected void after() {
+            if (connManager != null) {
+                connManager.close();
+                connManager = null;
+            }
+        }
+
+    };
+
+    @Rule
+    public ExternalResource clientBuilderResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            clientBuilder = HttpAsyncClientBuilder.create()
+                    .setDefaultRequestConfig(RequestConfig.custom()
+                            .setConnectionRequestTimeout(TIMEOUT)
+                            .setConnectTimeout(TIMEOUT)
+                            .build())
+                    .setConnectionManager(connManager)
+                    .setVersionPolicy(version.greaterEquals(HttpVersion.HTTP_2) ? \
HttpVersionPolicy.FORCE_HTTP_2 : HttpVersionPolicy.FORCE_HTTP_1); +        }
+
+    };
+
+    @Override
+    public final HttpHost start() throws Exception {
+
+        final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver = new \
Resolver<HttpRequest, TimeValue>() { +
+            private final AtomicInteger count = new AtomicInteger(0);
+
+            @Override
+            public TimeValue resolve(final HttpRequest request) {
+                final int n = count.incrementAndGet();
+                return n <= 3 ? TimeValue.ofSeconds(1) : null;
+            }
+
+        };
+
+        if (version.greaterEquals(HttpVersion.HTTP_2)) {
+            return super.start(null, new Decorator<AsyncServerExchangeHandler>() {
+
+                @Override
+                public AsyncServerExchangeHandler decorate(final \
AsyncServerExchangeHandler handler) { +                    return new \
ServiceUnavailableAsyncDecorator(handler, serviceAvailabilityResolver); +             \
} +
+            }, H2Config.DEFAULT);
+        } else {
+            return super.start(null, new Decorator<AsyncServerExchangeHandler>() {
+
+                @Override
+                public AsyncServerExchangeHandler decorate(final \
AsyncServerExchangeHandler handler) { +                    return new \
ServiceUnavailableAsyncDecorator(handler, serviceAvailabilityResolver); +             \
} +
+            }, Http1Config.DEFAULT);
+        }
+    }
+
+    @Override
+    protected CloseableHttpAsyncClient createClient() throws Exception {
+        return clientBuilder.build();
+    }
+
+    @Test
+    public void testGiveUpAfterOneRetry() throws Exception {
+        clientBuilder.setRetryStrategy(new DefaultHttpRequestRetryStrategy(1, \
TimeValue.ofSeconds(1))); +        final HttpHost target = start();
+        final Future<SimpleHttpResponse> future = httpclient.execute(
+                SimpleRequestBuilder.get()
+                        .setHttpHost(target)
+                        .setPath("/random/2048")
+                        .build(), null);
+        final SimpleHttpResponse response = future.get();
+        Assert.assertThat(response, CoreMatchers.notNullValue());
+        Assert.assertThat(response.getCode(), \
CoreMatchers.equalTo(HttpStatus.SC_SERVICE_UNAVAILABLE)); +    }
+
+    @Test
+    public void testDoNotGiveUpEasily() throws Exception {
+        clientBuilder.setRetryStrategy(new DefaultHttpRequestRetryStrategy(5, \
TimeValue.ofSeconds(1))); +        final HttpHost target = start();
+        final Future<SimpleHttpResponse> future = httpclient.execute(
+                SimpleRequestBuilder.get()
+                        .setHttpHost(target)
+                        .setPath("/random/2048")
+                        .build(), null);
+        final SimpleHttpResponse response = future.get();
+        Assert.assertThat(response, CoreMatchers.notNullValue());
+        Assert.assertThat(response.getCode(), \
CoreMatchers.equalTo(HttpStatus.SC_OK)); +    }
+
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
index 8111828..186a331 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
@@ -46,6 +46,7 @@ import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
 import org.apache.hc.core5.http.support.BasicRequestBuilder;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +79,8 @@ public final class AsyncHttpRequestRetryExec implements \
AsyncExecChainHandler {  
     private static class State {
 
-        volatile int execCount;
         volatile boolean retrying;
+        volatile TimeValue delay;
 
     }
 
@@ -106,8 +107,12 @@ public final class AsyncHttpRequestRetryExec implements \
AsyncExecChainHandler {  }
                     return asyncExecCallback.handleResponse(response, \
entityDetails);  }
-                state.retrying = retryStrategy.retryRequest(response, \
state.execCount, clientContext); +                state.retrying = \
retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);  if \
(state.retrying) { +                    state.delay = \
retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext); +     \
if (LOG.isDebugEnabled()) { +                        LOG.debug("{} retrying request \
in {}", exchangeId, state.delay); +                    }
                     return new NoopEntityConsumer();
                 } else {
                     return asyncExecCallback.handleResponse(response, \
entityDetails); @@ -122,12 +127,8 @@ public final class AsyncHttpRequestRetryExec \
implements AsyncExecChainHandler {  @Override
             public void completed() {
                 if (state.retrying) {
-                    state.execCount++;
-                    try {
-                        internalExecute(state, request, entityProducer, scope, \
                chain, asyncExecCallback);
-                    } catch (final IOException | HttpException ex) {
-                        asyncExecCallback.failed(ex);
-                    }
+                    scope.execCount.incrementAndGet();
+                    scope.scheduler.scheduleExecution(request, entityProducer, \
scope, asyncExecCallback, state.delay);  } else {
                     asyncExecCallback.completed();
                 }
@@ -142,7 +143,7 @@ public final class AsyncHttpRequestRetryExec implements \
AsyncExecChainHandler {  if (LOG.isDebugEnabled()) {
                             LOG.debug("{} cannot retry non-repeatable request", \
exchangeId);  }
-                    } else if (retryStrategy.retryRequest(request, (IOException) \
cause, state.execCount, clientContext)) { +                    } else if \
(retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), \
clientContext)) {  if (LOG.isDebugEnabled()) {
                             LOG.debug("{} {}", exchangeId, cause.getMessage(), \
cause);  }
@@ -155,12 +156,8 @@ public final class AsyncHttpRequestRetryExec implements \
AsyncExecChainHandler {  entityProducer.releaseResources();
                         }
                         state.retrying = true;
-                        state.execCount++;
-                        try {
-                            internalExecute(state, request, entityProducer, scope, \
                chain, asyncExecCallback);
-                        } catch (final IOException | HttpException ex) {
-                            asyncExecCallback.failed(ex);
-                        }
+                        scope.execCount.incrementAndGet();
+                        scope.scheduler.scheduleExecution(request, entityProducer, \
scope, asyncExecCallback, state.delay);  return;
                     }
                 }
@@ -179,7 +176,6 @@ public final class AsyncHttpRequestRetryExec implements \
AsyncExecChainHandler {  final AsyncExecChain chain,
             final AsyncExecCallback asyncExecCallback) throws HttpException, \
IOException {  final State state = new State();
-        state.execCount = 1;
         state.retrying = false;
         internalExecute(state, request, entityProducer, scope, chain, \
asyncExecCallback);  }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic