[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r642067 -
From: chirino () apache ! org
Date: 2008-03-28 2:22:40
Message-ID: 20080328022240.6F2A21A9832 () eris ! apache ! org
[Download RAW message or body]
Author: chirino
Date: Thu Mar 27 19:22:39 2008
New Revision: 642067
URL: http://svn.apache.org/viewvc?rev=642067&view=rev
Log:
- Fixing out of order dispatch.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=642067&r1=642066&r2=642067&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Mar 27 19:22:39 2008
@@ -25,6 +25,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
@@ -107,7 +108,7 @@
private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
- private MessageListener messageListener;
+ private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
private JMSConsumerStatsImpl stats;
private final String selector;
@@ -330,7 +331,7 @@
*/
public MessageListener getMessageListener() throws JMSException {
checkClosed();
- return this.messageListener;
+ return this.messageListener.get();
}
/**
@@ -354,19 +355,20 @@
throw new JMSException(
"Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
- this.messageListener = listener;
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}
+ this.messageListener.set(listener);
session.redispatch(this, unconsumedMessages);
if (wasRunning) {
session.start();
}
-
+ } else {
+ this.messageListener.set(null);
}
}
@@ -934,7 +936,7 @@
}
public void dispatch(MessageDispatch md) {
- MessageListener listener = this.messageListener;
+ MessageListener listener = this.messageListener.get();
try {
synchronized (unconsumedMessages.getMutex()) {
if (clearDispatchList) {
@@ -1024,7 +1026,7 @@
* @throws JMSException
*/
public boolean iterate() {
- MessageListener listener = this.messageListener;
+ MessageListener listener = this.messageListener.get();
if (listener != null) {
MessageDispatch md = unconsumedMessages.dequeueNoWait();
if (md != null) {
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic