[prev in list] [next in list] [prev in thread] [next in thread] 

List:       james-dev
Subject:    [james-project] 14/18: JAMES-2899 Stop leaking receivers in RabbitMQWorkQueue
From:       btellier () apache ! org
Date:       2019-09-27 11:50:15
Message-ID: 20190927115001.DBCBB81E94 () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4ba2d6c81ef09451dc6253f6d0852b67d10c4376
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Thu Sep 26 17:00:34 2019 +0200

    JAMES-2899 Stop leaking receivers in RabbitMQWorkQueue
---
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java  | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java \
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 index f0869c3..b88eaa8 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
                
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 @@ -81,6 +81,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private RabbitMQExclusiveConsumer receiver;
     private UnicastProcessor<TaskId> sendCancelRequestsQueue;
     private Disposable sendCancelRequestsQueueHandle;
+    private Disposable receiverHandle;
+    private Disposable cancelRequestListenerHandle;
 
     public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool \
simpleConnectionPool, JsonTaskSerializer taskSerializer) {  this.worker = worker;
@@ -105,7 +107,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
 
     private void consumeWorkqueue() {
         receiver = new RabbitMQExclusiveConsumer(new \
                ReceiverOptions().connectionMono(connectionMono));
-        receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
+        receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new \
                ConsumeOptions())
             .subscribeOn(Schedulers.boundedElastic())
             .flatMap(this::executeTask)
             .subscribe();
@@ -145,7 +147,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     private void registerCancelRequestsListener(String queueName) {
-        RabbitFlux
+        cancelRequestListenerHandle = RabbitFlux
             .createReceiver(new ReceiverOptions().connectionMono(connectionMono))
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.boundedElastic())
@@ -187,7 +189,10 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     @Override
     @PreDestroy
     public void close() {
+        System.out.println("close");
+        Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
 +        Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
  channelPool.close();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic