[prev in list] [next in list] [prev in thread] [next in thread] 

List:       sandesha-dev
Subject:    svn commit: r579708 - in
From:       deepal () apache ! org
Date:       2007-09-26 16:20:27
Message-ID: 20070926162028.A378C1A9832 () eris ! apache ! org
[Download RAW message or body]

Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708

URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fix the case where we send a CreateSequence (CS) request and receive a response
  that is not a CreateSequenceResponse: in that case we need to stop sending the
  CS request and notify the client about it
- Fix the case where a timeout occurs but the client is never notified

- This commit fixes both of the above. Chamikara, or someone else who is an expert
  on the code base, please validate the patch.

 

Modified:
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
  webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
  webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java


Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
                
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/mod \
ules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff
 ==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java \
                (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java \
Wed Sep 26 09:20:24 2007 @@ -247,7 +247,6 @@
 				// with the same internal sequenceid
 				// Check that someone hasn't created the bean
 				rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, \
                internalSequenceId);
-				
 				// if first message - setup the sending side sequence - both for the
 				// server and the client sides.
 				if (rmsBean == null) {
@@ -363,7 +362,8 @@
 		}
 
 		// Update the rmsBean
-		storageManager.getRMSBeanMgr().update(rmsBean);
+        rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+        storageManager.getRMSBeanMgr().update(rmsBean);
 		
 		if(startPolling) {
 			SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), \
rmsBean);

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
                
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/mod \
ules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff
 ==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java \
                (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java \
Wed Sep 26 09:20:24 2007 @@ -144,7 +144,10 @@
 	 */
 	private boolean avoidAutoTermination = false;
 
-	/**
+    //To store the message id if the outgoing appliction message
+    private String applicationMessageMessageId ;
+
+    /**
 	 * Flags that are used to check if the primitive types on this bean
 	 * have been set. If a primitive type has not been set then it will
 	 * be ignored within the match method.
@@ -512,4 +515,12 @@
 		return match;
 	}
 
+
+    public String getApplicationMessageMessageId() {
+        return applicationMessageMessageId;
+    }
+
+    public void setApplicationMessageMessageId(String applicationMessageMessageId) {
+        this.applicationMessageMessageId = applicationMessageMessageId;
+    }
 }

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
                
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/mod \
ules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff
 ==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java \
                (original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java \
Wed Sep 26 09:20:24 2007 @@ -7,6 +7,9 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
 
 public class SenderWorker extends SandeshaWorker implements Runnable {
 
@@ -82,7 +80,7 @@
 		try {
 			StorageManager storageManager = \
SandeshaUtil.getSandeshaStorageManager(configurationContext, \
configurationContext.getAxisConfiguration());  SenderBeanMgr senderBeanMgr = \
                storageManager.getSenderBeanMgr();
-			
+
 			transaction = storageManager.getTransaction();
 
 			String key = senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
 			}
 			
 			//if the message belong to the Replay Model, it will be send out only if 
-			
-			
+
+
 			boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager);
 			//save changes done @ updateMessage -> \
MessageRetransmissionAdjuster.adjustRetransmittion  \
storageManager.getSenderBeanMgr().update(senderBean); @@ -210,7 +208,7 @@
 					transaction.commit();
 					transaction = null;
 				}
-				
+				invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, \
!continueSending");  return;
 			}
 
@@ -236,7 +234,7 @@
 					senderBeanMgr.update(bean2);
 				}
 			}
-			
+
 			// have to commit the transaction before sending. This may
 			// get changed when WS-AT is available.
 			if(transaction != null) {
@@ -335,10 +333,15 @@
 			
 			transaction = null;
 
-			if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) 
-				checkForSyncResponses(msgCtx);
-			
-			if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+            if ((processResponseForFaults || successfullySent) && \
!msgCtx.isServerSide()) { +                boolean  validCs = \
checkForSyncResponses(msgCtx ); +                if (!validCs) {
+                    invokeCallBackObject(storageManager,msgCtx ,
+                            "Sandesha2 sender thread has not received a valid \
CreateSequnceResponse"); +                }
+            }
+
+            if ((rmMsgCtx.getMessageType() == \
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)  &&
 					 (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) \
{  try {
@@ -505,8 +508,13 @@
 			log.debug("Exit: SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
 		return piggybackable;
 	}
-	
-	private void checkForSyncResponses(MessageContext msgCtx) {
+
+    /**
+     * return value will be false if the create sequence fails else it will be true
+     * @param msgCtx
+     * @return
+     */
+    private boolean checkForSyncResponses(MessageContext msgCtx ) {
 		if (log.isDebugEnabled())
 			log.debug("Enter: SenderWorker::checkForSyncResponses, " + \
msgCtx.getEnvelope().getHeader());  
@@ -522,7 +530,7 @@
 			boolean transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != \
null);  if (!transportInPresent && (responseMessageContext==null || \
                responseMessageContext.getEnvelope()==null)) {
 				if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses, \
                no response present");
-				return;
+				return true;
 			}
 			
 			//to find out weather the response was built by me.
@@ -592,7 +600,7 @@
 					log.error ("Caught exception", e);
 					}
 				
-					return;
+					return true;
 				}
 				
 				//If addressing is disabled we will be adding this message simply as the \
application response of the request message. @@ -631,7 +639,7 @@
 			//if the syncResponseWas not built here and the client was not expecting a sync \
response. We will not try to execute   //here. Doing so will cause a double \
invocation for a async message.   if \
                (msgCtx.getOptions().isUseSeparateListener()==true &&  \
                !syncResponseBuilt) {
-				return;
+				return true;
 			}
 			
 			
@@ -650,13 +658,22 @@
 			}
 
 		} catch (Exception e) {
-			String message = \
                SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
                
-			if (log.isWarnEnabled())
-				log.warn(message, e);
+
+            String message = \
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse); +          \
if (msgCtx != null &&! msgCtx.isServerSide() && +                    \
(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
 +                            || \
Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())) \
){ +                // We have not received a valid createSequnce reponse for the \
request we send so we need to terminate the seunce here +                return \
false; +            } else {
+                if (log.isWarnEnabled())
+                    log.warn(message, e);
+            }
 		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: SenderWorker::checkForSyncResponses");
-	}
+        return true;
+    }
 	
 	private void recordError (Exception e, RMMsgContext outRMMsg, StorageManager \
storageManager) throws SandeshaStorageException {  // Store the Exception as a \
sequence property to enable the client to lookup the last  @@ -702,5 +719,60 @@
 			}
 		}
 	}
-	
+
+    private void invokeCallBackObject(StorageManager storageManager,
+                                      MessageContext msgCtx,
+                                      String message) throws \
SandeshaStorageException { +        Transaction transaction = null;
+        if (msgCtx.isServerSide()) {
+            return;
+        }
+        try {
+            transaction = storageManager.getTransaction();
+            //terminate message sent using the SandeshaClient. Since the terminate \
message will simply get the +            //InFlow of the reference message get called \
which could be zero sized (OutOnly operations). +
+            // terminate sending side if this is the WSRM 1.0 spec.
+            // If the WSRM versoion is 1.1 termination will happen in the terminate \
sequence response message. +
+            String internalSequenceId = (String) \
msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID); \
+            if (internalSequenceId == null) internalSequenceId = \
senderBean.getInternalSequenceID(); +            if (internalSequenceId != null) {
+                // Create a new Transaction
+                transaction = storageManager.getTransaction();
+                RMSBean bean = \
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId); +  \
TerminateManager.terminateSendingSide(bean, storageManager); +
+                OperationContext opCtx =
+                        \
configurationContext.getOperationContext(bean.getApplicationMessageMessageId()); +    \
if (opCtx != null) { +                    AxisOperation applicationAxisOperation = \
opCtx.getAxisOperation(); +                    if (applicationAxisOperation != null) \
{ +                        MessageReceiver msgReceiver = \
applicationAxisOperation.getMessageReceiver(); +                        if \
((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) { +              \
Object callback = ((CallbackReceiver) msgReceiver) +                                  \
.lookupCallback(bean.getApplicationMessageMessageId()); +                            \
if (callback != null) { +                                AxisCallback axisCallback = \
((AxisCallback) callback); +                                axisCallback.onError(new \
Exception(message)); +                                axisCallback.onComplete();
+                            }
+                        }
+                    }
+                }
+                if (transaction != null && transaction.isActive()) \
transaction.commit(); +                transaction = null;
+            }
+
+        } catch (Exception e) {
+            if (log.isWarnEnabled())
+                log.warn(e);
+        } finally {
+            if (transaction != null && transaction.isActive()) {
+                transaction.rollback();
+                transaction = null;
+            }
+        }
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic