[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r949742 - in /activemq/trunk:
From:       gtully () apache ! org
Date:       2010-05-31 13:44:17
Message-ID: 20100531134417.B292423889DA () eris ! apache ! org
[Download RAW message or body]

Author: gtully
Date: Mon May 31 13:44:17 2010
New Revision: 949742

URL: http://svn.apache.org/viewvc?rev=949742&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2754 - fix issue with conduit sub removal

Added:
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=949742&r1=949741&r2=949742&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Mon May 31 13:44:17 2010
@@ -79,7 +79,7 @@ public class ConduitBridge extends Deman
             if (filter.matches(info.getDestination())) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " matched (add \
                interest) to exsting sub for: " + ds.getRemoteInfo()
-                            + " with sub: " + info);
+                            + " with sub: " + info.getConsumerId());
                 }
                 // add the interest in the subscription
                 // ds.add(ds.getRemoteInfo().getConsumerId());
@@ -96,7 +96,6 @@ public class ConduitBridge extends Deman
 
     @Override
     protected void removeDemandSubscription(ConsumerId id) throws IOException {
-        super.removeDemandSubscription(id);
         List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
 
         for (Iterator i = subscriptionMapByLocalId.values().iterator(); \
i.hasNext();) {

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java?rev=949742&view=auto
 ==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java Mon May 31 13:44:17 2010
@@ -0,0 +1,148 @@
+package org.apache.bugs;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+//import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQ2754Test extends TestCase {
+
+    public void testNetworkOfBrokers() throws Exception {
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+
+        final int total = 100;
+        final CountDownLatch latch = new CountDownLatch(total);
+        final boolean conduitSubscriptions = true;
+        try {
+
+        {
+            brokerService1 = new BrokerService();
+            brokerService1.setBrokerName("consumer");
+            brokerService1.setUseJmx(false);
+            brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
+            brokerService1.addConnector("tcp://0.0.0.0:61616");
+            brokerService1.start();
+        }
+
+        {
+            brokerService2 = new BrokerService();
+            brokerService2.setBrokerName("producer");
+            brokerService2.setUseJmx(false);
+            brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
+            brokerService2.addConnector("tcp://0.0.0.0:51515");
+            NetworkConnector network2 = \
brokerService2.addNetworkConnector("static:(tcp://localhost:61616)"); +            \
network2.setName("network1"); +            network2.setDynamicOnly(true);
+            network2.setConduitSubscriptions(conduitSubscriptions);
+            network2.setNetworkTTL(3);
+            network2.setPrefetchSize(1);
+            brokerService2.start();
+        }
+
+        ExecutorService pool = Executors.newSingleThreadExecutor();
+
+        ActiveMQConnectionFactory connectionFactory1 = 
+            new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
+
+        connectionFactory1.setWatchTopicAdvisories(false);
+        final DefaultMessageListenerContainer container = new \
DefaultMessageListenerContainer(); +        \
container.setConnectionFactory(connectionFactory1); +        \
container.setMaxConcurrentConsumers(10); +        \
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); +        \
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); +        \
container.setDestination(new ActiveMQQueue("testingqueue")); +        \
container.setMessageListener(new MessageListener() { +            public void \
onMessage(Message message) { +                latch.countDown();
+            }
+        });
+        container.setMaxMessagesPerTask(1);
+        container.afterPropertiesSet();
+        container.start();
+
+        pool.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                try {
+                    final int batch = 10;
+                    ActiveMQConnectionFactory connectionFactory2 = 
+                        new \
ActiveMQConnectionFactory("failover:(tcp://localhost:51515)"); +                    \
PooledConnectionFactory pooledConnectionFactory = new \
PooledConnectionFactory(connectionFactory2); +                    \
connectionFactory2.setWatchTopicAdvisories(false); +                    JmsTemplate \
template = new JmsTemplate(pooledConnectionFactory); +                    \
ActiveMQQueue queue = new ActiveMQQueue("testingqueue"); +                    for(int \
b = 0; b < batch; b++) { +                        for(int i = 0; i < (total / batch); \
i++) { +                            final String id = ":batch=" + b + "i=" + i;
+                            template.send(queue, new MessageCreator() {
+                                public Message createMessage(Session session) throws \
JMSException { +                                    TextMessage message = \
session.createTextMessage(); +                                    \
message.setText("Hello World!" + id); +                                    return \
message; +                                }
+                            });
+                        }
+                        // give spring time to scale back again
+                        while(container.getActiveConsumerCount() > 1) {
+                            System.out.println("active consumer count:" + \
container.getActiveConsumerCount()); +                            \
System.out.println("concurrent consumer count: " + \
container.getConcurrentConsumers()); +                            Thread.sleep(1000);
+                        }
+                    }
+                    //pooledConnectionFactory.stop();
+                } catch(Throwable t) {
+                    t.printStackTrace();
+                }
+                return null;
+            }
+        });
+
+        pool.shutdown();
+        pool.awaitTermination(10, TimeUnit.SECONDS);
+
+        int count = 0;
+
+        // give it 20 seconds
+        while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
+            System.out.println("count " + latch.getCount());
+        }
+
+
+        container.destroy();
+
+        } finally {
+            try { if(brokerService1 != null) { 
+                brokerService1.stop();
+            }} catch(Throwable t) { t.printStackTrace(); }
+            try { if(brokerService2 != null) { 
+                brokerService2.stop();
+            }} catch(Throwable t) { t.printStackTrace(); }
+        }
+
+        if(latch.getCount() > 0) {
+            fail("latch should have gone down to 0 but was " + latch.getCount());
+        }
+
+    }
+
+} 
\ No newline at end of file

Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
                
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
                
------------------------------------------------------------------------------
    svn:keywords = Rev Date


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic