[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: [4/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6267
From: cshannon () apache ! org
Date: 2016-04-28 11:49:22
Message-ID: 0d2b699d44784183af90918c82f999dd () git ! apache ! org
[Download RAW message or body]
https://issues.apache.org/jira/browse/AMQ-6267
Added two new configuration properties to the network bridge:
advisoryPrefetchSize and advisoryAckPercentage. advisoryPrefetchSize
defaults to 0, which disables it; the advisory consumer then uses the
prefetchSize value unless advisoryPrefetchSize is explicitly set. Also
added validation to prefetchSize to ensure it is greater than 0, since 0
is not allowed.
(cherry picked from commit 297eadf7461fe4043c81c6f8d806a7c61b680731)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d38c5906
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d38c5906
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d38c5906
Branch: refs/heads/activemq-5.13.x
Commit: d38c5906f6eadbd20aee97ce37c665b3356dbfd1
Parents: 4c109cf
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Wed Apr 27 14:07:33 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu Apr 28 11:48:53 2016 +0000
----------------------------------------------------------------------
.../broker/jmx/NetworkConnectorView.java | 38 ++++
.../broker/jmx/NetworkConnectorViewMBean.java | 9 +
.../network/DemandForwardingBridgeSupport.java | 18 +-
.../network/NetworkBridgeConfiguration.java | 40 ++++
.../usecases/AdvisoryViaNetworkTest.java | 194 +++++++++++++++++++
5 files changed, 296 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java \
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
index e0bae88..b3d0762 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
@@ -26,102 +26,137 @@ public class NetworkConnectorView implements \
NetworkConnectorViewMBean { this.connector = connector;
}
+ @Override
public void start() throws Exception {
connector.start();
}
+ @Override
public void stop() throws Exception {
connector.stop();
}
+ @Override
public String getName() {
return connector.getName();
}
+ @Override
public int getMessageTTL() {
return connector.getMessageTTL();
}
+ @Override
public int getConsumerTTL() {
return connector.getConsumerTTL();
}
+ @Override
public int getPrefetchSize() {
return connector.getPrefetchSize();
}
+ @Override
+ public int getAdvisoryPrefetchSize() {
+ return connector.getAdvisoryPrefetchSize();
+ }
+
+ @Override
public String getUserName() {
return connector.getUserName();
}
+ @Override
public boolean isBridgeTempDestinations() {
return connector.isBridgeTempDestinations();
}
+ @Override
public boolean isConduitSubscriptions() {
return connector.isConduitSubscriptions();
}
+ @Override
public boolean isDecreaseNetworkConsumerPriority() {
return connector.isDecreaseNetworkConsumerPriority();
}
+ @Override
public boolean isDispatchAsync() {
return connector.isDispatchAsync();
}
+ @Override
public boolean isDynamicOnly() {
return connector.isDynamicOnly();
}
+ @Override
public boolean isDuplex() {
return connector.isDuplex();
}
+ @Override
public boolean isSuppressDuplicateQueueSubscriptions() {
return connector.isSuppressDuplicateQueueSubscriptions();
}
+ @Override
public boolean isSuppressDuplicateTopicSubscriptions() {
return connector.isSuppressDuplicateTopicSubscriptions();
}
+ @Override
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
connector.setBridgeTempDestinations(bridgeTempDestinations);
}
+ @Override
public void setConduitSubscriptions(boolean conduitSubscriptions) {
connector.setConduitSubscriptions(conduitSubscriptions);
}
+ @Override
public void setDispatchAsync(boolean dispatchAsync) {
connector.setDispatchAsync(dispatchAsync);
}
+ @Override
public void setDynamicOnly(boolean dynamicOnly) {
connector.setDynamicOnly(dynamicOnly);
}
+ @Override
public void setMessageTTL(int messageTTL) {
connector.setMessageTTL(messageTTL);
}
+ @Override
public void setConsumerTTL(int consumerTTL) {
connector.setConsumerTTL(consumerTTL);
}
+ @Override
public void setPassword(String password) {
connector.setPassword(password);
}
+ @Override
public void setPrefetchSize(int prefetchSize) {
connector.setPrefetchSize(prefetchSize);
}
+ @Override
+ public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
+ connector.setAdvisoryPrefetchSize(advisoryPrefetchSize);
+ }
+
+ @Override
public void setUserName(String userName) {
connector.setUserName(userName);
}
+ @Override
public String getPassword() {
String pw = connector.getPassword();
// Hide the password for security reasons.
@@ -131,14 +166,17 @@ public class NetworkConnectorView implements \
NetworkConnectorViewMBean { return pw;
}
+ @Override
public void setDecreaseNetworkConsumerPriority(boolean \
decreaseNetworkConsumerPriority) {
connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
}
+ @Override
public void setSuppressDuplicateQueueSubscriptions(boolean val) {
connector.setSuppressDuplicateQueueSubscriptions(val);
}
+ @Override
public void setSuppressDuplicateTopicSubscriptions(boolean val) {
connector.setSuppressDuplicateTopicSubscriptions(val);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java \
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
index ab7b865..99974ce 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
@@ -28,6 +28,13 @@ public interface NetworkConnectorViewMBean extends Service {
int getPrefetchSize();
+ /**
+ * @return Advisory prefetch setting.
+ */
+ @MBeanInfo("The prefetch setting for the advisory message consumer. If set to \
<= 0 then this setting is disabled " + + "and the prefetchSize attribute \
is used instead for configuring the advisory consumer.") + int \
getAdvisoryPrefetchSize(); +
String getUserName();
boolean isBridgeTempDestinations();
@@ -62,6 +69,8 @@ public interface NetworkConnectorViewMBean extends Service {
void setPrefetchSize(int prefetchSize);
+ void setAdvisoryPrefetchSize(int advisoryPrefetchSize);
+
void setUserName(String userName);
String getPassword();
http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java \
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7f3eeb4..d46f73b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -570,7 +570,7 @@ public abstract class DemandForwardingBridgeSupport implements \
NetworkBridge, Br
advisoryTopic += "," + \
AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; }
demandConsumerInfo.setDestination(new \
ActiveMQTopic(advisoryTopic));
- \
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); + \
configureConsumerPrefetch(demandConsumerInfo); \
remoteBroker.oneway(demandConsumerInfo); }
startedLatch.countDown();
@@ -726,7 +726,8 @@ public abstract class DemandForwardingBridgeSupport implements \
NetworkBridge, Br
private void ackAdvisory(Message message) throws IOException {
demandConsumerDispatched++;
- if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) \
{ + if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
+ (configuration.getAdvisoryAckPercentage() / 100f))) {
MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, \
demandConsumerDispatched); ack.setConsumerId(demandConsumerInfo.getConsumerId());
remoteBroker.oneway(ack);
@@ -1364,7 +1365,7 @@ public abstract class DemandForwardingBridgeSupport implements \
NetworkBridge, Br } else {
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
}
- sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
+ configureConsumerPrefetch(sub.getLocalInfo());
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
@@ -1720,4 +1721,15 @@ public abstract class DemandForwardingBridgeSupport implements \
NetworkBridge, Br return -1;
}
+ protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
+ //If a consumer on an advisory topic and advisoryPrefetchSize has been \
explicitly + //set then use it, else default to the prefetchSize setting
+ if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
+ configuration.getAdvisoryPrefetchSize() > 0) {
+ consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
+ } else {
+ consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java \
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 796b14f..039aba0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -37,6 +37,12 @@ public class NetworkBridgeConfiguration {
private boolean duplex;
private boolean bridgeTempDestinations = true;
private int prefetchSize = 1000;
+ /**
+ * By default set to 0, which is disabled and prefetchSize value will be
+ * used instead.
+ */
+ private int advisoryPrefetchSize = 0;
+ private int advisoryAckPercentage = 75;
private int networkTTL = 1;
private int consumerTTL = networkTTL;
private int messageTTL = networkTTL;
@@ -205,9 +211,43 @@ public class NetworkBridgeConfiguration {
* @org.apache.xbean.Property \
propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setPrefetchSize(int prefetchSize) {
+ if (prefetchSize < 1) {
+ throw new IllegalArgumentException("prefetchSize must be > 0"
+ + " because network consumers do not poll for messages.");
+ }
this.prefetchSize = prefetchSize;
}
+ public int getAdvisoryPrefetchSize() {
+ return advisoryPrefetchSize;
+ }
+
+ /**
+ * Prefetch size for advisory consumers. Just like prefetchSize, if set, this
+ * value must be greater than 0 because network consumers do not poll for \
messages. + * Setting this to 0 or less means this value is disabled and \
prefetchSize will be + * used instead.
+ *
+ * @param advisoryPrefetchSize
+ */
+ public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
+ this.advisoryPrefetchSize = advisoryPrefetchSize;
+ }
+
+ public int getAdvisoryAckPercentage() {
+ return advisoryAckPercentage;
+ }
+
+ /**
+ * @param advisoryAckPercentage the percentage of the advisory prefetch size
+ * value that can be dispatched before an ack will be sent, defaults to 75
+ * which means that when the number of received messages is greater than 75% of
+ * the prefetch size an ack will be sent back
+ */
+ public void setAdvisoryAckPercentage(int advisoryAckPercentage) {
+ this.advisoryAckPercentage = advisoryAckPercentage;
+ }
+
/**
* @return the userName
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/d38c5906/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java \
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index ab61709..2d8f26a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -18,7 +18,9 @@ package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Arrays;
+
import javax.jms.MessageConsumer;
+
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
@@ -39,6 +41,7 @@ public class AdvisoryViaNetworkTest extends \
JmsMultipleBrokersTestSupport {
private static final Logger LOG = \
LoggerFactory.getLogger(AdvisoryViaNetworkTest.class);
+ @Override
protected BrokerService createBroker(String brokerName) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
@@ -76,6 +79,197 @@ public class AdvisoryViaNetworkTest extends \
JmsMultipleBrokersTestSupport { messagesB.assertMessagesReceived(2);
}
+ /**
+ * Test that explicitly setting advisoryPrefetchSize works for advisory topics
+ * on a network connector
+ *
+ * @throws Exception
+ */
+ public void testAdvisoryPrefetchSize() throws Exception {
+ ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>"); + ActiveMQTopic topic1 \
= new ActiveMQTopic("A.FOO"); +
+ createBroker("A");
+ BrokerService brokerB = createBroker("B");
+ NetworkConnector networkBridge = bridgeBrokers("A", "B");
+ networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+ networkBridge.addStaticallyIncludedDestination(topic1);
+ networkBridge.setDuplex(true);
+ networkBridge.setAdvisoryPrefetchSize(10);
+ networkBridge.setPrefetchSize(1);
+
+ startAllBrokers();
+ verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+ createConsumer("A", topic1);
+ createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+ //verify that brokerB's advisory prefetch is 10 but normal topic prefetch is \
1 + assertEquals(10, \
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); + \
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+ //both advisory messages are not acked yet because of optimized acks
+ assertDeqInflight(0, 2);
+ }
+
+ /**
+ * Test that explicitly setting advisoryPrefetchSize to 1 works for advisory \
topics + * on a network connector
+ *
+ * @throws Exception
+ */
+ public void testAdvisoryPrefetchSize1() throws Exception {
+ ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>"); + ActiveMQTopic topic1 \
= new ActiveMQTopic("A.FOO"); +
+ createBroker("A");
+ BrokerService brokerB = createBroker("B");
+ NetworkConnector networkBridge = bridgeBrokers("A", "B");
+ networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+ networkBridge.addStaticallyIncludedDestination(topic1);
+ networkBridge.setDuplex(true);
+ networkBridge.setAdvisoryPrefetchSize(1);
+ networkBridge.setPrefetchSize(10);
+
+ startAllBrokers();
+ verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+ createConsumer("A", topic1);
+ createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+ //verify that brokerB's advisory prefetch is 1 but normal topic prefetch is \
10 + assertEquals(1, \
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); + \
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+ assertDeqInflight(2, 0);
+ }
+
+ /**
+ * Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
+ * for backwards compatibility
+ *
+ * @throws Exception
+ */
+ public void testAdvisoryPrefetchSizeNotSet() throws Exception {
+ ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>"); + ActiveMQTopic topic1 \
= new ActiveMQTopic("A.FOO"); +
+ createBroker("A");
+ BrokerService brokerB = createBroker("B");
+ NetworkConnector networkBridge = bridgeBrokers("A", "B");
+ networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+ networkBridge.addStaticallyIncludedDestination(topic1);
+ networkBridge.setDuplex(true);
+ networkBridge.setPrefetchSize(10);
+
+ startAllBrokers();
+ verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+ createConsumer("A", topic1);
+ createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+ //verify that both consumers have a prefetch of 10
+ assertEquals(10, \
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); + \
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+ assertDeqInflight(0, 2);
+ }
+
+ /**
+ * Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
+ * for backwards compatibility (test when set to 1)
+ *
+ * @throws Exception
+ */
+ public void testPrefetchSize1() throws Exception {
+ ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>"); + ActiveMQTopic topic1 \
= new ActiveMQTopic("A.FOO"); +
+ createBroker("A");
+ BrokerService brokerB = createBroker("B");
+ NetworkConnector networkBridge = bridgeBrokers("A", "B");
+ networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+ networkBridge.setDuplex(true);
+ networkBridge.setPrefetchSize(1);
+
+ startAllBrokers();
+ verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+ createConsumer("A", topic1);
+ createConsumer("A", new ActiveMQTopic("A.FOO2"));
+
+ //verify that both consumers have a prefetch of 1
+ assertEquals(1, \
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); + \
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
+
+ assertDeqInflight(2, 0);
+ }
+
+ /**
+ * Test configuring the advisoryAckPercentage works with advisoryPrefetchSize
+ * @throws Exception
+ */
+ public void testAdvisoryPrefetchSizePercent() throws Exception {
+ ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>"); +
+ createBroker("A");
+ createBroker("B");
+ NetworkConnector networkBridge = bridgeBrokers("A", "B");
+ networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+ networkBridge.setDuplex(true);
+ networkBridge.setAdvisoryPrefetchSize(10);
+ networkBridge.setAdvisoryAckPercentage(65);
+
+ startAllBrokers();
+ verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+ for (int i = 0; i < 10; i++) {
+ createConsumer("A", new ActiveMQTopic("A.FOO"));
+ }
+
+ assertDeqInflight(7, 3);
+ }
+
+ /**
+ * Test configuring the advisoryAckPercentage works when only prefetchSize \
exists + * and is applied against that instead for advisory consumers
+ *
+ * @throws Exception
+ */
+ public void testPrefetchSizePercent() throws Exception {
+ ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>"); +
+ createBroker("A");
+ createBroker("B");
+ NetworkConnector networkBridge = bridgeBrokers("A", "B");
+ networkBridge.addStaticallyIncludedDestination(advisoryTopic);
+ networkBridge.setDuplex(true);
+ networkBridge.setPrefetchSize(10);
+ networkBridge.setAdvisoryAckPercentage(65);
+
+ startAllBrokers();
+ verifyPeerBrokerInfo(brokers.get("A"), 1);
+
+ for (int i = 0; i < 10; i++) {
+ createConsumer("A", new ActiveMQTopic("A.FOO"));
+ }
+
+ assertDeqInflight(7, 3);
+ }
+
+ private void assertDeqInflight(final int dequeue, final int inflight) throws \
Exception { + assertTrue("deq and inflight as expected", Wait.waitFor(new \
Wait.Condition() { + @Override
+ public boolean isSatisified() throws Exception {
+ RegionBroker regionBroker = (RegionBroker) \
brokers.get("A").broker.getRegionBroker(); + LOG.info("A Deq:" + \
regionBroker.getDestinationStatistics().getDequeues().getCount()); + \
LOG.info("A Inflight:" + \
regionBroker.getDestinationStatistics().getInflight().getCount()); + \
return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue + \
&& regionBroker.getDestinationStatistics().getInflight().getCount() == inflight; + \
} + }));
+ }
+
public void testAdvisoryForwardingDuplexNC() throws Exception {
ActiveMQTopic advisoryTopic = new \
ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic