[prev in list] [next in list] [prev in thread] [next in thread] 

List:       sandesha-dev
Subject:    svn commit: r359586 - in
From:       chamikara () apache ! org
Date:       2005-12-28 18:41:08
Message-ID: 20051228184109.58642.qmail () minotaur ! apache ! org
[Download RAW message or body]

Author: chamikara
Date: Wed Dec 28 10:40:22 2005
New Revision: 359586

URL: http://svn.apache.org/viewcvs?rev=359586&view=rev
Log:
Added to functionality get an Report explainint the status of a sequence. Currently \
this has two functions. 1. To know weather a sequence has been completed (i.e. all \
messages have been received and terminate has been sent). 2. To get the number of \
messages that has been acked upto the current time.

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
  webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
  webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
  webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java?rev=359586&view=auto
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java Wed Dec 28 \
10:40:22 2005 @@ -0,0 +1,45 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sandesha2;
+
+
+/**
+ * @author Chamikara Jayalath <chamikaramj@gmail.com>
+ */
+public class RMReport {
+	
+	private long ackedMessageCount = 0;
+	private boolean sequenceCompleted = false;
+	
+	
+
+	public long getAckedMessageCount() {
+		return ackedMessageCount;
+	}
+	
+	public void setAckedMessageCount(long ackedMessageCount) {
+		this.ackedMessageCount = ackedMessageCount;
+	}
+	
+	public boolean isSequenceCompleted() {
+		return sequenceCompleted;
+	}
+	
+	public void setSequenceCompleted(boolean sequenceCompleted) {
+		this.sequenceCompleted = sequenceCompleted;
+	}
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java?rev=359586&r1=359585&r2=359586&view=diff
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java Wed \
Dec 28 10:40:22 2005 @@ -16,6 +16,10 @@
 
 package org.apache.sandesha2;
 
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+
 /**
  * Contains all the Sandesha2Constants of Sandesha2.
  * Please see sub-interfaces to see grouped data.
@@ -23,12 +27,22 @@
  * @author Chamikara Jayalath <chamikaramj@gmail.com>
  */
 
-public interface Sandesha2ClientAPI {
+public class Sandesha2ClientAPI {
 
-	String AcksTo = "Sandesha2ClientAPIPropertyAcksTo";
-	String LAST_MESSAGE = "Sandesha2ClientAPIPropertyWSRMLastMessage";
-	String OFFERED_SEQUENCE_ID = "Sandesha2ClientAPIPropertyOfferedSequenceId";
-	String SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode";
-	String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
+	public static String AcksTo = "Sandesha2ClientAPIPropertyAcksTo";
+	public static String LAST_MESSAGE = "Sandesha2ClientAPIPropertyWSRMLastMessage";
+	public static String OFFERED_SEQUENCE_ID = \
"Sandesha2ClientAPIPropertyOfferedSequenceId"; +	public static String \
SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode"; +	public static String \
SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";  
+	public static RMReport getRMReport (String to, String \
sequenceKey,ConfigurationContext configurationContext) throws SandeshaException { +		
+		String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
+		
+		RMReport rmReport = new RMReport ();
+		rmReport.setAckedMessageCount(SequenceManager.getAckedMessageCount \
(internalSequenceID,configurationContext)); \
+		rmReport.setSequenceCompleted(SequenceManager.isSequenceCompleted \
(internalSequenceID,configurationContext)); +		
+		return rmReport;
+	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359586&r1=359585&r2=359586&view=diff
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Wed \
Dec 28 10:40:22 2005 @@ -180,6 +180,10 @@
 		String TERMINATE_ADDED = "TerminateAdded";
 		
 		String LAST_ACTIVATED_TIME = "LastActivatedTime";
+		
+		String NO_OF_MSGS_ACKED = "NoOfMessagesAcked";
+		
+		String TRANSPORT_TO = "TransportTo";
 	}
 
 	public interface SOAPVersion {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359586&r1=359585&r2=359586&view=diff
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Wed Dec \
28 10:40:22 2005 @@ -151,7 +151,9 @@
 		Collection collection = retransmitterBeanMgr.find(internalSequenceId);
 		Iterator iterator = collection.iterator();
 		while (iterator.hasNext()) {
-			SenderBean retransmitterBean = (SenderBean) iterator.next();
+			Object obj = iterator.next();
+			System.out.println(obj);
+			SenderBean retransmitterBean = (SenderBean) obj;
 			retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
 		}
 		
@@ -183,6 +185,20 @@
 			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) \
iterator.next();  sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
  }
+	}
+	
+	private boolean isProportyDeletable (String name) {
+		boolean deleatable = true;
 		
+		if (Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
+			deleatable = false;
+		
+		return deleatable;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
                
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359586&r1=359585&r2=359586&view=diff
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java \
Wed Dec 28 10:40:22 2005 @@ -168,12 +168,12 @@
 				throw new AxisFault(
 						"TO End Point Reference is not set correctly. This is a must for the sandesha \
client side.");  
-			internalSequenceId = toEPR.getAddress();
+			String to = toEPR.getAddress();
 			OperationContext opContext = msgCtx.getOperationContext();
 			String sequenceKey = (String) msgCtx
 					.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
-			if (sequenceKey != null)
-				internalSequenceId = internalSequenceId + sequenceKey;
+			
+			internalSequenceId = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
 
 		}
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
                
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2 \
/msgprocessors/AcknowledgementProcessor.java?rev=359586&r1=359585&r2=359586&view=diff \
                ==============================================================================
                
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java \
Wed Dec 28 10:40:22 2005 @@ -25,6 +25,7 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.MessageContextConstants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
@@ -133,6 +134,28 @@
 			//TODO - Process Nack
 		}
 
+		
+		//setting acked message date.
+		//TODO add details specific to each message.
+		long noOfMsgsAcked = \
getNoOfMessagesAcked(sequenceAck.getAcknowledgementRanges().iterator()); \
+		SequencePropertyBean ackedMessagesBean = \
seqPropMgr.retrieve(outSequenceId,Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
 +		boolean added = false;
+		
+		if (ackedMessagesBean==null) {
+			added = true;
+			ackedMessagesBean = new SequencePropertyBean ();
+			ackedMessagesBean.setSequenceID(outSequenceId);
+			ackedMessagesBean.setName(Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
 +		}
+		
+		ackedMessagesBean.setValue(Long.toString(noOfMsgsAcked));
+		
+		if (added) 
+			seqPropMgr.insert(ackedMessagesBean);
+		else
+			seqPropMgr.update(ackedMessagesBean);
+		
+		
 		//following get called in the SandesaInHandler
 		//if (justSendTerminateIfNeeded) {
 		//If all messages up to last message have been acknowledged.
@@ -160,6 +183,8 @@
 			}
 		}
 		
+	
+		
 		//stopping the progress of the message further.
 		//rmMsgCtx.getMessageContext().pause();
 		rmMsgCtx.getMessageContext().setPausedTrue(new QName \
(Sandesha2Constants.IN_HANDLER_NAME)); @@ -222,6 +247,11 @@
 		terminateRMMessage
 				.setSOAPAction(Sandesha2Constants.WSRM.Actions.SOAP_ACTION_TERMINATE_SEQUENCE);
 
+		SequencePropertyBean transportToBean = \
seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
 +		if (transportToBean!=null) {
+			terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
 +		}
+		
 		try {
 			terminateRMMessage.addSOAPEnvelope();
 		} catch (AxisFault e) {
@@ -246,15 +276,34 @@
 		SenderBeanMgr retramsmitterMgr = storageManager
 				.getRetransmitterBeanMgr();
 
+		
+
+		retramsmitterMgr.insert(terminateBean);
+		
 		SequencePropertyBean terminateAdded = new SequencePropertyBean();
 		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
 		terminateAdded.setSequenceID(outSequenceId);
 		terminateAdded.setValue("true");
 
 		seqPropMgr.insert(terminateAdded);
+		
+		
 
-		retramsmitterMgr.insert(terminateBean);
-
+	}
+	
+	private static long getNoOfMessagesAcked (Iterator ackRangeIterator) {
+		long noOfMsgs = 0;
+		while (ackRangeIterator.hasNext()) {
+			AcknowledgementRange acknowledgementRange = (AcknowledgementRange) \
ackRangeIterator.next(); +			long lower = acknowledgementRange.getLowerValue();
+			long upper = acknowledgementRange.getUpperValue();
+			
+			for (long i=lower;i<=upper;i++) {
+				noOfMsgs++;
+			}
+		}
+		
+		return noOfMsgs;
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
                
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2 \
/storage/inmemory/InMemorySenderBeanMgr.java?rev=359586&r1=359585&r2=359586&view=diff \
                ==============================================================================
                
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java \
Wed Dec 28 10:40:22 2005 @@ -71,7 +71,7 @@
 		while (iterator.hasNext()) {
 			SenderBean senderBean = (SenderBean) table.get(iterator.next());
 			if (internalSequenceID.equals(senderBean.getInternalSequenceID())) 
-					arrayList.add(internalSequenceID);
+					arrayList.add(senderBean);
 		}
 		
 		return arrayList;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=359586&r1=359585&r2=359586&view=diff
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Wed \
Dec 28 10:40:22 2005 @@ -582,5 +582,16 @@
 		
 		return retArr;
 	}
+	
+	public static String getInternalSequenceID (String to, String sequenceKey) {
+		if (to==null && sequenceKey==null)
+			return null;
+		else if (to==null) 
+			return sequenceKey;
+		else if (sequenceKey==null)
+			return to;
+		else 
+			return to + ":" +sequenceKey;
+	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
                
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359586&r1=359585&r2=359586&view=diff
 ==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java \
                (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Wed \
Dec 28 10:40:22 2005 @@ -6,11 +6,15 @@
  */
 package org.apache.sandesha2.util;
 
+import java.util.ArrayList;
+import java.util.Collection;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2ClientAPI;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -18,8 +22,10 @@
 import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.wsrm.CreateSequence;
@@ -110,7 +116,7 @@
 	}
 
 	public static void setupNewClientSequence(
-			MessageContext firstAplicationMsgCtx, String iternalSequenceId)
+			MessageContext firstAplicationMsgCtx, String internalSequenceId)
 			throws SandeshaException {
 
 		AbstractContext context = firstAplicationMsgCtx.getConfigurationContext();
@@ -130,7 +136,7 @@
 		if (toEPR == null)
 			throw new SandeshaException("WS-Addressing To is null");
 
-		SequencePropertyBean toBean = new SequencePropertyBean(iternalSequenceId,
+		SequencePropertyBean toBean = new SequencePropertyBean(internalSequenceId,
 				Sandesha2Constants.SequenceProperties.TO_EPR, toEPR.getAddress());
 
 		//Default value for acksTo is anonymous
@@ -139,10 +145,21 @@
 
 		EndpointReference acksToEPR = new EndpointReference(acksTo);
 		SequencePropertyBean acksToBean = new SequencePropertyBean(
-				iternalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
+				internalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
 				acksToEPR.getAddress());
 		seqPropMgr.insert(toBean);
 		seqPropMgr.insert(acksToBean);
+		
+		//saving transportTo value;
+		String transportTo = (String) \
firstAplicationMsgCtx.getProperty(MessageContextConstants.TRANSPORT_URL); +		if \
(transportTo!=null) { +			SequencePropertyBean transportToBean = new \
SequencePropertyBean (); +			transportToBean.setSequenceID(internalSequenceId);
+			transportToBean.setName(Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+			transportToBean.setValue(transportTo);
+			
+			seqPropMgr.insert(transportToBean);
+		}
 
 	}
 	
@@ -198,6 +215,9 @@
 			//loading default policies.
 			policyBean = PropertyManager.getInstance().getRMPolicyBean();
 		}
+		
+		if (policyBean.getInactiveTimeoutInterval()<=0)
+			return false;
 
 		boolean sequenceTimedOut = false;
 		
@@ -211,5 +231,63 @@
 		
 		return sequenceTimedOut;
 	}
+	
+	public static long getAckedMessageCount (String \
internalSequenceID,ConfigurationContext configurationContext) throws \
SandeshaException { +		StorageManager storageManager = \
SandeshaUtil.getSandeshaStorageManager(configurationContext); +		Transaction \
transaction = storageManager.getTransaction(); +		SequencePropertyBeanMgr \
seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); +		
+		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+		findSeqIDBean.setValue(internalSequenceID);
+		findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+		
+		if (seqIDBeans.size()==0)
+			throw new SandeshaException ("A sequence with give data has not been created");
+		
+		if (seqIDBeans.size()>1) 
+			throw new SandeshaException ("Sequence data is not unique. Cant generate \
report"); +		
+		SequencePropertyBean seqIDBean = (SequencePropertyBean) \
seqIDBeans.iterator().next(); +		String sequenceID = seqIDBean.getSequenceID();
+
+		SequencePropertyBean ackedMsgBean = \
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
 +		if (ackedMsgBean==null)
+			return 0; //No acknowledgement has been received yet.
+		
+		long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
+		
+		return noOfMessagesAcked;
+	}
+	
+	public static boolean isSequenceCompleted (String \
internalSequenceID,ConfigurationContext configurationContext) throws \
SandeshaException { +		StorageManager storageManager = \
SandeshaUtil.getSandeshaStorageManager(configurationContext); +		Transaction \
transaction = storageManager.getTransaction(); +		SequencePropertyBeanMgr \
seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); +		
+		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+		findSeqIDBean.setValue(internalSequenceID);
+		findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+		
+		if (seqIDBeans.size()==0)
+			throw new SandeshaException ("A sequence with give data has not been created");
+		
+		if (seqIDBeans.size()>1) 
+			throw new SandeshaException ("Sequence data is not unique. Cant generate \
report"); +		
+		SequencePropertyBean seqIDBean = (SequencePropertyBean) \
seqIDBeans.iterator().next(); +		String sequenceID = seqIDBean.getSequenceID();
+		
+		SequencePropertyBean terminateAddedBean = \
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
 +		if (terminateAddedBean==null)
+			return false;
+		
+		if ("true".equals(terminateAddedBean.getValue()))
+			return true;
+
+		return false;
+	}
+	
 	
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic