[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r747384 - in /activemq/trunk/activemq-core/src:
From:       gtully () apache ! org
Date:       2009-02-24 14:05:28
Message-ID: 20090224140528.C9F1D238898E () eris ! apache ! org
[Download RAW message or body]

Author: gtully
Date: Tue Feb 24 14:05:28 2009
New Revision: 747384

URL: http://svn.apache.org/viewvc?rev=747384&view=rev
Log:
apply patch from AMQ-2109, with thanks

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=747384&r1=747383&r2=747384&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Tue Feb 24 14:05:28 2009
@@ -82,18 +82,14 @@
         }
         //add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());
-        // not matched so create a new one
-        // but first, if it's durable - changed set the
-        // ConsumerId here - so it won't be removed if the
-        // durable subscriber goes away on the other end
-        if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())) {
-            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
-                .getNextSequenceId()));
-        }
+
         if (info.isDurable()) {
             // set the subscriber name to something reproducible
-
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
+            // and override the consumerId with something unique so that it won't
+            // be removed if the durable subscriber (at the other end) goes away
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
+                    .getNextSequenceId()));
         }
         info.setSelector(null);
         return doCreateDemandSubscription(info);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=747384&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Tue Feb 24 14:05:28 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.MalformedURLException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class NetworkBrokerDetachTest extends TestCase {
+
+    protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
+    private static final String BROKER_NAME = "broker";
+    private static final String REM_BROKER_NAME = "networkedBroker";
+    private static final String QUEUE_NAME = "testQ";
+    private static final int NUM_CONSUMERS = 1;
+    // Fixed pause that gives the network bridge time to react to consumer changes.
+    private static final long SETTLE_MILLIS = 5000;
+
+    protected final int numRestarts = 3;
+    private JMXConnector jmxConnector; // opened by getMBeanServerConnection(), closed after use
+
+    /** Local broker with a non-duplex network connector pointing at the networked broker. */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(BROKER_NAME);
+        broker.addConnector("tcp://localhost:61617");
+        NetworkConnector networkConnector = broker.addNetworkConnector(
+            "static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+        networkConnector.setDuplex(false);
+        return broker;
+    }
+
+    /** Broker that hosts the real consumer; listens on the port the network connector dials. */
+    protected BrokerService createNetworkedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(REM_BROKER_NAME);
+        broker.addConnector("tcp://localhost:62617");
+        return broker;
+    }
+
+    // Checks that the local broker's consumer count for the queue follows a consumer on the
+    // networked broker. Both brokers are stopped even on failure so later tests can start cleanly.
+    public void testNetworkedBrokerDetach() throws Exception {
+        BrokerService broker = createBroker();
+        broker.start();
+        try {
+            BrokerService networkedBroker = createNetworkedBroker();
+            networkedBroker.start();
+            try {
+                verifyConsumerDetach(networkedBroker);
+            } finally {
+                networkedBroker.stop();
+                networkedBroker.waitUntilStopped();
+            }
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    // Attaches, then detaches, a remote consumer and checks the local ConsumerCount each time.
+    private void verifyConsumerDetach(BrokerService networkedBroker) throws Exception {
+        LOG.info("Creating Consumer on the networked broker ...");
+        Connection consConn = createConnectionFactory(networkedBroker).createConnection();
+        try {
+            Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            for (int i = 0; i < NUM_CONSUMERS; i++) {
+                consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
+            }
+            Thread.sleep(SETTLE_MILLIS);
+            MBeanServerConnection mbsc = getMBeanServerConnection();
+            // We should have 1 consumer for the queue on the local broker
+            assertConsumerCount(mbsc, 1L);
+
+            LOG.info("Stopping Consumer on the networked broker ...");
+            // Closing the connection will also close the consumer
+            consConn.close();
+            Thread.sleep(SETTLE_MILLIS);
+            // We should have 0 consumer for the queue on the local broker
+            assertConsumerCount(mbsc, 0L);
+        } finally {
+            consConn.close(); // closing an already-closed JMS connection is a no-op
+            if (jmxConnector != null) {
+                jmxConnector.close();
+            }
+        }
+    }
+
+    // Logs the local broker's ConsumerCount for the test queue and asserts its value.
+    private void assertConsumerCount(MBeanServerConnection mbsc, long expected) throws Exception {
+        Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
+        LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
+        assertEquals(expected, ((Long) consumers).longValue());
+    }
+
+    /** Builds a connection factory aimed at the given broker's first transport connector. */
+    protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
+        String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setUseCompression(false);
+        connectionFactory.setDispatchAsync(false);
+        connectionFactory.setUseAsyncSend(false);
+        connectionFactory.setOptimizeAcknowledge(false);
+        connectionFactory.setWatchTopicAdvisories(true);
+        ActiveMQPrefetchPolicy qPrefetchPolicy = new ActiveMQPrefetchPolicy();
+        qPrefetchPolicy.setQueuePrefetch(100);
+        qPrefetchPolicy.setTopicPrefetch(1000);
+        connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
+        connectionFactory.setAlwaysSyncSend(true);
+        return connectionFactory;
+    }
+
+    // JMX Helper Methods
+
+    // Opens the JMX connection; failures propagate instead of producing a null connection.
+    private MBeanServerConnection getMBeanServerConnection() throws java.io.IOException {
+        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
+        jmxConnector = JMXConnectorFactory.connect(url, null);
+        return jmxConnector.getMBeanServerConnection();
+    }
+
+    private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception {
+        return mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
+    }
+
+    private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
+        return new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type + "," + pattern);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic