[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r410123 - in
From: chirino () apache ! org
Date: 2006-05-29 17:27:00
Message-ID: 20060529172701.6D4BD1A983A () eris ! apache ! org
[Download RAW message or body]
Author: chirino
Date: Mon May 29 10:27:00 2006
New Revision: 410123
URL: http://svn.apache.org/viewvc?rev=410123&view=rev
Log:
Fix for: http://issues.apache.org/activemq/browse/AMQ-726
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=410123&r1=410122&r2=410123&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon May 29 10:27:00 2006
@@ -101,6 +101,7 @@
protected boolean decreaseNetworkConsumerPriority;
protected boolean shutDown;
protected int networkTTL = 1;
+ protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@@ -130,28 +131,49 @@
public synchronized void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker
- log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
- clearDownSubscriptions();
- doStopLocal();
- startedLatch = new CountDownLatch(2);
- try{
- triggerLocalStartBridge();
- }catch(IOException e){
- log.warn("Caught exception from local start",e);
+ if( remoteInterupted.compareAndSet(false, true) ) {
+ log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
+ clearDownSubscriptions();
+ try{
+ localBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ }catch(IOException e){
+ log.warn("Caught exception from local start",e);
+ }
+ localBridgeStarted.set(false);
+ remoteBridgeStarted.set(false);
+ startedLatch = new CountDownLatch(2);
}
}
public synchronized void transportResumed(){
- //restart and static subscriptions - the consumer advisories will be replayed
- log.info("Outbound transport to " + remoteBrokerName + " resumed");
- setupStaticDestinations();
- startedLatch.countDown();
+
+ if( remoteInterupted.compareAndSet(true, false) ) {
+
+ //restart and static subscriptions - the consumer advisories will be replayed
+ log.info("Outbound transport to " + remoteBrokerName + " resumed");
+
+// try{
+// triggerLocalStartBridge();
+// }catch(IOException e){
+// log.warn("Caught exception from local start",e);
+// }
+
+ try{
+ // clear out the previous connection as it may have missed some consumer advisories.
+ remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ triggerRemoteStartBridge();
+ }catch(IOException e){
+ log.warn("Caught exception from remote start",e);
+ }
+
+ }
}
});
localBroker.start();
remoteBroker.start();
+// triggerLocalStartBridge();
triggerRemoteStartBridge();
}
@@ -160,7 +182,7 @@
public void run(){
try{
startLocalBridge();
- }catch(IOException e){
+ }catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
}
}
@@ -173,7 +195,7 @@
public void run(){
try{
startRemoteBridge();
- }catch(IOException e){
+ }catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
}
}
@@ -181,8 +203,9 @@
thead.start();
}
- protected void startLocalBridge() throws IOException {
+ protected void startLocalBridge() throws Exception {
if(localBridgeStarted.compareAndSet(false,true)){
+
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
@@ -201,7 +224,7 @@
}
}
- protected void startRemoteBridge() throws IOException {
+ protected void startRemoteBridge() throws Exception {
if(remoteBridgeStarted.compareAndSet(false,true)){
remoteConnectionInfo=new ConnectionInfo();
@@ -229,7 +252,7 @@
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
- //we want infomation about Destinations as well
+ //we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
@@ -290,6 +313,7 @@
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
+ localBridgeStarted.set(false);
}
}
@@ -489,9 +513,13 @@
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
- shutDown = true;
- doStop();
-
+ // Don't shut down the whole connector if the remote side was interrupted.
+ // the local transport is just shutting down temporarily until the remote side
+ // is restored.
+ if( !remoteInterupted.get() ) {
+ shutDown = true;
+ doStop();
+ }
}else{
switch(command.getDataStructureType()){
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=410123&r1=410122&r2=410123&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon May 29 10:27:00 2006
@@ -229,8 +229,8 @@
ServiceSupport.dispose(connectedTransport);
connectedTransport = null;
connectedTransportURI = null;
- reconnectTask.wakeup();
}
+ reconnectTask.wakeup();
}
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic