[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r640641 - in
From:       chirino () apache ! org
Date:       2008-03-24 23:19:54
Message-ID: 20080324231955.0B07B1A9832 () eris ! apache ! org
[Download RAW message or body]

Author: chirino
Date: Mon Mar 24 16:19:50 2008
New Revision: 640641

URL: http://svn.apache.org/viewvc?rev=640641&view=rev
Log:
Better failover error handling and now we pass on the max initial inactivity timeout \
to the timeout used by the initial wire format negotiation.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/transport/WireFormatNegotiator.java?rev=640641&r1=640640&r2=640641&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java \
                (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java \
Mon Mar 24 16:19:50 2008 @@ -57,6 +57,13 @@
             minimumVersion = 1;
         }
         this.minimumVersion = minimumVersion;
+        
+        // Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat
+        // Does not make sense for us to take longer.
+        try {
+            setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
+        } catch (IOException e) {
+        }
     }
 
     public void start() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/transport/failover/FailoverTransport.java?rev=640641&r1=640640&r2=640641&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java \
                (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java \
Mon Mar 24 16:19:50 2008 @@ -94,7 +94,7 @@
     private int backupPoolSize=1;
     private boolean trackMessages = false;
     private int maxCacheSize = 128 * 1024;
-    private TransportListener disposedListener = new DefaultTransportListener();
+    private TransportListener disposedListener = new DefaultTransportListener() {};
     
 
     private final TransportListener myTransportListener = createTransportListener();
@@ -189,42 +189,33 @@
 
     public final void handleTransportFailure(IOException e) throws \
InterruptedException {  
-        Transport transport = connectedTransport.get();
+        Transport transport = connectedTransport.getAndSet(null);
         if( transport!=null ) {
+            
+            transport.setTransportListener(disposedListener);
             ServiceSupport.dispose(transport);
-        }
-        
-        boolean wasConnected=false;            
-        synchronized (reconnectMutex) {
-            boolean reconnectOk = false;
-            if(started) {
-                LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e);
-                LOG.debug("Transport failed with the following exception:", e);
-                reconnectOk = true;
-            }
             
-            if (connectedTransport.get() != null) {
-                wasConnected=true;
+            synchronized (reconnectMutex) {
+                boolean reconnectOk = false;
+                if(started) {
+                    LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e);
+                    LOG.debug("Transport failed with the following exception:", e);
+                    reconnectOk = true;
+                }
+                
                 initialized = false;
                 failedConnectTransportURI=connectedTransportURI;
-                Transport old = connectedTransport.get();
-                if(old != null) {
-                    //don't want errors from old transport
-                    old.setTransportListener(disposedListener);
-                }
-                connectedTransport.set(null);
                 connectedTransportURI = null;
                 connected=false;
+                    
+                if(reconnectOk) {
+                    reconnectTask.wakeup();
+                }
+            }
+
+            if (transportListener != null) {
+                transportListener.transportInterupted();
             }
-            	
-            if(reconnectOk) {
-            	reconnectTask.wakeup();
-            }
-        }
-
-        // Avoid double firing a transportInterupted() event due to an extra \
                IOException
-        if (transportListener != null && wasConnected) {
-            transportListener.transportInterupted();
         }
 
     }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic