[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r739307 - in /activemq/trunk/activemq-core/src:
From:       gtully () apache ! org
Date:       2009-01-30 15:30:25
Message-ID: 20090130153025.70CE423888A0 () eris ! apache ! org
[Download RAW message or body]

Author: gtully
Date: Fri Jan 30 15:30:24 2009
New Revision: 739307

URL: http://svn.apache.org/viewvc?rev=739307&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1593

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/PrefetchSubscription.java?rev=739307&r1=739306&r2=739307&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java \
                (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java \
Fri Jan 30 15:30:24 2009 @@ -316,7 +316,6 @@
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        node.incrementRedeliveryCounter();
                         if (ack.getLastMessageId().equals(messageId)) {
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apach \
e/activemq/JmsRollbackRedeliveryTest.java?rev=739307&r1=739306&r2=739307&view=diff \
                ==============================================================================
                
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java \
                (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java \
Fri Jan 30 15:30:24 2009 @@ -70,9 +70,6 @@
     
     public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) \
throws Exception {  
-        final int nbMessages = 10;
-        final String destinationName = "Destination";
-
         ConnectionFactory connectionFactory = new \
ActiveMQConnectionFactory(brokerUrl);  
         Connection connection = connectionFactory.createConnection();
@@ -179,36 +176,66 @@
         }
     }
     
-    public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
+    // AMQ-1593
+    public void testValidateRedeliveryCountOnRollback() throws Exception {
 
-        ConnectionFactory connectionFactory = 
+        final int numMessages = 1;
+       ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        populateDestination(nbMessages, destinationName, connection);
+        populateDestination(numMessages, destinationName, connection);
 
         {
             AtomicInteger received = new AtomicInteger();
-            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, \
                Boolean>();
-            while (received.get() < nbMessages) {
-                Session session = connection.createSession(true, \
Session.AUTO_ACKNOWLEDGE); +            final int maxRetries = new \
RedeliveryPolicy().getMaximumRedeliveries(); +            while (received.get() < \
maxRetries) { +                Session session = connection.createSession(true, \
                Session.SESSION_TRANSACTED);
                 Destination destination = session.createQueue(destinationName);
 
                 MessageConsumer consumer = session.createConsumer(destination);      \
  TextMessage msg = (TextMessage) consumer.receive(1000);
                 if (msg != null) {
-                    if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) \
                != null) {
-                        LOG.info("Received message " + msg.getText() + " (" + \
                received.getAndIncrement() + ")" + msg.getJMSMessageID());
-                        assertTrue(msg.getJMSRedelivered());
-                        session.commit();
-                    }
+                    LOG.info("Received message " + msg.getText() + " (" + \
received.getAndIncrement() + ")" + msg.getJMSMessageID()); +                    \
assertEquals("redelivery property matches deliveries", received.get(), \
msg.getLongProperty("JMSXDeliveryCount")); +                    session.rollback();
                 }
                 session.close();
             }
         }
     }
     
+    // AMQ-1593
+    public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws \
Exception { +
+        final int numMessages = 1;
+       ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl + \
"?jms.prefetchPolicy.queuePrefetch=0"); +        Connection connection = \
connectionFactory.createConnection(); +        connection.start();
+
+        populateDestination(numMessages, destinationName, connection);
+
+        {
+            AtomicInteger received = new AtomicInteger();
+            final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, \
Session.SESSION_TRANSACTED); +                Destination destination = \
session.createQueue(destinationName); +
+                MessageConsumer consumer = session.createConsumer(destination);      \
 +                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    LOG.info("Received message " + msg.getText() + " (" + \
received.getAndIncrement() + ")" + msg.getJMSMessageID()); +                    \
assertEquals("redelivery property matches deliveries", received.get(), \
msg.getLongProperty("JMSXDeliveryCount")); +                    session.rollback();
+                }
+                session.close();
+            }
+        }
+    }
+
     public void testRedeliveryPropertyWithNoRollback() throws Exception {
         ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apach \
e/activemq/network/SimpleNetworkTest.java?rev=739307&r1=739306&r2=739307&view=diff \
                ==============================================================================
                
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java \
                (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java \
Fri Jan 30 15:30:24 2009 @@ -140,7 +140,7 @@
         doSetUp();
         remoteConsumer = remoteSession.createDurableSubscriber(included, \
consumerName);  for (int i = 0; i < MESSAGE_COUNT; i++) {
-            assertNotNull(remoteConsumer.receive(500));
+            assertNotNull("message count: " + i, remoteConsumer.receive(1000));
         }
     }
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic