[prev in list] [next in list] [prev in thread] [next in thread]
List: sandesha-dev
Subject: svn commit: r359586 - in
From: chamikara () apache ! org
Date: 2005-12-28 18:41:08
Message-ID: 20051228184109.58642.qmail () minotaur ! apache ! org
[Download RAW message or body]
Author: chamikara
Date: Wed Dec 28 10:40:22 2005
New Revision: 359586
URL: http://svn.apache.org/viewcvs?rev=359586&view=rev
Log:
Added functionality to get a report explaining the status of a sequence. Currently \
this has two functions: 1. To know whether a sequence has been completed (i.e. all \
messages have been received and terminate has been sent). 2. To get the number of \
messages that have been acked up to the current time.
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java?rev=359586&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java Wed Dec 28 \
10:40:22 2005 @@ -0,0 +1,45 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sandesha2;
+
+
+/**
+ * @author Chamikara Jayalath <chamikaramj@gmail.com>
+ */
+public class RMReport {
+
+ private long ackedMessageCount = 0;
+ private boolean sequenceCompleted = false;
+
+
+
+ public long getAckedMessageCount() {
+ return ackedMessageCount;
+ }
+
+ public void setAckedMessageCount(long ackedMessageCount) {
+ this.ackedMessageCount = ackedMessageCount;
+ }
+
+ public boolean isSequenceCompleted() {
+ return sequenceCompleted;
+ }
+
+ public void setSequenceCompleted(boolean sequenceCompleted) {
+ this.sequenceCompleted = sequenceCompleted;
+ }
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java Wed \
Dec 28 10:40:22 2005 @@ -16,6 +16,10 @@
package org.apache.sandesha2;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+
/**
* Contains all the Sandesha2Constants of Sandesha2.
* Please see sub-interfaces to see grouped data.
@@ -23,12 +27,22 @@
* @author Chamikara Jayalath <chamikaramj@gmail.com>
*/
-public interface Sandesha2ClientAPI {
+public class Sandesha2ClientAPI {
- String AcksTo = "Sandesha2ClientAPIPropertyAcksTo";
- String LAST_MESSAGE = "Sandesha2ClientAPIPropertyWSRMLastMessage";
- String OFFERED_SEQUENCE_ID = "Sandesha2ClientAPIPropertyOfferedSequenceId";
- String SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode";
- String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
+ public static String AcksTo = "Sandesha2ClientAPIPropertyAcksTo";
+ public static String LAST_MESSAGE = "Sandesha2ClientAPIPropertyWSRMLastMessage";
+ public static String OFFERED_SEQUENCE_ID = \
"Sandesha2ClientAPIPropertyOfferedSequenceId"; + public static String \
SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode"; + public static String \
SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
+ public static RMReport getRMReport (String to, String \
sequenceKey,ConfigurationContext configurationContext) throws SandeshaException { +
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
+
+ RMReport rmReport = new RMReport ();
+ rmReport.setAckedMessageCount(SequenceManager.getAckedMessageCount \
(internalSequenceID,configurationContext)); \
+ rmReport.setSequenceCompleted(SequenceManager.isSequenceCompleted \
(internalSequenceID,configurationContext)); +
+ return rmReport;
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Wed \
Dec 28 10:40:22 2005 @@ -180,6 +180,10 @@
String TERMINATE_ADDED = "TerminateAdded";
String LAST_ACTIVATED_TIME = "LastActivatedTime";
+
+ String NO_OF_MSGS_ACKED = "NoOfMessagesAcked";
+
+ String TRANSPORT_TO = "TransportTo";
}
public interface SOAPVersion {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Wed Dec \
28 10:40:22 2005 @@ -151,7 +151,9 @@
Collection collection = retransmitterBeanMgr.find(internalSequenceId);
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
- SenderBean retransmitterBean = (SenderBean) iterator.next();
+ Object obj = iterator.next();
+ System.out.println(obj);
+ SenderBean retransmitterBean = (SenderBean) obj;
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
}
@@ -183,6 +185,20 @@
SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) \
iterator.next(); sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
}
+ }
+
+ private boolean isProportyDeletable (String name) {
+ boolean deleatable = true;
+ if (Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
+ deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED.equals(name))
+ deleatable = false;
+
+ if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
+ deleatable = false;
+
+ return deleatable;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java \
Wed Dec 28 10:40:22 2005 @@ -168,12 +168,12 @@
throw new AxisFault(
"TO End Point Reference is not set correctly. This is a must for the sandesha \
client side.");
- internalSequenceId = toEPR.getAddress();
+ String to = toEPR.getAddress();
OperationContext opContext = msgCtx.getOperationContext();
String sequenceKey = (String) msgCtx
.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
- if (sequenceKey != null)
- internalSequenceId = internalSequenceId + sequenceKey;
+
+ internalSequenceId = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2 \
/msgprocessors/AcknowledgementProcessor.java?rev=359586&r1=359585&r2=359586&view=diff \
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java \
Wed Dec 28 10:40:22 2005 @@ -25,6 +25,7 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.MessageContextConstants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
@@ -133,6 +134,28 @@
//TODO - Process Nack
}
+
+ //setting acked message date.
+ //TODO add details specific to each message.
+ long noOfMsgsAcked = \
getNoOfMessagesAcked(sequenceAck.getAcknowledgementRanges().iterator()); \
+ SequencePropertyBean ackedMessagesBean = \
seqPropMgr.retrieve(outSequenceId,Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
+ boolean added = false;
+
+ if (ackedMessagesBean==null) {
+ added = true;
+ ackedMessagesBean = new SequencePropertyBean ();
+ ackedMessagesBean.setSequenceID(outSequenceId);
+ ackedMessagesBean.setName(Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
+ }
+
+ ackedMessagesBean.setValue(Long.toString(noOfMsgsAcked));
+
+ if (added)
+ seqPropMgr.insert(ackedMessagesBean);
+ else
+ seqPropMgr.update(ackedMessagesBean);
+
+
//following get called in the SandesaInHandler
//if (justSendTerminateIfNeeded) {
//If all messages up to last message have been acknowledged.
@@ -160,6 +183,8 @@
}
}
+
+
//stopping the progress of the message further.
//rmMsgCtx.getMessageContext().pause();
rmMsgCtx.getMessageContext().setPausedTrue(new QName \
(Sandesha2Constants.IN_HANDLER_NAME)); @@ -222,6 +247,11 @@
terminateRMMessage
.setSOAPAction(Sandesha2Constants.WSRM.Actions.SOAP_ACTION_TERMINATE_SEQUENCE);
+ SequencePropertyBean transportToBean = \
seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+ if (transportToBean!=null) {
+ terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+ }
+
try {
terminateRMMessage.addSOAPEnvelope();
} catch (AxisFault e) {
@@ -246,15 +276,34 @@
SenderBeanMgr retramsmitterMgr = storageManager
.getRetransmitterBeanMgr();
+
+
+ retramsmitterMgr.insert(terminateBean);
+
SequencePropertyBean terminateAdded = new SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequenceID(outSequenceId);
terminateAdded.setValue("true");
seqPropMgr.insert(terminateAdded);
+
+
- retramsmitterMgr.insert(terminateBean);
-
+ }
+
+ private static long getNoOfMessagesAcked (Iterator ackRangeIterator) {
+ long noOfMsgs = 0;
+ while (ackRangeIterator.hasNext()) {
+ AcknowledgementRange acknowledgementRange = (AcknowledgementRange) \
ackRangeIterator.next(); + long lower = acknowledgementRange.getLowerValue();
+ long upper = acknowledgementRange.getUpperValue();
+
+ for (long i=lower;i<=upper;i++) {
+ noOfMsgs++;
+ }
+ }
+
+ return noOfMsgs;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2 \
/storage/inmemory/InMemorySenderBeanMgr.java?rev=359586&r1=359585&r2=359586&view=diff \
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java \
Wed Dec 28 10:40:22 2005 @@ -71,7 +71,7 @@
while (iterator.hasNext()) {
SenderBean senderBean = (SenderBean) table.get(iterator.next());
if (internalSequenceID.equals(senderBean.getInternalSequenceID()))
- arrayList.add(internalSequenceID);
+ arrayList.add(senderBean);
}
return arrayList;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Wed \
Dec 28 10:40:22 2005 @@ -582,5 +582,16 @@
return retArr;
}
+
+ public static String getInternalSequenceID (String to, String sequenceKey) {
+ if (to==null && sequenceKey==null)
+ return null;
+ else if (to==null)
+ return sequenceKey;
+ else if (sequenceKey==null)
+ return to;
+ else
+ return to + ":" +sequenceKey;
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java \
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Wed \
Dec 28 10:40:22 2005 @@ -6,11 +6,15 @@
*/
package org.apache.sandesha2.util;
+import java.util.ArrayList;
+import java.util.Collection;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
@@ -18,8 +22,10 @@
import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.wsrm.CreateSequence;
@@ -110,7 +116,7 @@
}
public static void setupNewClientSequence(
- MessageContext firstAplicationMsgCtx, String iternalSequenceId)
+ MessageContext firstAplicationMsgCtx, String internalSequenceId)
throws SandeshaException {
AbstractContext context = firstAplicationMsgCtx.getConfigurationContext();
@@ -130,7 +136,7 @@
if (toEPR == null)
throw new SandeshaException("WS-Addressing To is null");
- SequencePropertyBean toBean = new SequencePropertyBean(iternalSequenceId,
+ SequencePropertyBean toBean = new SequencePropertyBean(internalSequenceId,
Sandesha2Constants.SequenceProperties.TO_EPR, toEPR.getAddress());
//Default value for acksTo is anonymous
@@ -139,10 +145,21 @@
EndpointReference acksToEPR = new EndpointReference(acksTo);
SequencePropertyBean acksToBean = new SequencePropertyBean(
- iternalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
+ internalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
acksToEPR.getAddress());
seqPropMgr.insert(toBean);
seqPropMgr.insert(acksToBean);
+
+ //saving transportTo value;
+ String transportTo = (String) \
firstAplicationMsgCtx.getProperty(MessageContextConstants.TRANSPORT_URL); + if \
(transportTo!=null) { + SequencePropertyBean transportToBean = new \
SequencePropertyBean (); + transportToBean.setSequenceID(internalSequenceId);
+ transportToBean.setName(Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+ transportToBean.setValue(transportTo);
+
+ seqPropMgr.insert(transportToBean);
+ }
}
@@ -198,6 +215,9 @@
//loading default policies.
policyBean = PropertyManager.getInstance().getRMPolicyBean();
}
+
+ if (policyBean.getInactiveTimeoutInterval()<=0)
+ return false;
boolean sequenceTimedOut = false;
@@ -211,5 +231,63 @@
return sequenceTimedOut;
}
+
+ public static long getAckedMessageCount (String \
internalSequenceID,ConfigurationContext configurationContext) throws \
SandeshaException { + StorageManager storageManager = \
SandeshaUtil.getSandeshaStorageManager(configurationContext); + Transaction \
transaction = storageManager.getTransaction(); + SequencePropertyBeanMgr \
seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); +
+ SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+ findSeqIDBean.setValue(internalSequenceID);
+ findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+
+ if (seqIDBeans.size()==0)
+ throw new SandeshaException ("A sequence with give data has not been created");
+
+ if (seqIDBeans.size()>1)
+ throw new SandeshaException ("Sequence data is not unique. Cant generate \
report"); +
+ SequencePropertyBean seqIDBean = (SequencePropertyBean) \
seqIDBeans.iterator().next(); + String sequenceID = seqIDBean.getSequenceID();
+
+ SequencePropertyBean ackedMsgBean = \
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
+ if (ackedMsgBean==null)
+ return 0; //No acknowledgement has been received yet.
+
+ long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
+
+ return noOfMessagesAcked;
+ }
+
+ public static boolean isSequenceCompleted (String \
internalSequenceID,ConfigurationContext configurationContext) throws \
SandeshaException { + StorageManager storageManager = \
SandeshaUtil.getSandeshaStorageManager(configurationContext); + Transaction \
transaction = storageManager.getTransaction(); + SequencePropertyBeanMgr \
seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); +
+ SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+ findSeqIDBean.setValue(internalSequenceID);
+ findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+
+ if (seqIDBeans.size()==0)
+ throw new SandeshaException ("A sequence with give data has not been created");
+
+ if (seqIDBeans.size()>1)
+ throw new SandeshaException ("Sequence data is not unique. Cant generate \
report"); +
+ SequencePropertyBean seqIDBean = (SequencePropertyBean) \
seqIDBeans.iterator().next(); + String sequenceID = seqIDBean.getSequenceID();
+
+ SequencePropertyBean terminateAddedBean = \
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ if (terminateAddedBean==null)
+ return false;
+
+ if ("true".equals(terminateAddedBean.getValue()))
+ return true;
+
+ return false;
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic