[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    activemq git commit: https://issues.apache.org/jira/browse/AMQ-5951 - ensure failover oneway won't retry if reconnect will not happen
From:       gtully () apache ! org
Date:       2015-08-31 14:56:08
Message-ID: 27ac52b2c4674910b79d256e2728c1da () git ! apache ! org
[Download RAW message or body]

Repository: activemq
Updated Branches:
  refs/heads/master 1ea289736 -> ae9af4b8b


https://issues.apache.org/jira/browse/AMQ-5951 - ensure failover oneway won't retry if reconnect will not happen


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ae9af4b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ae9af4b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ae9af4b8

Branch: refs/heads/master
Commit: ae9af4b8b29e792db213ca2cc2879ddc7c4118e5
Parents: 1ea2897
Author: gtully <gary.tully@gmail.com>
Authored: Mon Aug 31 15:55:44 2015 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Aug 31 15:55:44 2015 +0100

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.java   |  2 +-
 .../transport/failover/FailoverTimeoutTest.java | 65 +++++++++++++++++++-
 2 files changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ae9af4b8/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
                
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 0bccac0..4e196b3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -674,7 +674,7 @@ public class FailoverTransport implements CompositeTransport {
 
                             // If the command was not tracked.. we will retry in
                             // this method
-                            if (tracked == null) {
+                            if (tracked == null && canReconnect()) {
 
                                 // since we will retry in this method.. take it
                                 // out of the request

http://git-wip-us.apache.org/repos/asf/activemq/blob/ae9af4b8/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
                
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 865d7c9..35a970f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -20,9 +20,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.net.Socket;
 import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -31,6 +37,7 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.MessageAck;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +56,7 @@ public class FailoverTimeoutTest {
     public void setUp() throws Exception {
         bs = new BrokerService();
         bs.setUseJmx(false);
-        bs.addConnector("tcp://localhost:0");
+        bs.addConnector(getTransportUri());
         bs.start();
         tcpUri = bs.getTransportConnectors().get(0).getConnectUri();
     }
@@ -119,8 +126,62 @@ public class FailoverTimeoutTest {
         bs.waitUntilStarted();
 
         producer.send(message);
-
         bs.stop();
+        connection.close();
+    }
+
+    @Test
+    public void testInterleaveSendAndException() throws Exception {
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                try {
+                    LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
+                    // try and invoke on connection as part of handling exception
+                    connection.asyncSendPacket(new MessageAck());
+                } catch (Exception e) {
+                }
+            }
+        });
+
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+
+        final int NUM_TASKS = 200;
+        final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS);
+
+        for (int i=0; i < NUM_TASKS; i++) {
+
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        connection.asyncSendPacket(new MessageAck());
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    } finally {
+                        enqueueOnExecutorDone.countDown();
+                    }
+
+                }
+            });
+        }
+
+        while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) {
+            enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
+        }
+
+        // force IOException
+        final Socket socket = connection.getTransport().narrow(Socket.class);
+        socket.close();
+
+        executorService.shutdown();
+
+        assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS));
     }
 
     @Test


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic