[prev in list] [next in list] [prev in thread] [next in thread]
List: sandesha-dev
Subject: svn commit: r579708 - in
From: deepal () apache ! org
Date: 2007-09-26 16:20:27
Message-ID: 20070926162028.A378C1A9832 () eris ! apache ! org
[Download RAW message or body]
Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708
URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing an issue where we send a CreateSequence (CS) request and receive a response that is not a \
CreateSequenceResponse; in that case we need to stop sending the CS request and notify the client \
about it
- When the timeout happens, the client is never notified
- This commit fixes both of the above. Chamikara, or someone else who is an expert on the code \
base, please validate the patch
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/mod \
ules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java \
(original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java \
Wed Sep 26 09:20:24 2007 @@ -247,7 +247,6 @@
// with the same internal sequenceid
// Check that someone hasn't created the bean
rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, \
internalSequenceId);
-
// if first message - setup the sending side sequence - both for the
// server and the client sides.
if (rmsBean == null) {
@@ -363,7 +362,8 @@
}
// Update the rmsBean
- storageManager.getRMSBeanMgr().update(rmsBean);
+ rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+ storageManager.getRMSBeanMgr().update(rmsBean);
if(startPolling) {
SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), \
rmsBean);
Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/mod \
ules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java \
(original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java \
Wed Sep 26 09:20:24 2007 @@ -144,7 +144,10 @@
*/
private boolean avoidAutoTermination = false;
- /**
+ //To store the message id if the outgoing appliction message
+ private String applicationMessageMessageId ;
+
+ /**
* Flags that are used to check if the primitive types on this bean
* have been set. If a primitive type has not been set then it will
* be ignored within the match method.
@@ -512,4 +515,12 @@
return match;
}
+
+ public String getApplicationMessageMessageId() {
+ return applicationMessageMessageId;
+ }
+
+ public void setApplicationMessageMessageId(String applicationMessageMessageId) {
+ this.applicationMessageMessageId = applicationMessageMessageId;
+ }
}
Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/mod \
ules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java \
(original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java \
Wed Sep 26 09:20:24 2007 @@ -7,6 +7,9 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.OutOnlyAxisOperation;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
public class SenderWorker extends SandeshaWorker implements Runnable {
@@ -82,7 +80,7 @@
try {
StorageManager storageManager = \
SandeshaUtil.getSandeshaStorageManager(configurationContext, \
configurationContext.getAxisConfiguration()); SenderBeanMgr senderBeanMgr = \
storageManager.getSenderBeanMgr();
-
+
transaction = storageManager.getTransaction();
String key = senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
}
//if the message belong to the Replay Model, it will be send out only if
-
-
+
+
boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager);
//save changes done @ updateMessage -> \
MessageRetransmissionAdjuster.adjustRetransmittion \
storageManager.getSenderBeanMgr().update(senderBean); @@ -210,7 +208,7 @@
transaction.commit();
transaction = null;
}
-
+ invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, \
!continueSending"); return;
}
@@ -236,7 +234,7 @@
senderBeanMgr.update(bean2);
}
}
-
+
// have to commit the transaction before sending. This may
// get changed when WS-AT is available.
if(transaction != null) {
@@ -335,10 +333,15 @@
transaction = null;
- if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide())
- checkForSyncResponses(msgCtx);
-
- if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+ if ((processResponseForFaults || successfullySent) && \
!msgCtx.isServerSide()) { + boolean validCs = \
checkForSyncResponses(msgCtx ); + if (!validCs) {
+ invokeCallBackObject(storageManager,msgCtx ,
+ "Sandesha2 sender thread has not received a valid \
CreateSequnceResponse"); + }
+ }
+
+ if ((rmMsgCtx.getMessageType() == \
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) &&
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) \
{ try {
@@ -505,8 +508,13 @@
log.debug("Exit: SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
return piggybackable;
}
-
- private void checkForSyncResponses(MessageContext msgCtx) {
+
+ /**
+ * return value will be false if the create sequence fails else it will be true
+ * @param msgCtx
+ * @return
+ */
+ private boolean checkForSyncResponses(MessageContext msgCtx ) {
if (log.isDebugEnabled())
log.debug("Enter: SenderWorker::checkForSyncResponses, " + \
msgCtx.getEnvelope().getHeader());
@@ -522,7 +530,7 @@
boolean transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != \
null); if (!transportInPresent && (responseMessageContext==null || \
responseMessageContext.getEnvelope()==null)) {
if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses, \
no response present");
- return;
+ return true;
}
//to find out weather the response was built by me.
@@ -592,7 +600,7 @@
log.error ("Caught exception", e);
}
- return;
+ return true;
}
//If addressing is disabled we will be adding this message simply as the \
application response of the request message. @@ -631,7 +639,7 @@
//if the syncResponseWas not built here and the client was not expecting a sync \
response. We will not try to execute //here. Doing so will cause a double \
invocation for a async message. if \
(msgCtx.getOptions().isUseSeparateListener()==true && \
!syncResponseBuilt) {
- return;
+ return true;
}
@@ -650,13 +658,22 @@
}
} catch (Exception e) {
- String message = \
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
- if (log.isWarnEnabled())
- log.warn(message, e);
+
+ String message = \
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse); + \
if (msgCtx != null &&! msgCtx.isServerSide() && + \
(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+ || \
Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())) \
){ + // We have not received a valid createSequnce reponse for the \
request we send so we need to terminate the seunce here + return \
false; + } else {
+ if (log.isWarnEnabled())
+ log.warn(message, e);
+ }
}
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::checkForSyncResponses");
- }
+ return true;
+ }
private void recordError (Exception e, RMMsgContext outRMMsg, StorageManager \
storageManager) throws SandeshaStorageException { // Store the Exception as a \
sequence property to enable the client to lookup the last @@ -702,5 +719,60 @@
}
}
}
-
+
+ private void invokeCallBackObject(StorageManager storageManager,
+ MessageContext msgCtx,
+ String message) throws \
SandeshaStorageException { + Transaction transaction = null;
+ if (msgCtx.isServerSide()) {
+ return;
+ }
+ try {
+ transaction = storageManager.getTransaction();
+ //terminate message sent using the SandeshaClient. Since the terminate \
message will simply get the + //InFlow of the reference message get called \
which could be zero sized (OutOnly operations). +
+ // terminate sending side if this is the WSRM 1.0 spec.
+ // If the WSRM versoion is 1.1 termination will happen in the terminate \
sequence response message. +
+ String internalSequenceId = (String) \
msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID); \
+ if (internalSequenceId == null) internalSequenceId = \
senderBean.getInternalSequenceID(); + if (internalSequenceId != null) {
+ // Create a new Transaction
+ transaction = storageManager.getTransaction();
+ RMSBean bean = \
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId); + \
TerminateManager.terminateSendingSide(bean, storageManager); +
+ OperationContext opCtx =
+ \
configurationContext.getOperationContext(bean.getApplicationMessageMessageId()); + \
if (opCtx != null) { + AxisOperation applicationAxisOperation = \
opCtx.getAxisOperation(); + if (applicationAxisOperation != null) \
{ + MessageReceiver msgReceiver = \
applicationAxisOperation.getMessageReceiver(); + if \
((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) { + \
Object callback = ((CallbackReceiver) msgReceiver) + \
.lookupCallback(bean.getApplicationMessageMessageId()); + \
if (callback != null) { + AxisCallback axisCallback = \
((AxisCallback) callback); + axisCallback.onError(new \
Exception(message)); + axisCallback.onComplete();
+ }
+ }
+ }
+ }
+ if (transaction != null && transaction.isActive()) \
transaction.commit(); + transaction = null;
+ }
+
+ } catch (Exception e) {
+ if (log.isWarnEnabled())
+ log.warn(e);
+ } finally {
+ if (transaction != null && transaction.isActive()) {
+ transaction.rollback();
+ transaction = null;
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic