[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    activemq git commit: [AMQ-6907] add selectorAware option to conditionalNetworkBridgeFilterFactory such that replay back to origin can happen if there are no matching local consumers
From:       gtully () apache ! org
Date:       2018-02-28 16:08:13
Message-ID: 08c7b1963d2a4650a32447899e735237 () git ! apache ! org
[Download RAW message or body]

Repository: activemq
Updated Branches:
  refs/heads/master efaf9cd77 -> 82c9f9531


[AMQ-6907] add selectorAware option to conditionalNetworkBridgeFilterFactory such that replay back to origin can happen if there are no matching local consumers


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/82c9f953
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/82c9f953
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/82c9f953

Branch: refs/heads/master
Commit: 82c9f9531ecce01d56de0ac24f930d8f86f4721f
Parents: efaf9cd
Author: gtully <gary.tully@gmail.com>
Authored: Wed Feb 28 16:07:52 2018 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Feb 28 16:07:52 2018 +0000

----------------------------------------------------------------------
 .../ConditionalNetworkBridgeFilterFactory.java  | 40 ++++++++-
 .../org/apache/activemq/bugs/AMQ4607Test.java   | 89 ++++++++++++++++++++
 2 files changed, 125 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/82c9f953/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
                
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
index 61b21bc..70c7343 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
 @@ -43,6 +43,7 @@ public class ConditionalNetworkBridgeFilterFactory implements \
NetworkBridgeFilte  int replayDelay = 0;
     int rateLimit = 0;
     int rateDuration = 1000;
+    private boolean selectorAware = false;
 
     @Override
     public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] \
remoteBrokerPath, int messageTTL, int consumerTTL) { @@ -54,6 +55,7 @@ public class \
ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte  \
filter.setRateLimit(getRateLimit());  filter.setRateDuration(getRateDuration());
         filter.setReplayDelay(getReplayDelay());
+        filter.setSelectorAware(isSelectorAware());
         return filter;
     }
 
@@ -89,6 +91,14 @@ public class ConditionalNetworkBridgeFilterFactory implements \
NetworkBridgeFilte  this.replayDelay = replayDelay;
     }
 
+    public void setSelectorAware(boolean selectorAware) {
+        this.selectorAware = selectorAware;
+    }
+
+    public boolean isSelectorAware() {
+        return selectorAware;
+    }
+
     private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter \
                {
         final static Logger LOG = \
LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);  private int \
rateLimit; @@ -98,6 +108,7 @@ public class ConditionalNetworkBridgeFilterFactory \
implements NetworkBridgeFilte  
         private int matchCount;
         private long rateDurationEnd;
+        private boolean selectorAware = false;
 
         @Override
         protected boolean matchesForwardingFilter(Message message, final \
MessageEvaluationContext mec) { @@ -136,10 +147,23 @@ public class \
                ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
             List<Subscription> consumers = regionDestination.getConsumers();
             for (Subscription sub : consumers) {
                 if (!sub.getConsumerInfo().isNetworkSubscription() && \
                !sub.getConsumerInfo().isBrowser()) {
-                    LOG.trace("Not replaying [{}] for [{}] to origin due to existing \
                local consumer: {}", new Object[]{
-                            message.getMessageId(), message.getDestination(), \
                sub.getConsumerInfo()
-                    });
-                    return false;
+
+                    if (!isSelectorAware()) {
+                        LOG.trace("Not replaying [{}] for [{}] to origin due to \
existing local consumer: {}", new Object[]{ +                                \
message.getMessageId(), message.getDestination(), sub.getConsumerInfo() +             \
}); +                        return false;
+
+                    } else {
+                        try {
+                            if (sub.matches(message, mec)) {
+                                LOG.trace("Not replaying [{}] for [{}] to origin due \
to existing selector matching local consumer: {}", new Object[]{ +                    \
message.getMessageId(), message.getDestination(), sub.getConsumerInfo() +             \
}); +                                return false;
+                            }
+                        } catch (Exception ignored) {}
+                    }
                 }
             }
             return true;
@@ -172,5 +196,13 @@ public class ConditionalNetworkBridgeFilterFactory implements \
                NetworkBridgeFilte
         public void setAllowReplayWhenNoConsumers(boolean \
allowReplayWhenNoConsumers) {  this.allowReplayWhenNoConsumers = \
allowReplayWhenNoConsumers;  }
+
+        public void setSelectorAware(boolean selectorAware) {
+            this.selectorAware = selectorAware;
+        }
+
+        public boolean isSelectorAware() {
+            return selectorAware;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/82c9f953/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
                
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
index 265b692..e8495da 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
@@ -178,6 +178,95 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport \
implements Uncaug  
     }
 
+    public void testMigratingConsumerSelectorAwareTrue() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!duplex) bridge("Broker1", "Broker0");
+
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory \
= new ConditionalNetworkBridgeFilterFactory(); +        \
conditionalNetworkBridgeFilterFactory.setReplayDelay(0); +        \
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); +        \
conditionalNetworkBridgeFilterFactory.setSelectorAware(true); +        \
brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
 +
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+        sendMessages("Broker0", dest, 1);
+
+        assertExactMessageCount("Broker0", dest, 1, TIMEOUT);
+
+        MessageConsumer messageConsumerNoMatch = createConsumer("Broker1", dest, \
"DoNotConsume = 'true'"); +
+        assertExactConsumersConnect("Broker0", dest, 1, TIMEOUT);
+        assertExactConsumersConnect("Broker1", dest, 1, TIMEOUT);
+
+        assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
+        assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
+
+        // now consume the message
+        final String brokerId = "Broker0";
+        MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+
+        assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
+        assertExactConsumersConnect("Broker1", dest, 2, TIMEOUT);
+
+
+        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(brokerId).allMessages.getMessageIds().size() == \
1; +            }
+        }));
+        messageConsumer.close();
+    }
+
+    public void testMigratingConsumerSelectorAwareFalse() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!duplex) bridge("Broker1", "Broker0");
+
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory \
= new ConditionalNetworkBridgeFilterFactory(); +        \
conditionalNetworkBridgeFilterFactory.setReplayDelay(0); +        \
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); +        \
conditionalNetworkBridgeFilterFactory.setSelectorAware(false); +        \
brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
 +
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+        sendMessages("Broker0", dest, 1);
+
+        assertExactMessageCount("Broker0", dest, 1, TIMEOUT);
+
+        MessageConsumer messageConsumerNoMatch = createConsumer("Broker1", dest, \
"DoNotConsume = 'true'"); +
+        assertExactConsumersConnect("Broker0", dest, 1, TIMEOUT);
+        assertExactConsumersConnect("Broker1", dest, 1, TIMEOUT);
+
+        assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
+        assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
+
+        // now try consume the message
+        final String brokerId = "Broker0";
+        MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+
+        assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
+        assertExactConsumersConnect("Broker1", dest, 2, TIMEOUT);
+
+        assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
+        assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
+
+        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(brokerId).allMessages.getMessageIds().size() == \
0; +            }
+        }));
+        messageConsumer.close();
+    }
+
+
     protected void assertExactMessageCount(final String brokerName, Destination \
                destination, final int count, long timeout) throws Exception {
         ManagementContext context = \
brokers.get(brokerName).broker.getManagementContext();  final QueueViewMBean \
queueViewMBean = (QueueViewMBean) \
context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], \
QueueViewMBean.class, false);


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic