[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r738904 - in /activemq/trunk/activemq-core/src:
From: dejanb () apache ! org
Date: 2009-01-29 16:01:36
Message-ID: 20090129160137.BDCF0238895D () eris ! apache ! org
[Download RAW message or body]
Author: dejanb
Date: Thu Jan 29 16:01:35 2009
New Revision: 738904
URL: http://svn.apache.org/viewvc?rev=738904&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1807
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/transport/stomp/ProtocolConverter.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java \
Thu Jan 29 16:01:35 2009 @@ -275,7 +275,7 @@
boolean acked = false;
for (Iterator<StompSubscription> iter = \
subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { StompSubscription \
sub = iter.next();
- MessageAck ack = sub.onStompMessageAck(messageId);
+ MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
ack.setTransactionId(activemqTx);
sendToActiveMQ(ack, createResponseHandler(command));
@@ -331,6 +331,11 @@
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
+
+ for (Iterator<StompSubscription> iter = \
subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { + \
StompSubscription sub = iter.next(); + sub.onStompCommit(activemqTx);
+ }
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
@@ -338,6 +343,7 @@
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command));
+
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -353,6 +359,14 @@
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
+ for (Iterator<StompSubscription> iter = \
subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { + \
StompSubscription sub = iter.next(); + try {
+ sub.onStompAbort(activemqTx);
+ } catch (Exception e) {
+ throw new ProtocolException("Transaction abort failed", false, e);
+ }
+ }
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
@@ -543,7 +557,6 @@
* @throws IOException
*/
public void onActiveMQCommad(Command command) throws IOException, JMSException {
-
if (command.isResponse()) {
Response response = (Response)command;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/transport/stomp/StompSubscription.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java \
Thu Jan 29 16:01:35 2009 @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,9 +31,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
/**
- * Keeps track of the STOMP susbscription so that acking is correctly done.
+ * Keeps track of the STOMP subscription so that acking is correctly done.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
@@ -46,11 +48,13 @@
private final String subscriptionId;
private final ConsumerInfo consumerInfo;
- private final LinkedHashMap<String, MessageId> dispatchedMessage = new \
LinkedHashMap<String, MessageId>(); + private final LinkedHashMap<MessageId, \
MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>(); \
+ private final LinkedList<MessageDispatch> unconsumedMessage = new \
LinkedList<MessageDispatch>();
private String ackMode = AUTO_ACK;
private ActiveMQDestination destination;
private String transformation;
+
public StompSubscription(ProtocolConverter stompTransport, String \
subscriptionId, ConsumerInfo consumerInfo, String transformation) { \
this.protocolConverter = stompTransport; @@ -60,16 +64,14 @@
}
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
-
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-
if (ackMode == CLIENT_ACK) {
synchronized (this) {
- dispatchedMessage.put(message.getJMSMessageID(), \
message.getMessageId()); + \
dispatchedMessage.put(message.getMessageId(), md); }
} else if (ackMode == INDIVIDUAL_ACK) {
synchronized (this) {
- dispatchedMessage.put(message.getJMSMessageID(), \
message.getMessageId()); + \
dispatchedMessage.put(message.getMessageId(), md); }
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
@@ -86,19 +88,60 @@
ignoreTransformation = true;
}
}
+
StompFrame command = protocolConverter.convertMessage(message, \
ignoreTransformation);
command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId != null) {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, \
subscriptionId); }
-
+
protocolConverter.getTransportFilter().sendToStomp(command);
}
-
- synchronized MessageAck onStompMessageAck(String messageId) {
-
- if (!dispatchedMessage.containsKey(messageId)) {
+
+ synchronized void onStompAbort(TransactionId transactionId) throws IOException, \
JMSException { + //ack all unacked messages
+ for (MessageDispatch md : dispatchedMessage.values()) {
+ if (!unconsumedMessage.contains(md)) {
+ MessageAck ack = new MessageAck();
+ ack.setDestination(consumerInfo.getDestination());
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+ ack.setFirstMessageId(md.getMessage().getMessageId());
+ ack.setLastMessageId(md.getMessage().getMessageId());
+ ack.setMessageCount(1);
+ ack.setTransactionId(transactionId);
+ protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+ unconsumedMessage.add(md);
+ }
+ }
+ // redeliver all unconsumed messages
+ for (MessageDispatch md : unconsumedMessage) {
+ onMessageDispatch(md);
+ }
+ }
+
+ synchronized void onStompCommit(TransactionId transactionId) {
+ // ack all messages
+ MessageAck ack = new MessageAck();
+ ack.setDestination(consumerInfo.getDestination());
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+ ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+ ack.setMessageCount(unconsumedMessage.size());
+ ack.setTransactionId(transactionId);
+ protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+ // clear lists
+ unconsumedMessage.clear();
+ dispatchedMessage.clear();
+ }
+
+ synchronized MessageAck onStompMessageAck(String messageId, TransactionId \
transactionId) { +
+ MessageId msgId = new MessageId(messageId);
+
+ if (!dispatchedMessage.containsKey(msgId)) {
return null;
}
@@ -107,33 +150,50 @@
ack.setConsumerId(consumerInfo.getConsumerId());
if (ackMode == CLIENT_ACK) {
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ if (transactionId != null) {
+ ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+ } else {
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ }
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator(); \
iter.hasNext();) {
Map.Entry entry = (Entry)iter.next();
- String id = (String)entry.getKey();
- MessageId msgid = (MessageId)entry.getValue();
+ MessageId id = (MessageId)entry.getKey();
+ MessageDispatch msg = (MessageDispatch)entry.getValue();
if (ack.getFirstMessageId() == null) {
- ack.setFirstMessageId(msgid);
+ ack.setFirstMessageId(id);
}
-
- iter.remove();
+
+ if (transactionId != null) {
+ if (!unconsumedMessage.contains(msg))
+ unconsumedMessage.add(msg);
+ } else {
+ iter.remove();
+ }
+
+
count++;
- if (id.equals(messageId)) {
- ack.setLastMessageId(msgid);
+ if (id.equals(msgId)) {
+ ack.setLastMessageId(id);
break;
}
}
ack.setMessageCount(count);
+ if (transactionId != null) {
+ ack.setTransactionId(transactionId);
+ }
}
else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
- MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
- ack.setMessageID(msgid);
+ ack.setMessageID(msgId);
+ if (transactionId != null) {
+ unconsumedMessage.add(dispatchedMessage.get(msgId));
+ ack.setTransactionId(transactionId);
+ }
dispatchedMessage.remove(messageId);
}
return ack;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apach \
e/activemq/transport/stomp/StompTest.java?rev=738904&r1=738903&r2=738904&view=diff \
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java \
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java \
Thu Jan 29 16:01:35 2009 @@ -46,7 +46,6 @@
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -937,24 +936,55 @@
sendMessage("message 1");
sendMessage("message 2");
sendMessage("message 3");
+ sendMessage("message 4");
+ sendMessage("message 5");
+
- StompFrame frame = stompConnection.receive();
+ StompFrame frame = stompConnection.receive();
+ assertEquals(frame.getBody(), "message 1");
+
stompConnection.begin("tx1");
stompConnection.ack(frame, "tx1");
StompFrame frame1 = stompConnection.receive();
-
+ assertEquals(frame1.getBody(), "message 2");
+
try {
StompFrame frame2 = stompConnection.receive(500);
if (frame2 != null) {
fail("Should not have received the second message");
}
} catch (SocketTimeoutException soe) {}
+
+ Thread.sleep(100);
+ stompConnection.abort("tx1");
+
+ stompConnection.begin("tx2");
+
+ StompFrame frame3 = stompConnection.receive();
+ assertEquals(frame3.getBody(), "message 1");
+ stompConnection.ack(frame3, "tx2");
+
+ StompFrame frame4 = stompConnection.receive();
+ assertEquals(frame4.getBody(), "message 2");
+ stompConnection.ack(frame4, "tx2");
+
+ StompFrame frame5 = stompConnection.receive();
+ assertEquals(frame5.getBody(), "message 3");
+ stompConnection.ack(frame5, "tx2");
+
+ stompConnection.commit("tx2");
+
+ stompConnection.begin("tx3");
+ StompFrame frame6 = stompConnection.receive();
+ assertEquals(frame6.getBody(), "message 4");
+ stompConnection.ack(frame6, "tx3");
+ stompConnection.commit("tx3");
+
stompDisconnect();
- }
-
+ }
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = \
broker.getBroker().getClients(); int actual = clients.length;
@@ -969,3 +999,5 @@
Thread.sleep(2000);
}
}
+
+
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic