[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r1075346 - in /activemq/trunk/activemq-core/src:
From:       dejanb () apache ! org
Date:       2011-02-28 14:23:31
Message-ID: 20110228142331.4ABB623888CD () eris ! apache ! org
[Download RAW message or body]

Author: dejanb
Date: Mon Feb 28 14:23:30 2011
New Revision: 1075346

URL: http://svn.apache.org/viewvc?rev=1075346&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3193 - consumers don't get messages after \
JMX remove

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1075346&r1=1075345&r2=1075346&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
                (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
Mon Feb 28 14:23:30 2011 @@ -1437,6 +1437,23 @@ public class Queue extends \
BaseDestinati  } catch (Throwable e) {
                     LOG.error("Failed to page in more queue messages ", e);
                 }
+            } else {
+                // if there are already paged messages
+                // dispatch them
+                if (pagedInMessages.size() != 0) {
+                    pagedInMessagesLock.writeLock().lock();
+                    ArrayList paged = new ArrayList();
+                    try {
+                       paged.addAll(pagedInMessages.values());
+                    } finally {
+                       pagedInMessagesLock.writeLock().unlock();
+                    }
+                    try {
+                        doDispatch(paged);
+                    } catch (Exception e) {
+                       LOG.error("Failed to dispatch already paged messages ", e);
+                    }
+                }
             }
 
             if (pendingBrowserDispatch != null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1075346&r1=1075345&r2=1075346&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java \
                (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java \
Mon Feb 28 14:23:30 2011 @@ -48,6 +48,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.util.JMXSupport;
@@ -161,6 +162,39 @@ public class MBeanTest extends EmbeddedB
         assertTrue("cache enabled", queueNew.isCacheEnabled());
     }
 
+    public void testRemoveMessages() throws Exception {
+        ObjectName brokerName = assertRegisteredObjectName(domain + \
":Type=Broker,BrokerName=localhost"); +        BrokerViewMBean broker = \
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, \
brokerName, BrokerViewMBean.class, true); +        \
broker.addQueue(getDestinationString()); +
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + \
":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +
+        QueueViewMBean queue = \
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, \
queueViewMBeanName, QueueViewMBean.class, true); +        String msg1 = \
queue.sendTextMessage("message 1"); +        String msg2 = \
queue.sendTextMessage("message 2"); +
+        assertTrue(queue.removeMessage(msg2));
+
+        connection = connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQDestination dest = createDestination();
+
+        MessageConsumer consumer = session.createConsumer(dest);
+        Message message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(msg1, message.getJMSMessageID());
+
+        String msg3 = queue.sendTextMessage("message 3");
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(msg3, message.getJMSMessageID());
+
+        message = consumer.receive(1000);
+        assertNull(message);
+
+    }
+
     public void testRetryMessages() throws Exception {
         // lets speed up redelivery
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) \
connectionFactory;


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic