[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mina-commits
Subject:    svn commit: r1053752 - in
From:       ngn () apache ! org
Date:       2010-12-29 21:54:41
Message-ID: 20101229215441.3AF7523889B9 () eris ! apache ! org
[Download RAW message or body]

Author: ngn
Date: Wed Dec 29 21:54:40 2010
New Revision: 1053752

URL: http://svn.apache.org/viewvc?rev=1053752&view=rev
Log:
Handle reconnecting to all registred addresses for an XMPP server, or failing if all \
fail

Modified:
    mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
  mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
  mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java


Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
                
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/o \
rg/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java?rev=1053752&r1=1053751&r2=1053752&view=diff
 ==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java \
                (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java \
Wed Dec 29 21:54:40 2010 @@ -1,11 +1,14 @@
 package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.future.ConnectFuture;
 import org.apache.mina.core.service.IoConnector;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IoSession;
@@ -25,6 +28,7 @@ import org.apache.vysper.xmpp.server.Ser
 import org.apache.vysper.xmpp.server.SessionState;
 import org.apache.vysper.xmpp.server.XMPPVersion;
 import org.apache.vysper.xmpp.server.response.ServerResponses;
+import org.apache.vysper.xmpp.server.s2s.XmppEndpointResolver.ResolvedAddress;
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.vysper.xmpp.stanza.StanzaBuilder;
 import org.apache.vysper.xmpp.writer.StanzaWriter;
@@ -39,8 +43,11 @@ public class XMPPServerConnector impleme
     private MinaBackedSessionContext sessionContext;
     private Entity otherServer;
     private SessionStateHolder sessionStateHolder = new SessionStateHolder();
-    private IoConnector connector = new NioSocketConnector();
+    private IoConnector connector;
     
+    private int connectTimeout = 30000;
+    private int xmppHandshakeTimeout = 30000;
+
     private int pingPeriod = 30000;
     private int pingTimeout = 10000;
     
@@ -48,138 +55,66 @@ public class XMPPServerConnector impleme
     
     private Timer pingTimer = new Timer("pingtimer", true);
     
-    private class PingTask extends TimerTask {
-        public void run() {
-            XmppPingModule pingModule = \
                serverRuntimeContext.getModule(XmppPingModule.class);
-            if(pingModule != null) {
-                pingModule.ping(XMPPServerConnector.this, \
serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, \
                XMPPServerConnector.this);
-            }
-        }
-    }
-    
     public XMPPServerConnector(Entity otherServer, ServerRuntimeContext \
serverRuntimeContext) {  this.serverRuntimeContext = serverRuntimeContext;
         this.otherServer = otherServer;
-
-        DefaultIoFilterChainBuilder filterChainBuilder = new \
                DefaultIoFilterChainBuilder();
-        filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new \
                XMPPProtocolCodecFactory()));
-        filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
-        connector.setFilterChainBuilder(filterChainBuilder);
     }
 
-    public synchronized void start() {
+    public synchronized void start() throws IOException {
         LOG.info("Starting XMPP server connector to {}", otherServer);
+
+        // make this method synchronous
         final CountDownLatch authenticatedLatch = new CountDownLatch(1);
         
-        connector.setHandler(new IoHandlerAdapter() {
-            @Override
-            public void exceptionCaught(IoSession session, Throwable cause) throws \
                Exception {
-                LOG.info("Exception thrown by XMPP server connector to {}, probably \
                a bug in Vysper", otherServer);
-            }
-
-            @Override
-            public void messageReceived(IoSession session, Object message) throws \
                Exception {
-                if(message == SslFilter.SESSION_SECURED) {
-                    // TODO handle unsecure
-                    // connection secured, send stream opener
-                    sessionStateHolder.setState(SessionState.ENCRYPTED);
-                    
-                    LOG.info("XMPP server connector to {} secured using TLS", \
                otherServer);
-                    LOG.debug("XMPP server connector to {} restarting stream", \
                otherServer);
-                    
-                    sessionContext.setIsReopeningXMLStream();
-                    
-                    Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
                otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-                    
-                    sessionContext.write(opener);
-                } else {
-                    Stanza msg = (Stanza) message;
-                    
-                    if(msg.getName().equals("stream")) {
-                        sessionContext.setSessionId(msg.getAttributeValue("id"));
-                    } else if(msg.getName().equals("features")) {
-                        if(startTlsSupported(msg)) {
-                            LOG.info("XMPP server connector to {} is starting TLS", \
                otherServer);
-                            Stanza startTlsStanza = new StanzaBuilder("starttls", \
                NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build();
-                            
-                            sessionContext.write(startTlsStanza);
-                            
-                        } else if(dialbackSupported(msg)) {
-                            Entity originating = \
                serverRuntimeContext.getServerEnitity();
-    
-                            String dailbackId = new \
DailbackIdGenerator().generate(otherServer, originating, \
                sessionContext.getSessionId());
-                            
-                            Stanza dbResult = new StanzaBuilder("result", \
                NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
-                                .addAttribute("from", originating.getDomain())
-                                .addAttribute("to", otherServer.getDomain())
-                                .addText(dailbackId)
-                                .build();
-                            
-                            sessionContext.write(dbResult);
-                        } else {
-                            throw new RuntimeException("Unsupported features");
-                        }
-                    } else if(msg.getName().equals("result") && \
                NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) {
-                        // TODO check and handle dailback result
-                        sessionStateHolder.setState(SessionState.AUTHENTICATED);
-                        
-                        LOG.info("XMPP server connector to {} authenticated using \
                dialback", otherServer);
-                        authenticatedLatch.countDown();
-                        
-                        // connection established, start pinging
-                        startPinging();
-                    } else if(msg.getName().equals("proceed") && \
                NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) \
                {
-                        \
                sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED);
-                        
-                        LOG.debug("XMPP server connector to {} switching to TLS", \
                otherServer);
-                        sessionContext.switchToTLS(false, true);
+        boolean successfullyConnected = false;
+        
+        XmppEndpointResolver resolver = new XmppEndpointResolver();
+        List<ResolvedAddress> addresses = \
resolver.resolveXmppServer(otherServer.getDomain()); +        
+        for(ResolvedAddress address : addresses) {
+            LOG.info("Connecting to XMPP server {} at {}", otherServer, \
address.getAddress()); +            
+            connector = createConnector(authenticatedLatch);
+            ConnectFuture connectFuture = connector.connect(address.getAddress());
+            if(connectFuture.awaitUninterruptibly(connectTimeout) && \
connectFuture.isConnected()) { +                // success on the TCP/IP lever, now \
wait for the XMPP handshake +
+                try {
+                    if(authenticatedLatch.await(xmppHandshakeTimeout, \
TimeUnit.MILLISECONDS)) { +                        // success, break out of connect \
loop +                        successfullyConnected = true;
+                        break;
                     } else {
-                        // TODO other stanzas coming here?
+                        // attempt next
+                        LOG.warn("XMPP handshake with {} at () timed out", \
otherServer, address.getAddress());  }
+                } catch (InterruptedException e) {
+                    throw new IOException("Connection to " + otherServer + " was \
interrupted", e);  }
-            }
-            
-            private boolean startTlsSupported(Stanza stanza) {
-                return !stanza.getInnerElementsNamed("starttls", \
                NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty();
-            }
-
-            private boolean dialbackSupported(Stanza stanza) {
-                // TODO check for dialback namespace
-                return !stanza.getInnerElementsNamed("dialback", \
                NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty();
-            }
+            } 
 
-            @Override
-            public void sessionClosed(IoSession session) throws Exception {
-                // Socket was closed, make sure we close the connector
-                LOG.info("XMPP server connector socket closed, closing connector");
-                close();
-            }
-
-            @Override
-            public void sessionOpened(IoSession session) throws Exception {
-                sessionContext = new MinaBackedSessionContext(serverRuntimeContext, \
                sessionStateHolder, session);
-                sessionStateHolder.setState(SessionState.INITIATED);
-                Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
                otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-                
-                sessionContext.write(opener);
-            }
-        });
-        
-        XmppEndpointResolver resolver = new XmppEndpointResolver();
-        InetSocketAddress address = \
                resolver.resolveXmppServer(otherServer.getDomain()).get(0).getAddress();
                
-        
-        LOG.debug("Connecting to XMPP server {} at {}", otherServer, address);
-        connector.connect(address);
+            LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + \
address.getAddress(), connectFuture.getException()); +            \
connector.dispose(); +            connector = null;
+        }
         
-        // make this method sync
-        // TODO handle timeout
-        try {
-            authenticatedLatch.await(20000, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            // TODO handle
+        if(!successfullyConnected) {
+            throw new IOException("Failed to connect to XMPP server at " + \
otherServer);  }
     }
     
+    private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
+        NioSocketConnector connector = new NioSocketConnector();
+        DefaultIoFilterChainBuilder filterChainBuilder = new \
DefaultIoFilterChainBuilder(); +        filterChainBuilder.addLast("xmppCodec", new \
ProtocolCodecFilter(new XMPPProtocolCodecFactory())); +        \
filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter()); +        \
connector.setFilterChainBuilder(filterChainBuilder); +        \
connector.setHandler(new ConnectorIoHandler(authenticatedLatch)); +        return \
connector; +    }
+
+
+    
     private void startPinging() {
         pingTimer.schedule(new PingTask(), pingPeriod, pingPeriod);
     }
@@ -212,4 +147,115 @@ public class XMPPServerConnector impleme
     public boolean isClosed() {
         return closed;
     }
+
+    private final class ConnectorIoHandler extends IoHandlerAdapter {
+        private final CountDownLatch authenticatedLatch;
+
+        private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
+            this.authenticatedLatch = authenticatedLatch;
+        }
+
+        @Override
+        public void exceptionCaught(IoSession session, Throwable cause) throws \
Exception { +            LOG.info("Exception thrown by XMPP server connector to {}, \
probably a bug in Vysper", otherServer); +        }
+
+        @Override
+        public void messageReceived(IoSession session, Object message) throws \
Exception { +            if(message == SslFilter.SESSION_SECURED) {
+                // TODO handle unsecure
+                // connection secured, send stream opener
+                sessionStateHolder.setState(SessionState.ENCRYPTED);
+                
+                LOG.info("XMPP server connector to {} secured using TLS", \
otherServer); +                LOG.debug("XMPP server connector to {} restarting \
stream", otherServer); +                
+                sessionContext.setIsReopeningXMLStream();
+                
+                Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
otherServer, XMPPVersion.VERSION_1_0, sessionContext); +                
+                sessionContext.write(opener);
+            } else {
+                Stanza msg = (Stanza) message;
+                
+                if(msg.getName().equals("stream")) {
+                    sessionContext.setSessionId(msg.getAttributeValue("id"));
+                } else if(msg.getName().equals("features")) {
+                    if(startTlsSupported(msg)) {
+                        LOG.info("XMPP server connector to {} is starting TLS", \
otherServer); +                        Stanza startTlsStanza = new \
StanzaBuilder("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build(); +   \
 +                        sessionContext.write(startTlsStanza);
+                        
+                    } else if(dialbackSupported(msg)) {
+                        Entity originating = \
serverRuntimeContext.getServerEnitity(); +   
+                        String dailbackId = new \
DailbackIdGenerator().generate(otherServer, originating, \
sessionContext.getSessionId()); +                        
+                        Stanza dbResult = new StanzaBuilder("result", \
NamespaceURIs.JABBER_SERVER_DIALBACK, "db") +                            \
.addAttribute("from", originating.getDomain()) +                            \
.addAttribute("to", otherServer.getDomain()) +                            \
.addText(dailbackId) +                            .build();
+                        
+                        sessionContext.write(dbResult);
+                    } else {
+                        throw new RuntimeException("Unsupported features");
+                    }
+                } else if(msg.getName().equals("result") && \
NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) { +               \
// TODO check and handle dailback result +                    \
sessionStateHolder.setState(SessionState.AUTHENTICATED); +                    
+                    LOG.info("XMPP server connector to {} authenticated using \
dialback", otherServer); +                    authenticatedLatch.countDown();
+                    
+                    // connection established, start pinging
+                    startPinging();
+                } else if(msg.getName().equals("proceed") && \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) { +      \
sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED); +                    
+                    LOG.debug("XMPP server connector to {} switching to TLS", \
otherServer); +                    sessionContext.switchToTLS(false, true);
+                } else {
+                    // TODO other stanzas coming here?
+                }
+            }
+        }
+
+        private boolean startTlsSupported(Stanza stanza) {
+            return !stanza.getInnerElementsNamed("starttls", \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty(); +        }
+
+        private boolean dialbackSupported(Stanza stanza) {
+            // TODO check for dialback namespace
+            return !stanza.getInnerElementsNamed("dialback", \
NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty(); +        }
+
+        @Override
+        public void sessionClosed(IoSession session) throws Exception {
+            // Socket was closed, make sure we close the connector
+            LOG.info("XMPP server connector socket closed, closing connector");
+            close();
+        }
+
+        @Override
+        public void sessionOpened(IoSession session) throws Exception {
+            sessionContext = new MinaBackedSessionContext(serverRuntimeContext, \
sessionStateHolder, session); +            \
sessionStateHolder.setState(SessionState.INITIATED); +            Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
otherServer, XMPPVersion.VERSION_1_0, sessionContext); +            
+            sessionContext.write(opener);
+        }
+    }
+
+    private class PingTask extends TimerTask {
+        public void run() {
+            XmppPingModule pingModule = \
serverRuntimeContext.getModule(XmppPingModule.class); +            if(pingModule != \
null) { +                pingModule.ping(XMPPServerConnector.this, \
serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, \
XMPPServerConnector.this); +            }
+        }
+    }
+    
+
 }

Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
                
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/o \
rg/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java?rev=1053752&r1=1053751&r2=1053752&view=diff
 ==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java \
                (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java \
Wed Dec 29 21:54:40 2010 @@ -1,4 +1,5 @@
 package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -14,7 +15,7 @@ public class XMPPServerConnectorRegistry
         this.serverRuntimeContext = serverRuntimeContext;
     }
 
-    public synchronized XMPPServerConnector getConnector(Entity server) {
+    public synchronized XMPPServerConnector getConnector(Entity server) throws \
IOException {  XMPPServerConnector connector = connectors.get(server);
 
         if(connector != null && connector.isClosed()) {

Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java
                
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/o \
rg/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java?rev=1053752&r1=1053751&r2=1053752&view=diff
 ==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java \
                (original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java \
Wed Dec 29 21:54:40 2010 @@ -39,12 +39,6 @@ public class XmppEndpointResolver {
         }
     }
 
-    public static void main(String[] args) throws Exception {
-        XmppEndpointResolver resolver = new XmppEndpointResolver();
-        System.out.println(resolver.resolveXmppServer("protocol7.com"));
-        
-    }
-    
     public List<ResolvedAddress> resolveXmppServer(String domain) {
         List<ResolvedAddress> addresses = new ArrayList<ResolvedAddress>();
         try {


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic