[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r738904 - in /activemq/trunk/activemq-core/src:
From:       dejanb () apache ! org
Date:       2009-01-29 16:01:36
Message-ID: 20090129160137.BDCF0238895D () eris ! apache ! org
[Download RAW message or body]

Author: dejanb
Date: Thu Jan 29 16:01:35 2009
New Revision: 738904

URL: http://svn.apache.org/viewvc?rev=738904&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1807

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Jan 29 16:01:35 2009
@@ -275,7 +275,7 @@
         boolean acked = false;
         for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
             StompSubscription sub = iter.next();
-            MessageAck ack = sub.onStompMessageAck(messageId);
+            MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
             if (ack != null) {
                 ack.setTransactionId(activemqTx);
                 sendToActiveMQ(ack, createResponseHandler(command));
@@ -331,6 +331,11 @@
         if (activemqTx == null) {
             throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
+        
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            sub.onStompCommit(activemqTx);
+        }
 
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
@@ -338,6 +343,7 @@
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
 
         sendToActiveMQ(tx, createResponseHandler(command));
+        
     }
 
     protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -353,6 +359,14 @@
         if (activemqTx == null) {
             throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            try {
+            	sub.onStompAbort(activemqTx);
+            } catch (Exception e) {
+            	throw new ProtocolException("Transaction abort failed", false, e);
+            }
+        }
 
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
@@ -543,7 +557,6 @@
      * @throws IOException
      */
     public void onActiveMQCommad(Command command) throws IOException, JMSException {
-
         if (command.isResponse()) {
 
             Response response = (Response)command;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Thu Jan 29 16:01:35 2009
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -30,9 +31,10 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
 
 /**
- * Keeps track of the STOMP susbscription so that acking is correctly done.
+ * Keeps track of the STOMP subscription so that acking is correctly done.
  * 
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
@@ -46,11 +48,13 @@
     private final String subscriptionId;
     private final ConsumerInfo consumerInfo;
 
-    private final LinkedHashMap<String, MessageId> dispatchedMessage = new LinkedHashMap<String, MessageId>();
+    private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
+    private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
 
     private String ackMode = AUTO_ACK;
     private ActiveMQDestination destination;
     private String transformation;
+    
 
     public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
         this.protocolConverter = stompTransport;
@@ -60,16 +64,14 @@
     }
 
     void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
-
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-
         if (ackMode == CLIENT_ACK) {
             synchronized (this) {
-                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+                dispatchedMessage.put(message.getMessageId(), md);
             }
         } else if (ackMode == INDIVIDUAL_ACK) {
             synchronized (this) {
-                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+                dispatchedMessage.put(message.getMessageId(), md);
             }
         } else if (ackMode == AUTO_ACK) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
@@ -86,19 +88,60 @@
         		ignoreTransformation = true;
         	}
         }
+        
         StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
 
         command.setAction(Stomp.Responses.MESSAGE);
         if (subscriptionId != null) {
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
-
+        
         protocolConverter.getTransportFilter().sendToStomp(command);
     }
-
-    synchronized MessageAck onStompMessageAck(String messageId) {
-
-        if (!dispatchedMessage.containsKey(messageId)) {
+    
+    synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException {
+    	//ack all unacked messages
+    	for (MessageDispatch md : dispatchedMessage.values()) {
+    		if (!unconsumedMessage.contains(md)) {
+    	        MessageAck ack = new MessageAck();
+    	        ack.setDestination(consumerInfo.getDestination());
+    	        ack.setConsumerId(consumerInfo.getConsumerId());
+    	        ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+    	        ack.setFirstMessageId(md.getMessage().getMessageId());
+    	        ack.setLastMessageId(md.getMessage().getMessageId());
+    	        ack.setMessageCount(1);
+    	        ack.setTransactionId(transactionId);
+    	        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+    	        unconsumedMessage.add(md);
+    		}
+    	}
+    	// redeliver all unconsumed messages
+    	for (MessageDispatch md : unconsumedMessage) {
+    		onMessageDispatch(md);
+    	}
+    }
+    
+    synchronized void onStompCommit(TransactionId transactionId) {
+    	// ack all messages
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
+        ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
+        ack.setMessageCount(unconsumedMessage.size());
+        ack.setTransactionId(transactionId);
+        protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+        // clear lists
+    	unconsumedMessage.clear();
+    	dispatchedMessage.clear();
+    }
+
+    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
+    	
+    	MessageId msgId = new MessageId(messageId);
+    	
+        if (!dispatchedMessage.containsKey(msgId)) {
             return null;
         }
 
@@ -107,33 +150,50 @@
         ack.setConsumerId(consumerInfo.getConsumerId());
 
         if (ackMode == CLIENT_ACK) {
-            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        	if (transactionId != null) {
+        		ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
+        	} else {
+        		ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        	}
             int count = 0;
             for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
 
                 Map.Entry entry = (Entry)iter.next();
-                String id = (String)entry.getKey();
-                MessageId msgid = (MessageId)entry.getValue();
+                MessageId id = (MessageId)entry.getKey();
+                MessageDispatch msg = (MessageDispatch)entry.getValue();
 
                 if (ack.getFirstMessageId() == null) {
-                    ack.setFirstMessageId(msgid);
+                    ack.setFirstMessageId(id);
                 }
-
-                iter.remove();
+                
+                if (transactionId != null) {
+                	if (!unconsumedMessage.contains(msg))
+                		unconsumedMessage.add(msg);
+                } else {
+                	iter.remove();
+                }
+                
+                
                 count++;
 
-                if (id.equals(messageId)) {
-                    ack.setLastMessageId(msgid);
+                if (id.equals(msgId)) {
+                    ack.setLastMessageId(id);
                     break;
                 }
 
             }
             ack.setMessageCount(count);
+            if (transactionId != null) {
+            	ack.setTransactionId(transactionId);
+            }
         }
         else if (ackMode == INDIVIDUAL_ACK) {
             ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
-            MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
-            ack.setMessageID(msgid);
+            ack.setMessageID(msgId);
+            if (transactionId != null) {
+            	unconsumedMessage.add(dispatchedMessage.get(msgId));
+            	ack.setTransactionId(transactionId);
+            } 
             dispatchedMessage.remove(messageId);
         }
         return ack;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=738904&r1=738903&r2=738904&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Jan 29 16:01:35 2009
@@ -46,7 +46,6 @@
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -937,24 +936,55 @@
         sendMessage("message 1");
         sendMessage("message 2");
         sendMessage("message 3");
+        sendMessage("message 4");
+        sendMessage("message 5");
+        
         
-        StompFrame frame = stompConnection.receive();
 
+        StompFrame frame = stompConnection.receive();
+        assertEquals(frame.getBody(), "message 1");
+        
         stompConnection.begin("tx1");
         stompConnection.ack(frame, "tx1");
 
         StompFrame frame1 = stompConnection.receive();
-        
+        assertEquals(frame1.getBody(), "message 2");
+               
         try {
         	StompFrame frame2 = stompConnection.receive(500);
         	if (frame2 != null) {
         		fail("Should not have received the second message");
         	}
         } catch (SocketTimeoutException soe) {}
+        
+        Thread.sleep(100);
+        stompConnection.abort("tx1");
+        
+        stompConnection.begin("tx2");
+        
+        StompFrame frame3 = stompConnection.receive();
+        assertEquals(frame3.getBody(), "message 1");
+        stompConnection.ack(frame3, "tx2");
+        
+        StompFrame frame4 = stompConnection.receive();
+        assertEquals(frame4.getBody(), "message 2");
+        stompConnection.ack(frame4, "tx2");
+        
+        StompFrame frame5 = stompConnection.receive();
+        assertEquals(frame5.getBody(), "message 3");        
+        stompConnection.ack(frame5, "tx2");
+        
+        stompConnection.commit("tx2");
+        
+        stompConnection.begin("tx3");
+        StompFrame frame6 = stompConnection.receive();
+        assertEquals(frame6.getBody(), "message 4");
+        stompConnection.ack(frame6, "tx3");
+        stompConnection.commit("tx3");
+        
         stompDisconnect();
     	
-    }    
-    
+    }       
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;
@@ -969,3 +999,5 @@
         Thread.sleep(2000);
     }
 }
+
+


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic