[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r410123 - in
From:       chirino () apache ! org
Date:       2006-05-29 17:27:00
Message-ID: 20060529172701.6D4BD1A983A () eris ! apache ! org
[Download RAW message or body]

Author: chirino
Date: Mon May 29 10:27:00 2006
New Revision: 410123

URL: http://svn.apache.org/viewvc?rev=410123&view=rev
Log:
Fix for: http://issues.apache.org/activemq/browse/AMQ-726

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
  incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java


Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
                
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-co \
re/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=410123&r1=410122&r2=410123&view=diff
 ==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java \
                (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java \
Mon May 29 10:27:00 2006 @@ -101,6 +101,7 @@
     protected boolean decreaseNetworkConsumerPriority;
     protected boolean shutDown;
     protected int networkTTL = 1;
+    protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
 
     
     public DemandForwardingBridgeSupport(final Transport localBroker, final \
Transport remoteBroker) { @@ -130,28 +131,49 @@
     
             public synchronized void transportInterupted(){
                 //clear any subscriptions - to try and prevent the bridge from \
                stalling the broker
-                log.warn("Outbound transport to " + remoteBrokerName +  " \
                interrupted ...");
-                clearDownSubscriptions();
-                doStopLocal();
-                startedLatch = new CountDownLatch(2);
-                try{
-                    triggerLocalStartBridge();
-                }catch(IOException e){
-                    log.warn("Caught exception from local start",e);
+                if( remoteInterupted.compareAndSet(false, true) ) {
+                    log.warn("Outbound transport to " + remoteBrokerName +  " \
interrupted ..."); +                    clearDownSubscriptions();
+                    try{
+                        \
localBroker.oneway(remoteConnectionInfo.createRemoveCommand()); +                    \
}catch(IOException e){ +                        log.warn("Caught exception from local \
start",e); +                    }
+                    localBridgeStarted.set(false);
+                    remoteBridgeStarted.set(false);
+                    startedLatch = new CountDownLatch(2);
                 }
                 
             }
     
             public synchronized void transportResumed(){
-                //restart and static subscriptions - the consumer advisories will be \
                replayed
-                log.info("Outbound transport to " + remoteBrokerName + " resumed");
-                setupStaticDestinations();
-                startedLatch.countDown();
+                
+                if( remoteInterupted.compareAndSet(true, false) ) {
+                    
+                    //restart and static subscriptions - the consumer advisories \
will be replayed +                    log.info("Outbound transport to " + \
remoteBrokerName + " resumed"); +                    
+//                    try{
+//                        triggerLocalStartBridge();
+//                    }catch(IOException e){
+//                        log.warn("Caught exception from local start",e);
+//                    }
+    
+                    try{
+                        // clear out the previous connection as it may have missed \
some consumer advisories. +                        \
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); +                    \
triggerRemoteStartBridge(); +                    }catch(IOException e){
+                        log.warn("Caught exception from remote start",e);
+                    }
+                    
+                }
                 
             }
         });
         localBroker.start();
         remoteBroker.start();
+//        triggerLocalStartBridge();
         triggerRemoteStartBridge();
     }
 
@@ -160,7 +182,7 @@
             public void run(){
                 try{
                     startLocalBridge();
-                }catch(IOException e){
+                }catch(Exception e){
                     log.error("Failed to start network bridge: "+e,e);
                 }
             }
@@ -173,7 +195,7 @@
             public void run(){
                 try{
                     startRemoteBridge();
-                }catch(IOException e){
+                }catch(Exception e){
                     log.error("Failed to start network bridge: "+e,e);
                 }
             }
@@ -181,8 +203,9 @@
         thead.start();
     }
 
-    protected void startLocalBridge() throws IOException {
+    protected void startLocalBridge() throws Exception {
         if(localBridgeStarted.compareAndSet(false,true)){
+            
             localConnectionInfo=new ConnectionInfo();
             localConnectionInfo.setConnectionId(new \
ConnectionId(idGenerator.generateId()));  \
localClientId="NC_"+remoteBrokerName+"_inbound"+name; @@ -201,7 +224,7 @@
         }
     }
 
-    protected void startRemoteBridge() throws IOException {
+    protected void startRemoteBridge() throws Exception {
         if(remoteBridgeStarted.compareAndSet(false,true)){
 
             remoteConnectionInfo=new ConnectionInfo();
@@ -229,7 +252,7 @@
                             +destinationFilter));
             demandConsumerInfo.setPrefetchSize(prefetchSize);
             remoteBroker.oneway(demandConsumerInfo);
-            //we want infomation about Destinations as well
+            //we want information about Destinations as well
             ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
             destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
  destinationInfo.setPrefetchSize(prefetchSize);
@@ -290,6 +313,7 @@
         }finally{
             ServiceStopper ss=new ServiceStopper();
             ss.stop(localBroker);
+            localBridgeStarted.set(false);
         }
     }
 
@@ -489,9 +513,13 @@
                     serviceLocalBrokerInfo(command);
                 }else if(command.isShutdownInfo()){
                     log.info(localBrokerName+" Shutting down");
-                    shutDown = true;
-                    doStop();
-                   
+                    // Don't shut down the whole connector if the remote side was \
interrupted. +                    // the local transport is just shutting down \
temporarily until the remote side +                    // is restored.
+                    if( !remoteInterupted.get() ) { 
+                        shutDown = true;
+                        doStop();
+                    }
                     
                 }else{
                     switch(command.getDataStructureType()){

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
                
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-co \
re/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=410123&r1=410122&r2=410123&view=diff
 ==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java \
                (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java \
Mon May 29 10:27:00 2006 @@ -229,8 +229,8 @@
                 ServiceSupport.dispose(connectedTransport);
                 connectedTransport = null;
                 connectedTransportURI = null;
-                reconnectTask.wakeup();
             }
+            reconnectTask.wakeup();
         }
     }
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic