[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r747384 - in /activemq/trunk/activemq-core/src:
From: gtully () apache ! org
Date: 2009-02-24 14:05:28
Message-ID: 20090224140528.C9F1D238898E () eris ! apache ! org
[Download RAW message or body]
Author: gtully
Date: Tue Feb 24 14:05:28 2009
New Revision: 747384
URL: http://svn.apache.org/viewvc?rev=747384&view=rev
Log:
apply patch from AMQ-2109, with thanks
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=747384&r1=747383&r2=747384&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Tue Feb 24 14:05:28 2009
@@ -82,18 +82,14 @@
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
- // not matched so create a new one
- // but first, if it's durable - changed set the
- // ConsumerId here - so it won't be removed if the
- // durable subscriber goes away on the other end
- if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())) {
- info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
- .getNextSequenceId()));
- }
+
if (info.isDurable()) {
// set the subscriber name to something reproducible
-
info.setSubscriptionName(getSubscriberName(info.getDestination()));
+ // and override the consumerId with something unique so that it won't
+ // be removed if the durable subscriber (at the other end) goes away
+ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
+ .getNextSequenceId()));
}
info.setSelector(null);
return doCreateDemandSubscription(info);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=747384&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Tue Feb 24 14:05:28 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.MalformedURLException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class NetworkBrokerDetachTest extends TestCase {
+
+ private final static String BROKER_NAME = "broker";
+ private final static String REM_BROKER_NAME = "networkedBroker";
+ private final static String QUEUE_NAME = "testQ";
+ private final static int NUM_CONSUMERS = 1;
+
+ protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
+ protected final int numRestarts = 3;
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName(BROKER_NAME);
+ broker.addConnector("tcp://localhost:61617");
+ NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+ networkConnector.setDuplex(false);
+ return broker;
+ }
+
+ protected BrokerService createNetworkedBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName(REM_BROKER_NAME);
+ broker.addConnector("tcp://localhost:62617");
+ return broker;
+ }
+
+ public void testNetworkedBrokerDetach() throws Exception {
+ BrokerService broker = createBroker();
+ broker.start();
+
+ BrokerService networkedBroker = createNetworkedBroker();
+ networkedBroker.start();
+
+ LOG.info("Creating Consumer on the networked broker ...");
+ // Create a consumer on the networked broker
+ ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
+ Connection consConn = consFactory.createConnection();
+ Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ for(int i=0; i<NUM_CONSUMERS; i++) {
+ MessageConsumer consumer = consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
+ }
+
+
+ Thread.sleep(5000);
+
+ MBeanServerConnection mbsc = getMBeanServerConnection();
+ // We should have 1 consumer for the queue on the local broker
+ Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
+ LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
+ assertEquals(1L, ((Long)consumers).longValue());
+
+
+ LOG.info("Stopping Consumer on the networked broker ...");
+ // Closing the connection will also close the consumer
+ consConn.close();
+
+ Thread.sleep(5000);
+
+ // We should have 0 consumer for the queue on the local broker
+ consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
+ LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
+ assertEquals(0L, ((Long)consumers).longValue());
+
+ networkedBroker.stop();
+ networkedBroker.waitUntilStopped();
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
+
+ String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+ connectionFactory.setOptimizedMessageDispatch(true);
+ connectionFactory.setCopyMessageOnSend(false);
+ connectionFactory.setUseCompression(false);
+ connectionFactory.setDispatchAsync(false);
+ connectionFactory.setUseAsyncSend(false);
+ connectionFactory.setOptimizeAcknowledge(false);
+ connectionFactory.setWatchTopicAdvisories(true);
+ ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy();
+ qPrefetchPolicy.setQueuePrefetch(100);
+ qPrefetchPolicy.setTopicPrefetch(1000);
+ connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
+ connectionFactory.setAlwaysSyncSend(true);
+ return connectionFactory;
+ }
+
+ // JMX Helper Methods
+
+ private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
+ final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
+ MBeanServerConnection mbsc = null;
+ try {
+ JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+ mbsc = jmxc.getMBeanServerConnection();
+
+// // trace all existing MBeans
+// Set<?> all = mbsc.queryMBeans(null, null);
+// LOG.info("Total MBean count=" + all.size());
+// for (Object o : all) {
+// ObjectInstance bean = (ObjectInstance)o;
+// LOG.info(bean.getObjectName());
+// }
+ } catch (Exception ignored) {
+ }
+ return mbsc;
+ }
+
+ private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception {
+ Object obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
+ return obj;
+ }
+
+ private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
+ ObjectName beanName = new ObjectName(
+ "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern
+ );
+
+ return beanName;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic