[prev in list] [next in list] [prev in thread] [next in thread]
List: sandesha-dev
Subject: svn commit: r479987 - in
From: mlovett () apache ! org
Date: 2006-11-28 11:08:28
Message-ID: 20061128110829.5C8E41A9846 () eris ! apache ! org
[Download RAW message or body]
Author: mlovett
Date: Tue Nov 28 03:08:23 2006
New Revision: 479987
URL: http://svn.apache.org/viewvc?view=rev&rev=479987
Log:
Andy's patch to ensure each message is only stored once, instead of being stored and then updated. See SANDESHA2-52.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sande \
sha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java \
Tue Nov 28 03:08:23 2006 @@ -269,16 +269,15 @@
ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, \
Sandesha2Constants.VALUE_FALSE);
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
-
// passing the message through sandesha2sender
SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
msgContext.pause();
@@ -386,8 +385,6 @@
SenderBean ackRequestBean = new SenderBean();
ackRequestBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
@@ -413,9 +410,9 @@
ackRequestBean.setInternalSequenceID(internalSeqenceID);
ackRequestBean.setSequenceID(outSequenceID);
- retramsmitterMgr.insert(ackRequestBean);
-
SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
+
+ retramsmitterMgr.insert(ackRequestBean);
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + \
Boolean.TRUE);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sande \
sha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java \
Tue Nov 28 03:08:23 2006 @@ -577,12 +577,11 @@
createSeqEntry.setToAddress(to.getAddress());
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, \
Sandesha2Constants.VALUE_FALSE);
- storageManager.storeMessageContext(createSequenceMessageStoreKey, createSeqMsg); \
// storing the message
+ SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
+
retransmitterMgr.insert(createSeqEntry);
- SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
-
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage");
}
@@ -752,16 +751,16 @@
appMsgEntry.setToAddress(to.getAddress());
appMsgEntry.setInternalSequenceID(internalSequenceId);
- storageManager.storeMessageContext(storageKey, msg);
msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, \
Sandesha2Constants.VALUE_FALSE);
- retransmitterMgr.insert(appMsgEntry);
// increasing the current handler index, so that the message will not be
// going throught the SandeshaOutHandler again.
msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+ retransmitterMgr.insert(appMsgEntry);
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sande \
sha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java \
Tue Nov 28 03:08:23 2006 @@ -249,8 +249,6 @@
SenderBean closeBean = new SenderBean();
closeBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
closeBean.setTimeToSend(System.currentTimeMillis());
closeBean.setMessageID(msgContext.getMessageID());
@@ -272,9 +270,9 @@
closeBean.setSequenceID(outSequenceID);
closeBean.setInternalSequenceID(internalSeqenceID);
- retramsmitterMgr.insert(closeBean);
-
SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+ retramsmitterMgr.insert(closeBean);
if (log.isDebugEnabled())
log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sande \
sha2/msgprocessors/SequenceProcessor.java?view=diff&rev=479987&r1=479986&r2=479987 \
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java \
Tue Nov 28 03:08:23 2006 @@ -394,12 +394,9 @@
}
ackBean.setTimeToSend(timeToSend);
- storageManager.storeMessageContext(key, ackMsgCtx);
ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, \
Sandesha2Constants.VALUE_FALSE);
- // inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
// / asyncAckTransaction.commit();
// passing the message through sandesha2sender
@@ -407,6 +404,9 @@
ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
SandeshaUtil.startSenderForTheSequence(ackRMMsgCtx.getConfigurationContext(), \
sequenceId); }
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sande \
sha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java \
Tue Nov 28 03:08:23 2006 @@ -393,8 +393,6 @@
SenderBean terminateBean = new SenderBean();
terminateBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, msgContext);
-
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
@@ -420,8 +418,6 @@
SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
- retramsmitterMgr.insert(terminateBean);
-
SequencePropertyBean terminateAdded = new SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequencePropertyKey(outSequenceID);
@@ -430,7 +426,9 @@
seqPropMgr.insert(terminateAdded);
SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
+
+ retramsmitterMgr.insert(terminateBean);
+
// Pause the message context
rmMsgCtx.pause();
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java \
Tue Nov 28 03:08:23 2006 @@ -127,8 +127,6 @@
makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
sequencePropertyKey);
- storageManager.storeMessageContext(makeConnectionMsgStoreKey,makeConnectionRMMessage.getMessageContext());
-
//add an entry for the MakeConnection message to the sender (with ,send=true, \
resend=false) SenderBean makeConnectionSenderBean = new SenderBean ();
// makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
@@ -147,9 +145,9 @@
//this message should not be sent until it is qualified. I.e. till it is sent \
through the Sandesha2TransportSender.
makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, \
Sandesha2Constants.VALUE_FALSE);
- senderBeanMgr.insert(makeConnectionSenderBean);
-
SandeshaUtil.executeAndStore(makeConnectionRMMessage, \
makeConnectionMsgStoreKey); +
+ senderBeanMgr.insert(makeConnectionSenderBean);
} catch (SandeshaStorageException e) {
e.printStackTrace();
} catch (SandeshaException e) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sande \
sha2/transport/Sandesha2TransportSender.java?view=diff&rev=479987&r1=479986&r2=479987 \
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java \
Tue Nov 28 03:08:23 2006 @@ -73,7 +73,7 @@
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
- storageManager.updateMessageContext(key,msgContext);
+ storageManager.storeMessageContext(key,msgContext);
if (log.isDebugEnabled())
log.debug("Exit: Sandesha2TransportSender::invoke");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java \
Tue Nov 28 03:08:23 2006 @@ -1070,8 +1070,17 @@
if (msgContext.isPaused())
engine.resumeSend(msgContext);
- else
+ else {
+ //this invocation has to be a blocking one.
+
+ Boolean isTransportNonBlocking = (Boolean) \
msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING); + if \
(isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue()) \
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE); \
+ engine.send(msgContext);
+
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, \
isTransportNonBlocking); + }
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::executeAndStore");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=479987&r1=479986&r2=479987
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java \
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java \
Tue Nov 28 03:08:23 2006 @@ -416,8 +416,6 @@
SenderBean terminateBean = new SenderBean();
terminateBean.setMessageContextRefKey(key);
- storageManager.storeMessageContext(key, terminateRMMessage.getMessageContext());
-
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
@@ -438,10 +436,6 @@
if (to!=null)
terminateBean.setToAddress(to.getAddress());
- SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- retramsmitterMgr.insert(terminateBean);
-
SequencePropertyBean terminateAdded = new SequencePropertyBean();
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
terminateAdded.setSequencePropertyKey(outSequenceId);
@@ -456,6 +450,9 @@
// / addTerminateSeqTransaction.commit();
SandeshaUtil.executeAndStore(terminateRMMessage, key);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+ retramsmitterMgr.insert(terminateBean);
if(log.isDebugEnabled())
log.debug("Exit: TerminateManager::addTerminateSequenceMessage");
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic