[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r642067 -
From:       chirino () apache ! org
Date:       2008-03-28 2:22:40
Message-ID: 20080328022240.6F2A21A9832 () eris ! apache ! org
[Download RAW message or body]

Author: chirino
Date: Thu Mar 27 19:22:39 2008
New Revision: 642067

URL: http://svn.apache.org/viewvc?rev=642067&view=rev
Log:
- Fixing out of order dispatch.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=642067&r1=642066&r2=642067&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Mar 27 19:22:39 2008
@@ -25,6 +25,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
@@ -107,7 +108,7 @@
     private long redeliveryDelay;
     private int ackCounter;
     private int dispatchedCount;
-    private MessageListener messageListener;
+    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
     private JMSConsumerStatsImpl stats;
 
     private final String selector;
@@ -330,7 +331,7 @@
      */
     public MessageListener getMessageListener() throws JMSException {
         checkClosed();
-        return this.messageListener;
+        return this.messageListener.get();
     }
 
     /**
@@ -354,19 +355,20 @@
             throw new JMSException(
                                    "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
         }
-        this.messageListener = listener;
         if (listener != null) {
             boolean wasRunning = session.isRunning();
             if (wasRunning) {
                 session.stop();
             }
 
+            this.messageListener.set(listener);
             session.redispatch(this, unconsumedMessages);
 
             if (wasRunning) {
                 session.start();
             }
-
+        } else {
+            this.messageListener.set(null);
         }
     }
 
@@ -934,7 +936,7 @@
     }
 
     public void dispatch(MessageDispatch md) {
-        MessageListener listener = this.messageListener;
+        MessageListener listener = this.messageListener.get();
         try {
             synchronized (unconsumedMessages.getMutex()) {
                 if (clearDispatchList) {
@@ -1024,7 +1026,7 @@
      * @throws JMSException
      */
     public boolean iterate() {
-        MessageListener listener = this.messageListener;
+        MessageListener listener = this.messageListener.get();
         if (listener != null) {
             MessageDispatch md = unconsumedMessages.dequeueNoWait();
             if (md != null) {


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic