[prev in list] [next in list] [prev in thread] [next in thread]
List: james-dev
Subject: [james-project] 14/18: JAMES-2899 Stop leaking receivers in RabbitMQWorkQueue
From: btellier () apache ! org
Date: 2019-09-27 11:50:15
Message-ID: 20190927115001.DBCBB81E94 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4ba2d6c81ef09451dc6253f6d0852b67d10c4376
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Thu Sep 26 17:00:34 2019 +0200
JAMES-2899 Stop leaking receivers in RabbitMQWorkQueue
---
.../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index f0869c3..b88eaa8 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -81,6 +81,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
private RabbitMQExclusiveConsumer receiver;
private UnicastProcessor<TaskId> sendCancelRequestsQueue;
private Disposable sendCancelRequestsQueueHandle;
+ private Disposable receiverHandle;
+ private Disposable cancelRequestListenerHandle;
public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) {
this.worker = worker;
@@ -105,7 +107,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
private void consumeWorkqueue() {
receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
- receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
+ receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
.subscribeOn(Schedulers.boundedElastic())
.flatMap(this::executeTask)
.subscribe();
@@ -145,7 +147,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
}
private void registerCancelRequestsListener(String queueName) {
- RabbitFlux
+ cancelRequestListenerHandle = RabbitFlux
.createReceiver(new ReceiverOptions().connectionMono(connectionMono))
.consumeAutoAck(queueName)
.subscribeOn(Schedulers.boundedElastic())
@@ -187,7 +189,10 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
@Override
@PreDestroy
public void close() {
+ System.out.println("close");
+ Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
+ Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
channelPool.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic