[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r1189980 - in /activemq/trunk/activemq-core/src:
From: tabish () apache ! org
Date: 2011-10-27 19:58:44
Message-ID: 20111027195844.43DAE23888CD () eris ! apache ! org
[Download RAW message or body]
Author: tabish
Date: Thu Oct 27 19:58:42 2011
New Revision: 1189980
URL: http://svn.apache.org/viewvc?rev=1189980&view=rev
Log:
Apply patch for https://issues.apache.org/jira/browse/AMQ-3541
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?rev=1189980&r1=1189979&r2=1189980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java Thu Oct 27 19:58:42 2011
@@ -18,6 +18,9 @@ package org.apache.activemq.broker.util;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,12 +29,12 @@ import org.slf4j.LoggerFactory;
* A Broker interceptor which updates a JMS Client's timestamp on the message
* with a broker timestamp. Useful when the clocks on client machines are known
* to not be correct and you can only trust the time set on the broker machines.
- *
+ *
* Enabling this plugin will break JMS compliance since the timestamp that the
* producer sees on the messages after as send() will be different from the
* timestamp the consumer will observe when he receives the message. This plugin
* is not enabled in the default ActiveMQ configuration.
- *
+ *
* 2 new attributes have been added which will allow the administrator some override \
control
* over the expiration time for incoming messages:
*
@@ -41,38 +44,38 @@ import org.slf4j.LoggerFactory;
* Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
*
* @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
- *
- *
+ *
+ *
*/
public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
private static final Logger LOG = \
LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
- /**
+ /**
* variable which (when non-zero) is used to override
* the expiration date for messages that arrive with
* no expiration date set (in Milliseconds).
*/
long zeroExpirationOverride = 0;
- /**
+ /**
* variable which (when non-zero) is used to limit
- * the expiration date (in Milliseconds).
+ * the expiration date (in Milliseconds).
*/
long ttlCeiling = 0;
-
+
/**
* If true, the plugin will not update timestamp to past values
* False by default
*/
boolean futureOnly = false;
-
-
+
+
/**
* if true, update timestamp even if message has passed through a network
* default false
*/
boolean processNetworkMessages = false;
- /**
+ /**
* setter method for zeroExpirationOverride
*/
public void setZeroExpirationOverride(long ttl)
@@ -80,7 +83,7 @@ public class TimeStampingBrokerPlugin ex
this.zeroExpirationOverride = ttl;
}
- /**
+ /**
* setter method for ttlCeiling
*/
public void setTtlCeiling(long ttlCeiling)
@@ -88,19 +91,21 @@ public class TimeStampingBrokerPlugin ex
this.ttlCeiling = ttlCeiling;
}
- public void setFutureOnly(boolean futureOnly) {
- this.futureOnly = futureOnly;
- }
-
- public void setProcessNetworkMessages(Boolean processNetworkMessages) {
- this.processNetworkMessages = processNetworkMessages;
- }
+ public void setFutureOnly(boolean futureOnly) {
+ this.futureOnly = futureOnly;
+ }
+
+ public void setProcessNetworkMessages(Boolean processNetworkMessages) {
+ this.processNetworkMessages = processNetworkMessages;
+ }
- @Override
+ @Override
public void send(ProducerBrokerExchange producerExchange, Message message) \
throws Exception {
- if (message.getTimestamp() > 0
- && (processNetworkMessages || (message.getBrokerPath() == null || \
message.getBrokerPath().length == 0))) { +
+ if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
+ (processNetworkMessages || (message.getBrokerPath() == null || \
message.getBrokerPath().length == 0))) {
// timestamp not been disabled and has not passed through a network or \
processNetworkMessages=true +
long oldExpiration = message.getExpiration();
long newTimeStamp = System.currentTimeMillis();
long timeToLive = zeroExpirationOverride;
@@ -112,17 +117,40 @@ public class TimeStampingBrokerPlugin ex
timeToLive = ttlCeiling;
}
long expiration = timeToLive + newTimeStamp;
- //In the scenario that the Broker is behind the clients we never want to set the \
Timestamp and Expiration in the past
- if(!futureOnly || (expiration > oldExpiration)) {
- if (timeToLive > 0 && expiration > 0) {
- message.setExpiration(expiration);
- }
- message.setTimestamp(newTimeStamp);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Set message " + message.getMessageId() + " timestamp from " + \
oldTimestamp + " to " + newTimeStamp);
- }
- }
+ // In the scenario that the Broker is behind the clients we never want to set the
+ // Timestamp and Expiration in the past
+ if(!futureOnly || (expiration > oldExpiration)) {
+ if (timeToLive > 0 && expiration > 0) {
+ message.setExpiration(expiration);
+ }
+ message.setTimestamp(newTimeStamp);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Set message " + message.getMessageId() + " timestamp \
from " + oldTimestamp + " to " + newTimeStamp); + }
+ }
}
super.send(producerExchange, message);
}
+
+ private boolean isDestinationDLQ(Message message) {
+ DeadLetterStrategy deadLetterStrategy;
+ Message tmp;
+
+ if (message != null && message.getRegionDestination() != null) {
+ deadLetterStrategy = \
message.getRegionDestination().getDeadLetterStrategy(); + if \
(deadLetterStrategy != null) { + // Cheap copy, since we only need two \
fields + tmp = new ActiveMQMessage();
+ tmp.setDestination(message.getOriginalDestination());
+ tmp.setRegionDestination(message.getRegionDestination());
+
+ // Determine if we are headed for a DLQ
+ ActiveMQDestination deadLetterDestination = \
deadLetterStrategy.getDeadLetterQueueFor(tmp, null); + if \
(deadLetterDestination.equals(message.getDestination())) { + \
return true; + }
+ }
+ }
+ return false;
+ }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java?rev=1189980&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java Thu Oct 27 19:58:42 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TimeStampingBrokerPluginTest extends TestCase {
+
+ BrokerService broker;
+ TransportConnector tcpConnector;
+ MessageProducer producer;
+ MessageConsumer consumer;
+ Connection connection;
+ Session session;
+ Destination destination;
+ String queue = "TEST.FOO";
+ long expiry = 500;
+
+ @Before
+ public void setUp() throws Exception {
+ TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
+ tsbp.setZeroExpirationOverride(expiry);
+ tsbp.setTtlCeiling(expiry);
+
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ broker.setPlugins(new BrokerPlugin[] {tsbp});
+ tcpConnector = broker.addConnector("tcp://localhost:0");
+
+ // Add policy and individual DLQ strategy
+ PolicyEntry policy = new PolicyEntry();
+ DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+ strategy.setProcessExpired(true);
+ ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+ ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+ strategy.setProcessNonPersistent(true);
+ policy.setDeadLetterStrategy(strategy);
+
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+
+ broker.setDestinationPolicy(pMap);
+
+ broker.start();
+ // Create a ConnectionFactory
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
+
+ // Create a Connection
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ // Create a Session
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the destination Queue
+ destination = session.createQueue(queue);
+
+ // Create a MessageProducer from the Session to the Topic or Queue
+ producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Clean up
+ producer.close();
+ consumer.close();
+ session.close();
+ connection.close();
+ broker.stop();
+ }
+ @Test
+ public void testExpirationSet() throws Exception {
+
+ // Create a message
+ Message sentMessage = session.createMessage();
+
+ // Tell the producer to send the message
+ long beforeSend = System.currentTimeMillis();
+ producer.send(sentMessage);
+
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ consumer = session.createConsumer(destination);
+
+ // Wait for a message
+ Message receivedMessage = consumer.receive(1000);
+
+ // assert we got the same message ID we sent
+ assertEquals(sentMessage.getJMSMessageID(), \
receivedMessage.getJMSMessageID()); +
+ // assert message expiration is present
+ assertTrue("Expiration should be not null" + \
receivedMessage.getJMSExpiration() + "\n", \
Long.valueOf(receivedMessage.getJMSExpiration()) != null); +
+ // assert message expiration is in window
+ assertTrue("Before send: " + beforeSend + " Msg ts: " + \
receivedMessage.getJMSTimestamp() + " Msg Expiry: " + \
receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() \
&& receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + \
expiry)); + }
+ @Test
+ public void testExpirationCelingSet() throws Exception {
+
+ // Create a message
+ Message sentMessage = session.createMessage();
+ // Tell the producer to send the message
+ long beforeSend = System.currentTimeMillis();
+ long sendExpiry = beforeSend + (expiry*22);
+ sentMessage.setJMSExpiration(sendExpiry);
+
+ producer.send(sentMessage);
+
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ consumer = session.createConsumer(destination);
+
+ // Wait for a message
+ Message receivedMessage = consumer.receive(1000);
+
+ // assert we got the same message ID we sent
+ assertEquals(sentMessage.getJMSMessageID(), \
receivedMessage.getJMSMessageID()); +
+ // assert message expiration is present
+ assertTrue("Expiration should be not null" + \
receivedMessage.getJMSExpiration() + "\n", \
Long.valueOf(receivedMessage.getJMSExpiration()) != null); +
+ // assert message expiration is in window
+ assertTrue("Sent expiry: " + sendExpiry + " Recv ts: " + \
receivedMessage.getJMSTimestamp() + " Recv expiry: " + \
receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() \
&& receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + \
expiry)); + }
+
+ @Test
+ public void testExpirationDLQ() throws Exception {
+
+ // Create a message
+ Message sentMessage = session.createMessage();
+ // Tell the producer to send the message
+ long beforeSend = System.currentTimeMillis();
+ long sendExpiry = beforeSend + expiry;
+ sentMessage.setJMSExpiration(sendExpiry);
+
+ producer.send(sentMessage);
+
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ consumer = session.createConsumer(destination);
+
+ Thread.sleep(expiry+250);
+
+ // Wait for a message
+ Message receivedMessage = consumer.receive(1000);
+
+ // Message should roll to DLQ
+ assertNull(receivedMessage);
+
+ // Close old consumer, setup DLQ listener
+ consumer.close();
+ consumer = session.createConsumer(session.createQueue("DLQ."+queue));
+
+ // Get message from DLQ
+ receivedMessage = consumer.receive(1000);
+
+ // assert we got the same message ID we sent
+ assertEquals(sentMessage.getJMSMessageID(), \
receivedMessage.getJMSMessageID()); +
+ // assert message expiration is zero
+ //System.out.println("Recv: " + receivedMessage.getJMSExpiration());
+ assertEquals("Expiration should be zero" + \
receivedMessage.getJMSExpiration() + "\n", receivedMessage.getJMSExpiration(), 0); + \
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
------------------------------------------------------------------------------
svn:eol-style = native
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic