[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    [4/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6267
From:       cshannon () apache ! org
Date:       2016-04-28 11:49:22
Message-ID: 0d2b699d44784183af90918c82f999dd () git ! apache ! org
[Download RAW message or body]

https://issues.apache.org/jira/browse/AMQ-6267

Added two new configuration properties to the network bridge:
advisoryPrefetchSize and advisoryAckPercentage.  advisoryPrefetchSize
defaults to 0, which disables it; in that case the prefetchSize value
is used for the advisory consumer.  Also added validation to
prefetchSize to ensure it is greater than 0, because 0 is not allowed.

(cherry picked from commit 297eadf7461fe4043c81c6f8d806a7c61b680731)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d38c5906
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d38c5906
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d38c5906

Branch: refs/heads/activemq-5.13.x
Commit: d38c5906f6eadbd20aee97ce37c665b3356dbfd1
Parents: 4c109cf
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Wed Apr 27 14:07:33 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu Apr 28 11:48:53 2016 +0000

----------------------------------------------------------------------
 .../broker/jmx/NetworkConnectorView.java        |  38 ++++
 .../broker/jmx/NetworkConnectorViewMBean.java   |   9 +
 .../network/DemandForwardingBridgeSupport.java  |  18 +-
 .../network/NetworkBridgeConfiguration.java     |  40 ++++
 .../usecases/AdvisoryViaNetworkTest.java        | 194 +++++++++++++++++++
 5 files changed, 296 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
                
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java \
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
 index e0bae88..b3d0762 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
                
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
@@ -26,102 +26,137 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
         this.connector = connector;
     }
 
+    @Override
     public void start() throws Exception {
         connector.start();
     }
 
+    @Override
     public void stop() throws Exception {
         connector.stop();
     }
 
+    @Override
     public String getName() {
         return connector.getName();
     }
 
+    @Override
     public int getMessageTTL() {
         return connector.getMessageTTL();
     }
 
+    @Override
     public int getConsumerTTL() {
         return connector.getConsumerTTL();
     }
 
+    @Override
     public int getPrefetchSize() {
         return connector.getPrefetchSize();
     }
 
+    @Override
+    public int getAdvisoryPrefetchSize() {
+        return connector.getAdvisoryPrefetchSize();
+    }
+
+    @Override
     public String getUserName() {
         return connector.getUserName();
     }
 
+    @Override
     public boolean isBridgeTempDestinations() {
         return connector.isBridgeTempDestinations();
     }
 
+    @Override
     public boolean isConduitSubscriptions() {
         return connector.isConduitSubscriptions();
     }
 
+    @Override
     public boolean isDecreaseNetworkConsumerPriority() {
         return connector.isDecreaseNetworkConsumerPriority();
     }
 
+    @Override
     public boolean isDispatchAsync() {
         return connector.isDispatchAsync();
     }
 
+    @Override
     public boolean isDynamicOnly() {
         return connector.isDynamicOnly();
     }
 
+    @Override
     public boolean isDuplex() {
         return connector.isDuplex();
     }
 
+    @Override
     public boolean isSuppressDuplicateQueueSubscriptions() {
         return connector.isSuppressDuplicateQueueSubscriptions();
     }
 
+    @Override
     public boolean isSuppressDuplicateTopicSubscriptions() {
         return connector.isSuppressDuplicateTopicSubscriptions();
     }
 
+    @Override
     public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
         connector.setBridgeTempDestinations(bridgeTempDestinations);
     }
 
+    @Override
     public void setConduitSubscriptions(boolean conduitSubscriptions) {
         connector.setConduitSubscriptions(conduitSubscriptions);
     }
 
+    @Override
     public void setDispatchAsync(boolean dispatchAsync) {
         connector.setDispatchAsync(dispatchAsync);
     }
 
+    @Override
     public void setDynamicOnly(boolean dynamicOnly) {
         connector.setDynamicOnly(dynamicOnly);
     }
 
+    @Override
     public void setMessageTTL(int messageTTL) {
         connector.setMessageTTL(messageTTL);
     }
 
+    @Override
     public void setConsumerTTL(int consumerTTL) {
         connector.setConsumerTTL(consumerTTL);
     }
 
+    @Override
     public void setPassword(String password) {
         connector.setPassword(password);
     }
 
+    @Override
     public void setPrefetchSize(int prefetchSize) {
         connector.setPrefetchSize(prefetchSize);
     }
 
+    @Override
+    public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
+        connector.setAdvisoryPrefetchSize(advisoryPrefetchSize);
+    }
+
+    @Override
     public void setUserName(String userName) {
         connector.setUserName(userName);
     }
 
+    @Override
     public String getPassword() {
         String pw = connector.getPassword();
         // Hide the password for security reasons.
@@ -131,14 +166,17 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
         return pw;
     }
 
+    @Override
     public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
         connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
     }
 
+    @Override
     public void setSuppressDuplicateQueueSubscriptions(boolean val) {
         connector.setSuppressDuplicateQueueSubscriptions(val);
     }
 
+    @Override
     public void setSuppressDuplicateTopicSubscriptions(boolean val) {
         connector.setSuppressDuplicateTopicSubscriptions(val);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
                
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java \
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
 index ab7b865..99974ce 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
                
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
 @@ -28,6 +28,13 @@ public interface NetworkConnectorViewMBean extends Service {
 
     int getPrefetchSize();
 
+    /**
+     * @return Advisory prefetch setting.
+     */
+    @MBeanInfo("The prefetch setting for the advisory message consumer.  If set to <= 0 then this setting is disabled "
+            + "and the prefetchSize attribute is used instead for configuring the advisory consumer.")
+    int getAdvisoryPrefetchSize();
+
     String getUserName();
 
     boolean isBridgeTempDestinations();
@@ -62,6 +69,8 @@ public interface NetworkConnectorViewMBean extends Service {
 
     void setPrefetchSize(int prefetchSize);
 
+    void setAdvisoryPrefetchSize(int advisoryPrefetchSize);
+
     void setUserName(String userName);
 
     String getPassword();

http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
                
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java \
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 index 7f3eeb4..d46f73b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
                
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -570,7 +570,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                     }
                     demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
-                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
+                    configureConsumerPrefetch(demandConsumerInfo);
                     remoteBroker.oneway(demandConsumerInfo);
                 }
                 startedLatch.countDown();
@@ -726,7 +726,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     private void ackAdvisory(Message message) throws IOException {
         demandConsumerDispatched++;
-        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
+        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
+                (configuration.getAdvisoryAckPercentage() / 100f))) {
             MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
             ack.setConsumerId(demandConsumerInfo.getConsumerId());
             remoteBroker.oneway(ack);
@@ -1364,7 +1365,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         } else {
             sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
         }
-        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
+        configureConsumerPrefetch(sub.getLocalInfo());
         subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
         subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
 
@@ -1720,4 +1721,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return -1;
     }
 
+    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
+        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
+        //set then use it, else default to the prefetchSize setting
+        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
+                configuration.getAdvisoryPrefetchSize() > 0) {
+            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
+        } else {
+            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
                
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java \
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
 index 796b14f..039aba0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
                
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
 @@ -37,6 +37,12 @@ public class NetworkBridgeConfiguration {
     private boolean duplex;
     private boolean bridgeTempDestinations = true;
     private int prefetchSize = 1000;
+    /**
+     * By default set to 0, which is disabled and prefetchSize value will be
+     * used instead.
+     */
+    private int advisoryPrefetchSize = 0;
+    private int advisoryAckPercentage = 75;
     private int networkTTL = 1;
     private int consumerTTL = networkTTL;
     private int messageTTL = networkTTL;
@@ -205,9 +211,43 @@ public class NetworkBridgeConfiguration {
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
     public void setPrefetchSize(int prefetchSize) {
+        if (prefetchSize < 1) {
+            throw new IllegalArgumentException("prefetchSize must be > 0"
+                    + " because network consumers do not poll for messages.");
+        }
         this.prefetchSize = prefetchSize;
     }
 
+    public int getAdvisoryPrefetchSize() {
+        return advisoryPrefetchSize;
+    }
+
+    /**
+     * Prefetch size for advisory consumers.  Just like prefetchSize, if set, this
+     * value must be greater than 0 because network consumers do not poll for messages.
+     * Setting this to 0 or less means this value is disabled and prefetchSize will be
+     * used instead.
+     *
+     * @param advisoryPrefetchSize
+     */
+    public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
+        this.advisoryPrefetchSize = advisoryPrefetchSize;
+    }
+
+    public int getAdvisoryAckPercentage() {
+        return advisoryAckPercentage;
+    }
+
+    /**
+     * @param advisoryAckPercentage the percentage of the advisory prefetch size
+     * value that can be dispatched before an ack will be sent, defaults to 75
+     * which means that when the number of received messages is greater than 75% of
+     * the prefetch size an ack will be sent back
+     */
+    public void setAdvisoryAckPercentage(int advisoryAckPercentage) {
+        this.advisoryAckPercentage = advisoryAckPercentage;
+    }
+
     /**
      * @return the userName
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
                
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java \
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
 index ab61709..2d8f26a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
                
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
 @@ -18,7 +18,9 @@ package org.apache.activemq.usecases;
 
 import java.net.URI;
 import java.util.Arrays;
+
 import javax.jms.MessageConsumer;
+
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.DestinationInterceptor;
@@ -39,6 +41,7 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(AdvisoryViaNetworkTest.class);
 
 
+    @Override
     protected BrokerService createBroker(String brokerName) throws Exception {
         BrokerService broker = new BrokerService();
         broker.setPersistent(false);
@@ -76,6 +79,197 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
         messagesB.assertMessagesReceived(2);
     }
 
+    /**
+     * Test that explicitly setting advisoryPrefetchSize works for advisory topics
+     * on a network connector
+     *
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSize() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.addStaticallyIncludedDestination(topic1);
+        networkBridge.setDuplex(true);
+        networkBridge.setAdvisoryPrefetchSize(10);
+        networkBridge.setPrefetchSize(1);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that brokerB's advisory prefetch is 10 but normal topic prefetch is 1
+        assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        //both advisory messages are not acked yet because of optimized acks
+        assertDeqInflight(0, 2);
+    }
+
+    /**
+     * Test that explicitly setting advisoryPrefetchSize to 1 works for advisory topics
+     * on a network connector
+     *
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSize1() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.addStaticallyIncludedDestination(topic1);
+        networkBridge.setDuplex(true);
+        networkBridge.setAdvisoryPrefetchSize(1);
+        networkBridge.setPrefetchSize(10);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that brokerB's advisory prefetch is 1 but normal topic prefetch is 10
+        assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        assertDeqInflight(2, 0);
+    }
+
+    /**
+     * Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
+     * for backwards compatibility
+     *
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSizeNotSet() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.addStaticallyIncludedDestination(topic1);
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(10);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that both consumers have a prefetch of 10
+        assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        assertDeqInflight(0, 2);
+    }
+
+    /**
+     * Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
+     * for backwards compatibility (test when set to 1)
+     *
+     * @throws Exception
+     */
+    public void testPrefetchSize1() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
+
+        createBroker("A");
+        BrokerService brokerB = createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(1);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        createConsumer("A", topic1);
+        createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+        //verify that both consumers have a prefetch of 1
+        assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
+        assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+        assertDeqInflight(2, 0);
+    }
+
+    /**
+     * Test configuring the advisoryAckPercentage works with advisoryPrefetchSize
+     * @throws Exception
+     */
+    public void testAdvisoryPrefetchSizePercent() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        networkBridge.setAdvisoryPrefetchSize(10);
+        networkBridge.setAdvisoryAckPercentage(65);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        for (int i = 0; i < 10; i++) {
+            createConsumer("A", new ActiveMQTopic("A.FOO"));
+        }
+
+        assertDeqInflight(7, 3);
+    }
+
+    /**
+     * Test configuring the advisoryAckPercentage works when only prefetchSize exists
+     * and is applied against that instead for advisory consumers
+     *
+     * @throws Exception
+     */
+    public void testPrefetchSizePercent() throws Exception {
+        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
+
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector networkBridge = bridgeBrokers("A", "B");
+        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+        networkBridge.setDuplex(true);
+        networkBridge.setPrefetchSize(10);
+        networkBridge.setAdvisoryAckPercentage(65);
+
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+        for (int i = 0; i < 10; i++) {
+            createConsumer("A", new ActiveMQTopic("A.FOO"));
+        }
+
+        assertDeqInflight(7, 3);
+    }
+
+    private void assertDeqInflight(final int dequeue, final int inflight) throws Exception {
+        assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
+                LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount());
+                LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount());
+                return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue
+                        && regionBroker.getDestinationStatistics().getInflight().getCount() == inflight;
+            }
+        }));
+    }
+
 
     public void testAdvisoryForwardingDuplexNC() throws Exception {
         ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic