[prev in list] [next in list] [prev in thread] [next in thread] 

List:       httpcomponents-commits
Subject:    [httpcomponents-client] 01/02: Async clients to support scheduled (delayed) re-execution of requests
From:       olegk () apache ! org
Date:       2021-04-27 5:13:24
Message-ID: 20210427051323.F37328DC7B () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit 00ca8105f49aa92fcea75f6c91e5395e9bf2c7ff
Author: Oleg Kalnichevski <olegk@apache.org>
AuthorDate: Sun Apr 25 15:04:36 2021 +0200

    Async clients to support scheduled (delayed) re-execution of requests
---
 .../client5/http/impl/cache/AsyncCachingExec.java  |  4 +-
 .../hc/client5/http/async/AsyncExecChain.java      | 36 ++++++++-
 .../client5/http/impl/async/AsyncRedirectExec.java | 11 ++-
 .../async/InternalAbstractHttpAsyncClient.java     | 93 +++++++++++++++++++++-
 4 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
index d12215a..93419a7 100644
--- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
@@ -669,7 +669,9 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
                             scope.originalRequest,
                             new ComplexFuture<>(null),
                             HttpClientContext.create(),
-                            scope.execRuntime.fork());
+                            scope.execRuntime.fork(),
+                            scope.scheduler,
+                            scope.execCount);
                     cacheRevalidator.revalidateCacheEntry(
                             responseCache.generateKey(target, request, entry),
                             asyncExecCallback,
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
index b2cff0b..fb7d3d2 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
@@ -27,6 +27,7 @@
 package org.apache.hc.client5.http.async;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 
 /**
  * Represents a single element in the client side asynchronous request execution chain.
@@ -59,6 +61,8 @@ public interface AsyncExecChain {
         public final CancellableDependency cancellableDependency;
         public final HttpClientContext clientContext;
         public final AsyncExecRuntime execRuntime;
+        public final Scheduler scheduler;
+        public final AtomicInteger execCount;
 
         public Scope(
                 final String exchangeId,
@@ -66,18 +70,48 @@ public interface AsyncExecChain {
                 final HttpRequest originalRequest,
                 final CancellableDependency cancellableDependency,
                 final HttpClientContext clientContext,
-                final AsyncExecRuntime execRuntime) {
+                final AsyncExecRuntime execRuntime,
+                final Scheduler scheduler,
+                final AtomicInteger execCount) {
             this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
             this.route = Args.notNull(route, "Route");
             this.originalRequest = Args.notNull(originalRequest, "Original request");
             this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency");
             this.clientContext = clientContext != null ? clientContext : HttpClientContext.create();
             this.execRuntime = Args.notNull(execRuntime, "Exec runtime");
+            this.scheduler = Args.notNull(scheduler, "Exec scheduler");
+            this.execCount = execCount != null ? execCount : new AtomicInteger(1);
         }
 
     }
 
     /**
+     * Request execution scheduler
+     *
+     * @since 5.1
+     */
+    interface Scheduler {
+
+        /**
+         * Schedules request re-execution immediately or after a delay.
+         * @param request the actual request.
+         * @param entityProducer the request entity producer or {@code null} if the request
+         *                      does not enclose an entity.
+         * @param scope the execution scope .
+         * @param asyncExecCallback the execution callback.
+         * @param delay re-execution delay. Can be {@code null} if the request is to be
+         *              re-executed immediately.
+         */
+        void scheduleExecution(
+                HttpRequest request,
+                AsyncEntityProducer entityProducer,
+                AsyncExecChain.Scope scope,
+                AsyncExecCallback asyncExecCallback,
+                TimeValue delay);
+
+    }
+
+    /**
      * Proceeds to the next element in the request execution chain.
      *
      * @param request the actual request.
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
index b5c7eaf..d04944b 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
@@ -187,8 +187,15 @@ public final class AsyncRedirectExec implements AsyncExecChainHandler {
                                     proxyAuthExchange.reset();
                                 }
                             }
-                            state.currentScope = new AsyncExecChain.Scope(scope.exchangeId, newRoute,
-                                    scope.originalRequest, scope.cancellableDependency, clientContext, scope.execRuntime);
+                            state.currentScope = new AsyncExecChain.Scope(
+                                    scope.exchangeId,
+                                    newRoute,
+                                    scope.originalRequest,
+                                    scope.cancellableDependency,
+                                    scope.clientContext,
+                                    scope.execRuntime,
+                                    scope.scheduler,
+                                    scope.execCount);
                         }
                     }
                 }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
index 0b08573..6dac275 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -32,9 +32,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.async.AsyncExecCallback;
@@ -49,7 +52,9 @@ import org.apache.hc.client5.http.cookie.CookieStore;
 import org.apache.hc.client5.http.impl.ExecSupport;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.client5.http.routing.RoutingSupport;
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
@@ -71,11 +76,14 @@ import org.apache.hc.core5.http.support.BasicRequestBuilder;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.ModalCloseable;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
+import org.apache.hc.core5.util.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
 
+    private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor");
+
     private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
     private final AsyncExecChainElement execChain;
     private final Lookup<CookieSpecFactory> cookieSpecRegistry;
@@ -84,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
     private final CredentialsProvider credentialsProvider;
     private final RequestConfig defaultConfig;
     private final ConcurrentLinkedQueue<Closeable> closeables;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     InternalAbstractHttpAsyncClient(
             final DefaultConnectingIOReactor ioReactor,
@@ -104,6 +113,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         this.credentialsProvider = credentialsProvider;
         this.defaultConfig = defaultConfig;
         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
     }
 
     @Override
@@ -122,6 +132,12 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                 }
             }
         }
+        final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
+        for (final Runnable runnable: runnables) {
+            if (runnable instanceof Cancellable) {
+                ((Cancellable) runnable).cancel();
+            }
+        }
     }
 
     private void setupContext(final HttpClientContext context) {
@@ -187,10 +203,23 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                     clientContext.setExchangeId(exchangeId);
                     setupContext(clientContext);
 
+                    final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
+
+                        @Override
+                        public void scheduleExecution(final HttpRequest request,
+                                                      final AsyncEntityProducer entityProducer,
+                                                      final AsyncExecChain.Scope scope,
+                                                      final AsyncExecCallback asyncExecCallback,
+                                                      final TimeValue delay) {
+                            executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
+                        }
+
+                    };
+
                     final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
-                            clientContext, execRuntime);
+                            clientContext, execRuntime, scheduler, new AtomicInteger(1));
                     final AtomicBoolean outputTerminated = new AtomicBoolean(false);
-                    execChain.execute(
+                    executeImmediate(
                             BasicRequestBuilder.copy(request).build(),
                             entityDetails != null ? new AsyncEntityProducer() {
 
@@ -329,4 +358,64 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         return future;
     }
 
+    void executeImmediate(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+        execChain.execute(request, entityProducer, scope, asyncExecCallback);
+    }
+
+    void executeScheduled(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecCallback asyncExecCallback,
+            final TimeValue delay) {
+        final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
+                request, entityProducer, scope, asyncExecCallback, delay);
+        if (TimeValue.isPositive(delay)) {
+            scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
+        } else {
+            scheduledExecutorService.execute(scheduledTask);
+        }
+    }
+
+    class ScheduledRequestExecution implements Runnable, Cancellable {
+
+        final HttpRequest request;
+        final AsyncEntityProducer entityProducer;
+        final AsyncExecChain.Scope scope;
+        final AsyncExecCallback asyncExecCallback;
+        final TimeValue delay;
+
+        ScheduledRequestExecution(final HttpRequest request,
+                                  final AsyncEntityProducer entityProducer,
+                                  final AsyncExecChain.Scope scope,
+                                  final AsyncExecCallback asyncExecCallback,
+                                  final TimeValue delay) {
+            this.request = request;
+            this.entityProducer = entityProducer;
+            this.scope = scope;
+            this.asyncExecCallback = asyncExecCallback;
+            this.delay = delay;
+        }
+
+        @Override
+        public void run() {
+            try {
+                execChain.execute(request, entityProducer, scope, asyncExecCallback);
+            } catch (final Exception ex) {
+                asyncExecCallback.failed(ex);
+            }
+        }
+
+        @Override
+        public boolean cancel() {
+            asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
+            return true;
+        }
+
+    }
+
 }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic