[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: activemq git commit: [AMQ-6907] add selectorAware option to conditionalNetworkBridgeFilterFactory su
From: gtully () apache ! org
Date: 2018-02-28 16:08:13
Message-ID: 08c7b1963d2a4650a32447899e735237 () git ! apache ! org
[Download RAW message or body]
Repository: activemq
Updated Branches:
refs/heads/master efaf9cd77 -> 82c9f9531
[AMQ-6907] add selectorAware option to conditionalNetworkBridgeFilterFactory such \
that replay back to origin can happen if there are no matching local consumers
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/82c9f953
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/82c9f953
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/82c9f953
Branch: refs/heads/master
Commit: 82c9f9531ecce01d56de0ac24f930d8f86f4721f
Parents: efaf9cd
Author: gtully <gary.tully@gmail.com>
Authored: Wed Feb 28 16:07:52 2018 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Feb 28 16:07:52 2018 +0000
----------------------------------------------------------------------
.../ConditionalNetworkBridgeFilterFactory.java | 40 ++++++++-
.../org/apache/activemq/bugs/AMQ4607Test.java | 89 ++++++++++++++++++++
2 files changed, 125 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/82c9f953/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java \
b/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
index 61b21bc..70c7343 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
@@ -43,6 +43,7 @@ public class ConditionalNetworkBridgeFilterFactory implements \
NetworkBridgeFilte int replayDelay = 0;
int rateLimit = 0;
int rateDuration = 1000;
+ private boolean selectorAware = false;
@Override
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] \
remoteBrokerPath, int messageTTL, int consumerTTL) { @@ -54,6 +55,7 @@ public class \
ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte \
filter.setRateLimit(getRateLimit()); filter.setRateDuration(getRateDuration());
filter.setReplayDelay(getReplayDelay());
+ filter.setSelectorAware(isSelectorAware());
return filter;
}
@@ -89,6 +91,14 @@ public class ConditionalNetworkBridgeFilterFactory implements \
NetworkBridgeFilte this.replayDelay = replayDelay;
}
+ public void setSelectorAware(boolean selectorAware) {
+ this.selectorAware = selectorAware;
+ }
+
+ public boolean isSelectorAware() {
+ return selectorAware;
+ }
+
private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter \
{
final static Logger LOG = \
LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class); private int \
rateLimit; @@ -98,6 +108,7 @@ public class ConditionalNetworkBridgeFilterFactory \
implements NetworkBridgeFilte
private int matchCount;
private long rateDurationEnd;
+ private boolean selectorAware = false;
@Override
protected boolean matchesForwardingFilter(Message message, final \
MessageEvaluationContext mec) { @@ -136,10 +147,23 @@ public class \
ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
List<Subscription> consumers = regionDestination.getConsumers();
for (Subscription sub : consumers) {
if (!sub.getConsumerInfo().isNetworkSubscription() && \
!sub.getConsumerInfo().isBrowser()) {
- LOG.trace("Not replaying [{}] for [{}] to origin due to existing \
local consumer: {}", new Object[]{
- message.getMessageId(), message.getDestination(), \
sub.getConsumerInfo()
- });
- return false;
+
+ if (!isSelectorAware()) {
+ LOG.trace("Not replaying [{}] for [{}] to origin due to \
existing local consumer: {}", new Object[]{ + \
message.getMessageId(), message.getDestination(), sub.getConsumerInfo() + \
}); + return false;
+
+ } else {
+ try {
+ if (sub.matches(message, mec)) {
+ LOG.trace("Not replaying [{}] for [{}] to origin due \
to existing selector matching local consumer: {}", new Object[]{ + \
message.getMessageId(), message.getDestination(), sub.getConsumerInfo() + \
}); + return false;
+ }
+ } catch (Exception ignored) {}
+ }
}
}
return true;
@@ -172,5 +196,13 @@ public class ConditionalNetworkBridgeFilterFactory implements \
NetworkBridgeFilte
public void setAllowReplayWhenNoConsumers(boolean \
allowReplayWhenNoConsumers) { this.allowReplayWhenNoConsumers = \
allowReplayWhenNoConsumers; }
+
+ public void setSelectorAware(boolean selectorAware) {
+ this.selectorAware = selectorAware;
+ }
+
+ public boolean isSelectorAware() {
+ return selectorAware;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/82c9f953/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java \
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java index \
265b692..e8495da 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
@@ -178,6 +178,95 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport \
implements Uncaug
}
+ public void testMigratingConsumerSelectorAwareTrue() throws Exception {
+ bridge("Broker0", "Broker1");
+ if (!duplex) bridge("Broker1", "Broker0");
+
+ ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory \
= new ConditionalNetworkBridgeFilterFactory(); + \
conditionalNetworkBridgeFilterFactory.setReplayDelay(0); + \
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + \
conditionalNetworkBridgeFilterFactory.setSelectorAware(true); + \
brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+
+ startAllBrokers();
+ this.waitForBridgeFormation();
+
+ Destination dest = createDestination("TEST.FOO", false);
+ sendMessages("Broker0", dest, 1);
+
+ assertExactMessageCount("Broker0", dest, 1, TIMEOUT);
+
+ MessageConsumer messageConsumerNoMatch = createConsumer("Broker1", dest, \
"DoNotConsume = 'true'"); +
+ assertExactConsumersConnect("Broker0", dest, 1, TIMEOUT);
+ assertExactConsumersConnect("Broker1", dest, 1, TIMEOUT);
+
+ assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
+ assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
+
+ // now consume the message
+ final String brokerId = "Broker0";
+ MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+
+ assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
+ assertExactConsumersConnect("Broker1", dest, 2, TIMEOUT);
+
+
+ assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokers.get(brokerId).allMessages.getMessageIds().size() == \
1; + }
+ }));
+ messageConsumer.close();
+ }
+
+ public void testMigratingConsumerSelectorAwareFalse() throws Exception {
+ bridge("Broker0", "Broker1");
+ if (!duplex) bridge("Broker1", "Broker0");
+
+ ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory \
= new ConditionalNetworkBridgeFilterFactory(); + \
conditionalNetworkBridgeFilterFactory.setReplayDelay(0); + \
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + \
conditionalNetworkBridgeFilterFactory.setSelectorAware(false); + \
brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+
+ startAllBrokers();
+ this.waitForBridgeFormation();
+
+ Destination dest = createDestination("TEST.FOO", false);
+ sendMessages("Broker0", dest, 1);
+
+ assertExactMessageCount("Broker0", dest, 1, TIMEOUT);
+
+ MessageConsumer messageConsumerNoMatch = createConsumer("Broker1", dest, \
"DoNotConsume = 'true'"); +
+ assertExactConsumersConnect("Broker0", dest, 1, TIMEOUT);
+ assertExactConsumersConnect("Broker1", dest, 1, TIMEOUT);
+
+ assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
+ assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
+
+ // now try consume the message
+ final String brokerId = "Broker0";
+ MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+
+ assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
+ assertExactConsumersConnect("Broker1", dest, 2, TIMEOUT);
+
+ assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
+ assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
+
+ assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokers.get(brokerId).allMessages.getMessageIds().size() == \
0; + }
+ }));
+ messageConsumer.close();
+ }
+
+
protected void assertExactMessageCount(final String brokerName, Destination \
destination, final int count, long timeout) throws Exception {
ManagementContext context = \
brokers.get(brokerName).broker.getManagementContext(); final QueueViewMBean \
queueViewMBean = (QueueViewMBean) \
context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], \
QueueViewMBean.class, false);
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic