[prev in list] [next in list] [prev in thread] [next in thread]
List: httpcomponents-commits
Subject: [httpcomponents-client] 01/02: Async clients to support scheduled (delayed) re-execution of requests
From: olegk () apache ! org
Date: 2021-04-27 5:13:24
Message-ID: 20210427051323.F37328DC7B () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git
commit 00ca8105f49aa92fcea75f6c91e5395e9bf2c7ff
Author: Oleg Kalnichevski <olegk@apache.org>
AuthorDate: Sun Apr 25 15:04:36 2021 +0200
Async clients to support scheduled (delayed) re-execution of requests
---
.../client5/http/impl/cache/AsyncCachingExec.java | 4 +-
.../hc/client5/http/async/AsyncExecChain.java | 36 ++++++++-
.../client5/http/impl/async/AsyncRedirectExec.java | 11 ++-
.../async/InternalAbstractHttpAsyncClient.java | 93 +++++++++++++++++++++-
4 files changed, 138 insertions(+), 6 deletions(-)
diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java \
b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
index d12215a..93419a7 100644
--- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
@@ -669,7 +669,9 @@ class AsyncCachingExec extends CachingExecBase implements \
AsyncExecChainHandler scope.originalRequest,
new ComplexFuture<>(null),
HttpClientContext.create(),
- scope.execRuntime.fork());
+ scope.execRuntime.fork(),
+ scope.scheduler,
+ scope.execCount);
cacheRevalidator.revalidateCacheEntry(
responseCache.generateKey(target, request, entry),
asyncExecCallback,
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java \
b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java \
index b2cff0b..fb7d3d2 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
@@ -27,6 +27,7 @@
package org.apache.hc.client5.http.async;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
/**
* Represents a single element in the client side asynchronous request execution \
chain. @@ -59,6 +61,8 @@ public interface AsyncExecChain {
public final CancellableDependency cancellableDependency;
public final HttpClientContext clientContext;
public final AsyncExecRuntime execRuntime;
+ public final Scheduler scheduler;
+ public final AtomicInteger execCount;
public Scope(
final String exchangeId,
@@ -66,18 +70,48 @@ public interface AsyncExecChain {
final HttpRequest originalRequest,
final CancellableDependency cancellableDependency,
final HttpClientContext clientContext,
- final AsyncExecRuntime execRuntime) {
+ final AsyncExecRuntime execRuntime,
+ final Scheduler scheduler,
+ final AtomicInteger execCount) {
this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
this.route = Args.notNull(route, "Route");
this.originalRequest = Args.notNull(originalRequest, "Original \
request");
this.cancellableDependency = Args.notNull(cancellableDependency, \
"Dependency");
this.clientContext = clientContext != null ? clientContext : \
HttpClientContext.create(); this.execRuntime = Args.notNull(execRuntime, "Exec \
runtime"); + this.scheduler = Args.notNull(scheduler, "Exec scheduler");
+ this.execCount = execCount != null ? execCount : new AtomicInteger(1);
}
}
/**
+ * Request execution scheduler
+ *
+ * @since 5.1
+ */
+ interface Scheduler {
+
+ /**
+ * Schedules request re-execution immediately or after a delay.
+ * @param request the actual request.
+ * @param entityProducer the request entity producer or {@code null} if the \
request + * does not enclose an entity.
+ * @param scope the execution scope .
+ * @param asyncExecCallback the execution callback.
+ * @param delay re-execution delay. Can be {@code null} if the request is to \
be + * re-executed immediately.
+ */
+ void scheduleExecution(
+ HttpRequest request,
+ AsyncEntityProducer entityProducer,
+ AsyncExecChain.Scope scope,
+ AsyncExecCallback asyncExecCallback,
+ TimeValue delay);
+
+ }
+
+ /**
* Proceeds to the next element in the request execution chain.
*
* @param request the actual request.
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java \
b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
index b5c7eaf..d04944b 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
@@ -187,8 +187,15 @@ public final class AsyncRedirectExec implements \
AsyncExecChainHandler { proxyAuthExchange.reset();
}
}
- state.currentScope = new \
AsyncExecChain.Scope(scope.exchangeId, newRoute,
- scope.originalRequest, \
scope.cancellableDependency, clientContext, scope.execRuntime); + \
state.currentScope = new AsyncExecChain.Scope( + \
scope.exchangeId, + newRoute,
+ scope.originalRequest,
+ scope.cancellableDependency,
+ scope.clientContext,
+ scope.execRuntime,
+ scope.scheduler,
+ scope.execCount);
}
}
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java \
b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
index 0b08573..6dac275 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -32,9 +32,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
@@ -49,7 +52,9 @@ import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.RoutingSupport;
+import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
@@ -71,11 +76,14 @@ import org.apache.hc.core5.http.support.BasicRequestBuilder;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
+import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
+ private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new \
DefaultThreadFactory("Scheduled-executor"); +
private static final Logger LOG = \
LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class); private final \
AsyncExecChainElement execChain; private final Lookup<CookieSpecFactory> \
cookieSpecRegistry; @@ -84,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient \
extends AbstractHttpAsyncClientBa private final CredentialsProvider \
credentialsProvider; private final RequestConfig defaultConfig;
private final ConcurrentLinkedQueue<Closeable> closeables;
+ private final ScheduledExecutorService scheduledExecutorService;
InternalAbstractHttpAsyncClient(
final DefaultConnectingIOReactor ioReactor,
@@ -104,6 +113,7 @@ abstract class InternalAbstractHttpAsyncClient extends \
AbstractHttpAsyncClientBa this.credentialsProvider = credentialsProvider;
this.defaultConfig = defaultConfig;
this.closeables = closeables != null ? new \
ConcurrentLinkedQueue<>(closeables) : null; + this.scheduledExecutorService = \
Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY); }
@Override
@@ -122,6 +132,12 @@ abstract class InternalAbstractHttpAsyncClient extends \
AbstractHttpAsyncClientBa }
}
}
+ final List<Runnable> runnables = \
this.scheduledExecutorService.shutdownNow(); + for (final Runnable runnable: \
runnables) { + if (runnable instanceof Cancellable) {
+ ((Cancellable) runnable).cancel();
+ }
+ }
}
private void setupContext(final HttpClientContext context) {
@@ -187,10 +203,23 @@ abstract class InternalAbstractHttpAsyncClient extends \
AbstractHttpAsyncClientBa clientContext.setExchangeId(exchangeId);
setupContext(clientContext);
+ final AsyncExecChain.Scheduler scheduler = new \
AsyncExecChain.Scheduler() { +
+ @Override
+ public void scheduleExecution(final HttpRequest request,
+ final AsyncEntityProducer \
entityProducer, + final \
AsyncExecChain.Scope scope, + \
final AsyncExecCallback asyncExecCallback, + \
final TimeValue delay) { + executeScheduled(request, \
entityProducer, scope, asyncExecCallback, delay); + }
+
+ };
+
final AsyncExecChain.Scope scope = new \
AsyncExecChain.Scope(exchangeId, route, request, future,
- clientContext, execRuntime);
+ clientContext, execRuntime, scheduler, new \
AtomicInteger(1));
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
- execChain.execute(
+ executeImmediate(
BasicRequestBuilder.copy(request).build(),
entityDetails != null ? new AsyncEntityProducer() {
@@ -329,4 +358,64 @@ abstract class InternalAbstractHttpAsyncClient extends \
AbstractHttpAsyncClientBa return future;
}
+ void executeImmediate(
+ final HttpRequest request,
+ final AsyncEntityProducer entityProducer,
+ final AsyncExecChain.Scope scope,
+ final AsyncExecCallback asyncExecCallback) throws HttpException, \
IOException { + execChain.execute(request, entityProducer, scope, \
asyncExecCallback); + }
+
+ void executeScheduled(
+ final HttpRequest request,
+ final AsyncEntityProducer entityProducer,
+ final AsyncExecChain.Scope scope,
+ final AsyncExecCallback asyncExecCallback,
+ final TimeValue delay) {
+ final ScheduledRequestExecution scheduledTask = new \
ScheduledRequestExecution( + request, entityProducer, scope, \
asyncExecCallback, delay); + if (TimeValue.isPositive(delay)) {
+ scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), \
delay.getTimeUnit()); + } else {
+ scheduledExecutorService.execute(scheduledTask);
+ }
+ }
+
+ class ScheduledRequestExecution implements Runnable, Cancellable {
+
+ final HttpRequest request;
+ final AsyncEntityProducer entityProducer;
+ final AsyncExecChain.Scope scope;
+ final AsyncExecCallback asyncExecCallback;
+ final TimeValue delay;
+
+ ScheduledRequestExecution(final HttpRequest request,
+ final AsyncEntityProducer entityProducer,
+ final AsyncExecChain.Scope scope,
+ final AsyncExecCallback asyncExecCallback,
+ final TimeValue delay) {
+ this.request = request;
+ this.entityProducer = entityProducer;
+ this.scope = scope;
+ this.asyncExecCallback = asyncExecCallback;
+ this.delay = delay;
+ }
+
+ @Override
+ public void run() {
+ try {
+ execChain.execute(request, entityProducer, scope, \
asyncExecCallback); + } catch (final Exception ex) {
+ asyncExecCallback.failed(ex);
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ asyncExecCallback.failed(new CancellationException("Request execution \
cancelled")); + return true;
+ }
+
+ }
+
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic