[prev in list] [next in list] [prev in thread] [next in thread] 

List:       avro-commits
Subject:    svn commit: r1196217 - in /avro/trunk: ./ lang/java/ipc/src/main/java/org/apache/avro/ipc/
From:       cutting () apache ! org
Date:       2011-11-01 19:09:07
Message-ID: 20111101190907.8A5102388993 () eris ! apache ! org
[Download RAW message or body]

Author: cutting
Date: Tue Nov  1 19:09:06 2011
New Revision: 1196217

URL: http://svn.apache.org/viewvc?rev=1196217&view=rev
Log:
AVRO-943. Java: Fix an intermittent deadlock in TestNettyServerWithCallbacks.  \
Contributed by James Baldassari.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java


Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1196217&r1=1196216&r2=1196217&view=diff
 ==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Nov  1 19:09:06 2011
@@ -10,6 +10,9 @@ Avro 1.6.1 (unreleased)
 
   BUG FIXES
 
+    AVRO-943. Java: Fix an intermittent deadlock in
+    TestNettyServerWithCallbacks.  (James Baldassari via cutting)
+
 Avro 1.6.0 (2 November 2011)
 
   INCOMPATIBLE CHANGES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
                
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1196217&r1=1196216&r2=1196217&view=diff
 ==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java \
                (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java \
Tue Nov  1 19:09:06 2011 @@ -21,12 +21,13 @@ package org.apache.avro.ipc;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,6 +58,13 @@ import org.slf4j.LoggerFactory;
  * A Netty-based {@link Transceiver} implementation.
  */
 public class NettyTransceiver extends Transceiver {
+  /** If not specified, the default connection timeout will be used (60 sec). */
+  public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000L;
+  public static final String NETTY_CONNECT_TIMEOUT_OPTION = 
+      "connectTimeoutMillis";
+  public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
+  public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
+  
   private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
       .getName());
 
@@ -65,6 +73,7 @@ public class NettyTransceiver extends Tr
     new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
   
   private final ChannelFactory channelFactory;
+  private final long connectTimeoutMillis;
   private final ClientBootstrap bootstrap;
   private final InetSocketAddress remoteAddr;
   
@@ -78,33 +87,91 @@ public class NettyTransceiver extends Tr
 
   NettyTransceiver() {
     channelFactory = null;
+    connectTimeoutMillis = 0L;
     bootstrap = null;
     remoteAddr = null;
   }
 
   /**
    * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection 
+   * timeout.
    * @param addr the address to connect to.
    * @throws IOException if an error occurs connecting to the given address.
    */
   public NettyTransceiver(InetSocketAddress addr) throws IOException {
-    this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 
-        Executors.newCachedThreadPool()));
+    this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
+  }
+  
+  /**
+   * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * @param addr the address to connect to.
+   * @param connectTimeoutMillis maximum amount of time to wait for connection 
+   * establishment in milliseconds, or null to use 
+   * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
+   * @throws IOException if an error occurs connecting to the given address.
+   */
+  public NettyTransceiver(InetSocketAddress addr, 
+      Long connectTimeoutMillis) throws IOException {
+    this(addr, new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(new NettyTransceiverThreadFactory(
+            "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), 
+        Executors.newCachedThreadPool(new NettyTransceiverThreadFactory(
+            "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), 
+        connectTimeoutMillis);
   }
 
   /**
    * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection 
+   * timeout.
+   * @param addr the address to connect to.
+   * @param channelFactory the factory to use to create a new Netty Channel.
+   * @throws IOException if an error occurs connecting to the given address.
+   */
+  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) 
+    throws IOException {
+    this(addr, channelFactory, buildDefaultBootstrapOptions(null));
+  }
+  
+  /**
+   * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * @param addr the address to connect to.
+   * @param channelFactory the factory to use to create a new Netty Channel.
+   * @param connectTimeoutMillis maximum amount of time to wait for connection 
+   * establishment in milliseconds, or null to use 
+   * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
+   * @throws IOException if an error occurs connecting to the given address.
+   */
+  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, 
+      Long connectTimeoutMillis) throws IOException {
+    this(addr, channelFactory, 
+        buildDefaultBootstrapOptions(connectTimeoutMillis));
+  }
+  
+  /**
+   * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * It is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION} 
+   * option be set to a reasonable timeout value (a Long value in milliseconds) 
+   * to prevent connect/disconnect attempts from hanging indefinitely.  It is 
+   * also recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set 
+   * to true to minimize RPC latency.
    * @param addr the address to connect to.
    * @param channelFactory the factory to use to create a new Netty Channel.
+   * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options 
+   * to use.
    * @throws IOException if an error occurs connecting to the given address.
    */
-  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) \
throws IOException { +  public NettyTransceiver(InetSocketAddress addr, \
ChannelFactory channelFactory,  +      Map<String, Object> \
nettyClientBootstrapOptions) throws IOException {  if (channelFactory == null) {
       throw new NullPointerException("channelFactory is null");
     }
     
     // Set up.
     this.channelFactory = channelFactory;
+    this.connectTimeoutMillis = (Long) 
+        nettyClientBootstrapOptions.get(NETTY_CONNECT_TIMEOUT_OPTION);
     bootstrap = new ClientBootstrap(channelFactory);
     remoteAddr = addr;
 
@@ -120,7 +187,11 @@ public class NettyTransceiver extends Tr
       }
     });
 
-    bootstrap.setOption("tcpNoDelay", true);
+    if (nettyClientBootstrapOptions != null) {
+      LOG.debug("Using Netty bootstrap options: " + 
+          nettyClientBootstrapOptions);
+      bootstrap.setOptions(nettyClientBootstrapOptions);
+    }
 
     // Make a new connection.
     stateLock.readLock().lock();
@@ -132,6 +203,22 @@ public class NettyTransceiver extends Tr
   }
   
   /**
+   * Creates the default options map for the Netty ClientBootstrap.
+   * @param connectTimeoutMillis connection timeout in milliseconds, or null 
+   * if no timeout is desired.
+   * @return the map of Netty bootstrap options.
+   */
+  private static Map<String, Object> buildDefaultBootstrapOptions(
+      Long connectTimeoutMillis) {
+    Map<String, Object> options = new HashMap<String, Object>(2);
+    options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE);
+    options.put(NETTY_CONNECT_TIMEOUT_OPTION, 
+        connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : 
+          connectTimeoutMillis);
+    return options;
+  }
+  
+  /**
    * Tests whether the given channel is ready for writing.
    * @return true if the channel is open and ready; false otherwise.
    */
@@ -155,15 +242,16 @@ public class NettyTransceiver extends Tr
       stateLock.readLock().unlock();
       stateLock.writeLock().lock();
       try {
-        LOG.info("Connecting to " + remoteAddr);
-        ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
-        channelFuture.awaitUninterruptibly();
-        if (!channelFuture.isSuccess()) {
-          channelFuture.getCause().printStackTrace();
-          throw new IOException("Error connecting to " + remoteAddr, 
-              channelFuture.getCause());
+        if (!isChannelReady(channel)) {
+          LOG.debug("Connecting to " + remoteAddr);
+          ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+          channelFuture.awaitUninterruptibly(connectTimeoutMillis);
+          if (!channelFuture.isSuccess()) {
+            throw new IOException("Error connecting to " + remoteAddr, 
+                channelFuture.getCause());
+          }
+          channel = channelFuture.getChannel();
         }
-        channel = channelFuture.getChannel();
       } finally {
         // Downgrade to read lock:
         stateLock.readLock().lock();
@@ -177,7 +265,7 @@ public class NettyTransceiver extends Tr
    * Closes the connection to the remote peer if connected.
    */
   private void disconnect() {
-    disconnect(false, false);
+    disconnect(false, false, null);
   }
   
   /**
@@ -185,17 +273,23 @@ public class NettyTransceiver extends Tr
    * @param awaitCompletion if true, will block until the close has completed.
    * @param cancelPendingRequests if true, will drain the requests map and 
    * send an IOException to all Callbacks.
+   * @param cause if non-null and cancelPendingRequests is true, this Throwable 
+   * will be passed to all Callbacks.
    */
-  private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests) {
+  private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests,
+      Throwable cause) {
+    Channel channelToClose = null;
     Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
     stateLock.writeLock().lock();
     try {
       if (channel != null) {
-        LOG.info("Disconnecting from " + remoteAddr);
-        ChannelFuture closeFuture = channel.close();
-        if (awaitCompletion) {
-          closeFuture.awaitUninterruptibly();
+        if (cause != null) {
+          LOG.debug("Disconnecting from " + remoteAddr, cause);
+        }
+        else {
+          LOG.debug("Disconnecting from " + remoteAddr);
         }
+        channelToClose = channel;
         channel = null;
         remote = null;
         if (cancelPendingRequests) {
@@ -210,11 +304,21 @@ public class NettyTransceiver extends Tr
       stateLock.writeLock().unlock();
     }
     
+    // Cancel any pending requests by sending errors to the callbacks:
     if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) {
-      LOG.warn("Removing " + requestsToCancel.size() + " pending request(s).");
+      LOG.debug("Removing " + requestsToCancel.size() + " pending request(s).");
       for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
         request.handleError(
-            new IOException(getClass().getSimpleName() + " closed"));
+            cause != null ? cause : 
+              new IOException(getClass().getSimpleName() + " closed"));
+      }
+    }
+    
+    // Close the channel:
+    if (channelToClose != null) {
+      ChannelFuture closeFuture = channelToClose.close();
+      if (awaitCompletion && (closeFuture != null)) {
+        closeFuture.awaitUninterruptibly(connectTimeoutMillis);
       }
     }
   }
@@ -240,7 +344,7 @@ public class NettyTransceiver extends Tr
   public void close() {
     try {
       // Close the connection:
-      disconnect(true, true);
+      disconnect(true, true, null);
     } finally {
       // Shut down all thread pools to exit.
       channelFactory.releaseExternalResources();
@@ -268,10 +372,10 @@ public class NettyTransceiver extends Tr
       transceive(request, transceiverFuture);
       return transceiverFuture.get();
     } catch (InterruptedException e) {
-      LOG.warn("failed to get the response", e);
+      LOG.debug("failed to get the response", e);
       return null;
     } catch (ExecutionException e) {
-      LOG.warn("failed to get the response", e);
+      LOG.debug("failed to get the response", e);
       return null;
     }
   }
@@ -353,11 +457,11 @@ public class NettyTransceiver extends Tr
     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
         throws Exception {
       if (e instanceof ChannelStateEvent) {
-        LOG.info(e.toString());
+        LOG.debug(e.toString());
         ChannelStateEvent cse = (ChannelStateEvent)e;
         if ((cse.getState() == ChannelState.OPEN) && \
(Boolean.FALSE.equals(cse.getValue()))) {  // Server closed connection; disconnect \
                client side
-          LOG.info("Remote peer " + remoteAddr + " closed connection.");
+          LOG.debug("Remote peer " + remoteAddr + " closed connection.");
           disconnect();
         }
       }
@@ -387,17 +491,34 @@ public class NettyTransceiver extends Tr
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      LOG.warn("Unexpected exception from downstream.", e.getCause());
-      e.getChannel().close();
-      // let the blocking waiting exit
-      Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
-      while(it.hasNext()) {
-        it.next().handleError(e.getCause());
-        it.remove();
-      }
-      
+      disconnect(false, true, e.getCause());      
     }
 
   }
 
+  /**
+   * Creates threads with unique names based on a specified name prefix.
+   */
+  private static class NettyTransceiverThreadFactory implements ThreadFactory {
+    private final AtomicInteger threadId = new AtomicInteger(0);
+    private final String prefix;
+    
+    /**
+     * Creates a NettyTransceiverThreadFactory that creates threads with the 
+     * specified name.
+     * @param prefix the name prefix to use for all threads created by this 
+     * ThreadFactory.  A unique ID will be appended to this prefix to form the 
+     * final thread name.
+     */
+    public NettyTransceiverThreadFactory(String prefix) {
+      this.prefix = prefix;
+    }
+    
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r);
+      thread.setName(prefix + " " + threadId.incrementAndGet());
+      return thread;
+    }
+  }
 }

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
                
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java?rev=1196217&r1=1196216&r2=1196217&view=diff
 ==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java \
                (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java Tue \
Nov  1 19:09:06 2011 @@ -37,7 +37,7 @@ public class TestProtocolNetty extends T
   
   @Override
   public Transceiver createTransceiver() throws Exception{
-    return new NettyTransceiver(new InetSocketAddress(server.getPort()));
+    return new NettyTransceiver(new InetSocketAddress(server.getPort()), 2000L);
   }
   
   @Override

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
                
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1196217&r1=1196216&r2=1196217&view=diff
 ==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java \
                (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java \
Tue Nov  1 19:09:06 2011 @@ -35,7 +35,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestNettyServer {
-
+  static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
   private static Server server;
   private static Transceiver transceiver;
   private static Mail proxy;
@@ -82,7 +82,7 @@ public class TestNettyServer {
     System.out.println("server port : " + serverPort);
 
     transceiver = new NettyTransceiver(new InetSocketAddress(
-        serverPort));
+        serverPort), CONNECT_TIMEOUT_MILLIS);
     proxy = SpecificRequestor.getClient(Mail.class, transceiver);
   }
   

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
                
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/av \
ro/ipc/TestNettyServerWithCallbacks.java?rev=1196217&r1=1196216&r2=1196217&view=diff \
                ==============================================================================
                
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java \
                (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java \
Tue Nov  1 19:09:06 2011 @@ -67,7 +67,7 @@ public class TestNettyServerWithCallback
     System.out.println("server port : " + serverPort);
 
     transceiver = new NettyTransceiver(new InetSocketAddress(
-        serverPort));
+        serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
     simpleClient = SpecificRequestor.getClient(Simple.Callback.class, transceiver);
   }
   
@@ -268,7 +268,7 @@ public class TestNettyServerWithCallback
       System.out.println("server2 port : " + serverPort);
 
       Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
-          serverPort));
+          serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
       try {
         Simple.Callback simpleClient2 = 
           SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
@@ -337,7 +337,7 @@ public class TestNettyServerWithCallback
 
       CallFuture<Integer> addFuture = new CallFuture<Integer>();
       Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
-          serverPort));
+          serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
       try {        
         Simple.Callback simpleClient2 = 
           SpecificRequestor.getClient(Simple.Callback.class, transceiver2);


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic