[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r739307 - in /activemq/trunk/activemq-core/src:
From: gtully () apache ! org
Date: 2009-01-30 15:30:25
Message-ID: 20090130153025.70CE423888A0 () eris ! apache ! org
[Download RAW message or body]
Author: gtully
Date: Fri Jan 30 15:30:24 2009
New Revision: 739307
URL: http://svn.apache.org/viewvc?rev=739307&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1593
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jan 30 15:30:24 2009
@@ -316,7 +316,6 @@
inAckRange = true;
}
if (inAckRange) {
- node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
destination = node.getRegionDestination();
callDispatchMatched = true;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Fri Jan 30 15:30:24 2009
@@ -70,9 +70,6 @@
public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception {
- final int nbMessages = 10;
- final String destinationName = "Destination";
-
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
@@ -179,36 +176,66 @@
}
}
- public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
+ // AMQ-1593
+ public void testValidateRedeliveryCountOnRollback() throws Exception {
- ConnectionFactory connectionFactory =
+ final int numMessages = 1;
+ ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
- populateDestination(nbMessages, destinationName, connection);
+ populateDestination(numMessages, destinationName, connection);
{
AtomicInteger received = new AtomicInteger();
- Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
- while (received.get() < nbMessages) {
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+ while (received.get() < maxRetries) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(1000);
if (msg != null) {
- if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
- LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- assertTrue(msg.getJMSRedelivered());
- session.commit();
- }
+ LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+ assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+ session.rollback();
}
session.close();
}
}
}
+ // AMQ-1593
+ public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
+
+ final int numMessages = 1;
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0");
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ populateDestination(numMessages, destinationName, connection);
+
+ {
+ AtomicInteger received = new AtomicInteger();
+ final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+ while (received.get() < maxRetries) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(destinationName);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+ if (msg != null) {
+ LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
+ assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+ session.rollback();
+ }
+ session.close();
+ }
+ }
+ }
+
public void testRedeliveryPropertyWithNoRollback() throws Exception {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Fri Jan 30 15:30:24 2009
@@ -140,7 +140,7 @@
doSetUp();
remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- assertNotNull(remoteConsumer.receive(500));
+ assertNotNull("message count: " + i, remoteConsumer.receive(1000));
}
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic