[prev in list] [next in list] [prev in thread] [next in thread]
List: mina-commits
Subject: svn commit: r1053752 - in
From: ngn () apache ! org
Date: 2010-12-29 21:54:41
Message-ID: 20101229215441.3AF7523889B9 () eris ! apache ! org
[Download RAW message or body]
Author: ngn
Date: Wed Dec 29 21:54:40 2010
New Revision: 1053752
URL: http://svn.apache.org/viewvc?rev=1053752&view=rev
Log:
Handle reconnecting to all registered addresses for an XMPP server, or failing if all \
fail
Modified:
mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java
Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/o \
rg/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java \
(original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnector.java \
Wed Dec 29 21:54:40 2010 @@ -1,11 +1,14 @@
package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
@@ -25,6 +28,7 @@ import org.apache.vysper.xmpp.server.Ser
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.XMPPVersion;
import org.apache.vysper.xmpp.server.response.ServerResponses;
+import org.apache.vysper.xmpp.server.s2s.XmppEndpointResolver.ResolvedAddress;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.apache.vysper.xmpp.writer.StanzaWriter;
@@ -39,8 +43,11 @@ public class XMPPServerConnector impleme
private MinaBackedSessionContext sessionContext;
private Entity otherServer;
private SessionStateHolder sessionStateHolder = new SessionStateHolder();
- private IoConnector connector = new NioSocketConnector();
+ private IoConnector connector;
+ private int connectTimeout = 30000;
+ private int xmppHandshakeTimeout = 30000;
+
private int pingPeriod = 30000;
private int pingTimeout = 10000;
@@ -48,138 +55,66 @@ public class XMPPServerConnector impleme
private Timer pingTimer = new Timer("pingtimer", true);
- private class PingTask extends TimerTask {
- public void run() {
- XmppPingModule pingModule = \
serverRuntimeContext.getModule(XmppPingModule.class);
- if(pingModule != null) {
- pingModule.ping(XMPPServerConnector.this, \
serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, \
XMPPServerConnector.this);
- }
- }
- }
-
public XMPPServerConnector(Entity otherServer, ServerRuntimeContext \
serverRuntimeContext) { this.serverRuntimeContext = serverRuntimeContext;
this.otherServer = otherServer;
-
- DefaultIoFilterChainBuilder filterChainBuilder = new \
DefaultIoFilterChainBuilder();
- filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new \
XMPPProtocolCodecFactory()));
- filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
- connector.setFilterChainBuilder(filterChainBuilder);
}
- public synchronized void start() {
+ public synchronized void start() throws IOException {
LOG.info("Starting XMPP server connector to {}", otherServer);
+
+ // make this method synchronous
final CountDownLatch authenticatedLatch = new CountDownLatch(1);
- connector.setHandler(new IoHandlerAdapter() {
- @Override
- public void exceptionCaught(IoSession session, Throwable cause) throws \
Exception {
- LOG.info("Exception thrown by XMPP server connector to {}, probably \
a bug in Vysper", otherServer);
- }
-
- @Override
- public void messageReceived(IoSession session, Object message) throws \
Exception {
- if(message == SslFilter.SESSION_SECURED) {
- // TODO handle unsecure
- // connection secured, send stream opener
- sessionStateHolder.setState(SessionState.ENCRYPTED);
-
- LOG.info("XMPP server connector to {} secured using TLS", \
otherServer);
- LOG.debug("XMPP server connector to {} restarting stream", \
otherServer);
-
- sessionContext.setIsReopeningXMLStream();
-
- Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-
- sessionContext.write(opener);
- } else {
- Stanza msg = (Stanza) message;
-
- if(msg.getName().equals("stream")) {
- sessionContext.setSessionId(msg.getAttributeValue("id"));
- } else if(msg.getName().equals("features")) {
- if(startTlsSupported(msg)) {
- LOG.info("XMPP server connector to {} is starting TLS", \
otherServer);
- Stanza startTlsStanza = new StanzaBuilder("starttls", \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build();
-
- sessionContext.write(startTlsStanza);
-
- } else if(dialbackSupported(msg)) {
- Entity originating = \
serverRuntimeContext.getServerEnitity();
-
- String dailbackId = new \
DailbackIdGenerator().generate(otherServer, originating, \
sessionContext.getSessionId());
-
- Stanza dbResult = new StanzaBuilder("result", \
NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
- .addAttribute("from", originating.getDomain())
- .addAttribute("to", otherServer.getDomain())
- .addText(dailbackId)
- .build();
-
- sessionContext.write(dbResult);
- } else {
- throw new RuntimeException("Unsupported features");
- }
- } else if(msg.getName().equals("result") && \
NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) {
- // TODO check and handle dailback result
- sessionStateHolder.setState(SessionState.AUTHENTICATED);
-
- LOG.info("XMPP server connector to {} authenticated using \
dialback", otherServer);
- authenticatedLatch.countDown();
-
- // connection established, start pinging
- startPinging();
- } else if(msg.getName().equals("proceed") && \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) \
{
- \
sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED);
-
- LOG.debug("XMPP server connector to {} switching to TLS", \
otherServer);
- sessionContext.switchToTLS(false, true);
+ boolean successfullyConnected = false;
+
+ XmppEndpointResolver resolver = new XmppEndpointResolver();
+ List<ResolvedAddress> addresses = \
resolver.resolveXmppServer(otherServer.getDomain()); +
+ for(ResolvedAddress address : addresses) {
+ LOG.info("Connecting to XMPP server {} at {}", otherServer, \
address.getAddress()); +
+ connector = createConnector(authenticatedLatch);
+ ConnectFuture connectFuture = connector.connect(address.getAddress());
+ if(connectFuture.awaitUninterruptibly(connectTimeout) && \
connectFuture.isConnected()) { + // success on the TCP/IP lever, now \
wait for the XMPP handshake +
+ try {
+ if(authenticatedLatch.await(xmppHandshakeTimeout, \
TimeUnit.MILLISECONDS)) { + // success, break out of connect \
loop + successfullyConnected = true;
+ break;
} else {
- // TODO other stanzas coming here?
+ // attempt next
+ LOG.warn("XMPP handshake with {} at () timed out", \
otherServer, address.getAddress()); }
+ } catch (InterruptedException e) {
+ throw new IOException("Connection to " + otherServer + " was \
interrupted", e); }
- }
-
- private boolean startTlsSupported(Stanza stanza) {
- return !stanza.getInnerElementsNamed("starttls", \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty();
- }
-
- private boolean dialbackSupported(Stanza stanza) {
- // TODO check for dialback namespace
- return !stanza.getInnerElementsNamed("dialback", \
NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty();
- }
+ }
- @Override
- public void sessionClosed(IoSession session) throws Exception {
- // Socket was closed, make sure we close the connector
- LOG.info("XMPP server connector socket closed, closing connector");
- close();
- }
-
- @Override
- public void sessionOpened(IoSession session) throws Exception {
- sessionContext = new MinaBackedSessionContext(serverRuntimeContext, \
sessionStateHolder, session);
- sessionStateHolder.setState(SessionState.INITIATED);
- Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
otherServer, XMPPVersion.VERSION_1_0, sessionContext);
-
- sessionContext.write(opener);
- }
- });
-
- XmppEndpointResolver resolver = new XmppEndpointResolver();
- InetSocketAddress address = \
resolver.resolveXmppServer(otherServer.getDomain()).get(0).getAddress();
-
- LOG.debug("Connecting to XMPP server {} at {}", otherServer, address);
- connector.connect(address);
+ LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + \
address.getAddress(), connectFuture.getException()); + \
connector.dispose(); + connector = null;
+ }
- // make this method sync
- // TODO handle timeout
- try {
- authenticatedLatch.await(20000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // TODO handle
+ if(!successfullyConnected) {
+ throw new IOException("Failed to connect to XMPP server at " + \
otherServer); }
}
+ private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
+ NioSocketConnector connector = new NioSocketConnector();
+ DefaultIoFilterChainBuilder filterChainBuilder = new \
DefaultIoFilterChainBuilder(); + filterChainBuilder.addLast("xmppCodec", new \
ProtocolCodecFilter(new XMPPProtocolCodecFactory())); + \
filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter()); + \
connector.setFilterChainBuilder(filterChainBuilder); + \
connector.setHandler(new ConnectorIoHandler(authenticatedLatch)); + return \
connector; + }
+
+
+
private void startPinging() {
pingTimer.schedule(new PingTask(), pingPeriod, pingPeriod);
}
@@ -212,4 +147,115 @@ public class XMPPServerConnector impleme
public boolean isClosed() {
return closed;
}
+
+ private final class ConnectorIoHandler extends IoHandlerAdapter {
+ private final CountDownLatch authenticatedLatch;
+
+ private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
+ this.authenticatedLatch = authenticatedLatch;
+ }
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws \
Exception { + LOG.info("Exception thrown by XMPP server connector to {}, \
probably a bug in Vysper", otherServer); + }
+
+ @Override
+ public void messageReceived(IoSession session, Object message) throws \
Exception { + if(message == SslFilter.SESSION_SECURED) {
+ // TODO handle unsecure
+ // connection secured, send stream opener
+ sessionStateHolder.setState(SessionState.ENCRYPTED);
+
+ LOG.info("XMPP server connector to {} secured using TLS", \
otherServer); + LOG.debug("XMPP server connector to {} restarting \
stream", otherServer); +
+ sessionContext.setIsReopeningXMLStream();
+
+ Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
otherServer, XMPPVersion.VERSION_1_0, sessionContext); +
+ sessionContext.write(opener);
+ } else {
+ Stanza msg = (Stanza) message;
+
+ if(msg.getName().equals("stream")) {
+ sessionContext.setSessionId(msg.getAttributeValue("id"));
+ } else if(msg.getName().equals("features")) {
+ if(startTlsSupported(msg)) {
+ LOG.info("XMPP server connector to {} is starting TLS", \
otherServer); + Stanza startTlsStanza = new \
StanzaBuilder("starttls", NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).build(); + \
+ sessionContext.write(startTlsStanza);
+
+ } else if(dialbackSupported(msg)) {
+ Entity originating = \
serverRuntimeContext.getServerEnitity(); +
+ String dailbackId = new \
DailbackIdGenerator().generate(otherServer, originating, \
sessionContext.getSessionId()); +
+ Stanza dbResult = new StanzaBuilder("result", \
NamespaceURIs.JABBER_SERVER_DIALBACK, "db") + \
.addAttribute("from", originating.getDomain()) + \
.addAttribute("to", otherServer.getDomain()) + \
.addText(dailbackId) + .build();
+
+ sessionContext.write(dbResult);
+ } else {
+ throw new RuntimeException("Unsupported features");
+ }
+ } else if(msg.getName().equals("result") && \
NamespaceURIs.JABBER_SERVER_DIALBACK.equals(msg.getNamespaceURI())) { + \
// TODO check and handle dailback result + \
sessionStateHolder.setState(SessionState.AUTHENTICATED); +
+ LOG.info("XMPP server connector to {} authenticated using \
dialback", otherServer); + authenticatedLatch.countDown();
+
+ // connection established, start pinging
+ startPinging();
+ } else if(msg.getName().equals("proceed") && \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS.equals(msg.getNamespaceURI())) { + \
sessionStateHolder.setState(SessionState.ENCRYPTION_STARTED); +
+ LOG.debug("XMPP server connector to {} switching to TLS", \
otherServer); + sessionContext.switchToTLS(false, true);
+ } else {
+ // TODO other stanzas coming here?
+ }
+ }
+ }
+
+ private boolean startTlsSupported(Stanza stanza) {
+ return !stanza.getInnerElementsNamed("starttls", \
NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_TLS).isEmpty(); + }
+
+ private boolean dialbackSupported(Stanza stanza) {
+ // TODO check for dialback namespace
+ return !stanza.getInnerElementsNamed("dialback", \
NamespaceURIs.URN_XMPP_FEATURES_DIALBACK).isEmpty(); + }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ // Socket was closed, make sure we close the connector
+ LOG.info("XMPP server connector socket closed, closing connector");
+ close();
+ }
+
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ sessionContext = new MinaBackedSessionContext(serverRuntimeContext, \
sessionStateHolder, session); + \
sessionStateHolder.setState(SessionState.INITIATED); + Stanza opener = new \
ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), \
otherServer, XMPPVersion.VERSION_1_0, sessionContext); +
+ sessionContext.write(opener);
+ }
+ }
+
+ private class PingTask extends TimerTask {
+ public void run() {
+ XmppPingModule pingModule = \
serverRuntimeContext.getModule(XmppPingModule.class); + if(pingModule != \
null) { + pingModule.ping(XMPPServerConnector.this, \
serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, \
XMPPServerConnector.this); + }
+ }
+ }
+
+
}
Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/o \
rg/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java \
(original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XMPPServerConnectorRegistry.java \
Wed Dec 29 21:54:40 2010 @@ -1,4 +1,5 @@
package org.apache.vysper.xmpp.server.s2s;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -14,7 +15,7 @@ public class XMPPServerConnectorRegistry
this.serverRuntimeContext = serverRuntimeContext;
}
- public synchronized XMPPServerConnector getConnector(Entity server) {
+ public synchronized XMPPServerConnector getConnector(Entity server) throws \
IOException { XMPPServerConnector connector = connectors.get(server);
if(connector != null && connector.isClosed()) {
Modified: mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java
URL: http://svn.apache.org/viewvc/mina/vysper/branches/s2s/server/core/src/main/java/o \
rg/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java?rev=1053752&r1=1053751&r2=1053752&view=diff
==============================================================================
--- mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java \
(original)
+++ mina/vysper/branches/s2s/server/core/src/main/java/org/apache/vysper/xmpp/server/s2s/XmppEndpointResolver.java \
Wed Dec 29 21:54:40 2010 @@ -39,12 +39,6 @@ public class XmppEndpointResolver {
}
}
- public static void main(String[] args) throws Exception {
- XmppEndpointResolver resolver = new XmppEndpointResolver();
- System.out.println(resolver.resolveXmppServer("protocol7.com"));
-
- }
-
public List<ResolvedAddress> resolveXmppServer(String domain) {
List<ResolvedAddress> addresses = new ArrayList<ResolvedAddress>();
try {
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic