[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: svn commit: r1196217 - in /avro/trunk: ./ lang/java/ipc/src/main/java/org/apache/avro/ipc/
From: cutting () apache ! org
Date: 2011-11-01 19:09:07
Message-ID: 20111101190907.8A5102388993 () eris ! apache ! org
[Download RAW message or body]
Author: cutting
Date: Tue Nov 1 19:09:06 2011
New Revision: 1196217
URL: http://svn.apache.org/viewvc?rev=1196217&view=rev
Log:
AVRO-943. Java: Fix an intermittent deadlock in TestNettyServerWithCallbacks. \
Contributed by James Baldassari.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1196217&r1=1196216&r2=1196217&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Nov 1 19:09:06 2011
@@ -10,6 +10,9 @@ Avro 1.6.1 (unreleased)
BUG FIXES
+ AVRO-943. Java: Fix an intermittent deadlock in
+ TestNettyServerWithCallbacks. (James Baldassari via cutting)
+
Avro 1.6.0 (2 November 2011)
INCOMPATIBLE CHANGES
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1196217&r1=1196216&r2=1196217&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java \
(original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java \
Tue Nov 1 19:09:06 2011 @@ -21,12 +21,13 @@ package org.apache.avro.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,6 +58,13 @@ import org.slf4j.LoggerFactory;
* A Netty-based {@link Transceiver} implementation.
*/
public class NettyTransceiver extends Transceiver {
+ /** If not specified, the default connection timeout will be used (60 sec). */
+ public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000L;
+ public static final String NETTY_CONNECT_TIMEOUT_OPTION =
+ "connectTimeoutMillis";
+ public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
+ public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
+
private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
.getName());
@@ -65,6 +73,7 @@ public class NettyTransceiver extends Tr
new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
private final ChannelFactory channelFactory;
+ private final long connectTimeoutMillis;
private final ClientBootstrap bootstrap;
private final InetSocketAddress remoteAddr;
@@ -78,33 +87,91 @@ public class NettyTransceiver extends Tr
NettyTransceiver() {
channelFactory = null;
+ connectTimeoutMillis = 0L;
bootstrap = null;
remoteAddr = null;
}
/**
* Creates a NettyTransceiver, and attempts to connect to the given address.
+ * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection
+ * timeout.
* @param addr the address to connect to.
* @throws IOException if an error occurs connecting to the given address.
*/
public NettyTransceiver(InetSocketAddress addr) throws IOException {
- this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
+ this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
+ }
+
+ /**
+ * Creates a NettyTransceiver, and attempts to connect to the given address.
+ * @param addr the address to connect to.
+ * @param connectTimeoutMillis maximum amount of time to wait for connection
+ * establishment in milliseconds, or null to use
+ * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
+ * @throws IOException if an error occurs connecting to the given address.
+ */
+ public NettyTransceiver(InetSocketAddress addr,
+ Long connectTimeoutMillis) throws IOException {
+ this(addr, new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(new NettyTransceiverThreadFactory(
+ "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
+ Executors.newCachedThreadPool(new NettyTransceiverThreadFactory(
+ "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
+ connectTimeoutMillis);
}
/**
* Creates a NettyTransceiver, and attempts to connect to the given address.
+ * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection
+ * timeout.
+ * @param addr the address to connect to.
+ * @param channelFactory the factory to use to create a new Netty Channel.
+ * @throws IOException if an error occurs connecting to the given address.
+ */
+ public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory)
+ throws IOException {
+ this(addr, channelFactory, buildDefaultBootstrapOptions(null));
+ }
+
+ /**
+ * Creates a NettyTransceiver, and attempts to connect to the given address.
+ * @param addr the address to connect to.
+ * @param channelFactory the factory to use to create a new Netty Channel.
+ * @param connectTimeoutMillis maximum amount of time to wait for connection
+ * establishment in milliseconds, or null to use
+ * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
+ * @throws IOException if an error occurs connecting to the given address.
+ */
+ public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory,
+ Long connectTimeoutMillis) throws IOException {
+ this(addr, channelFactory,
+ buildDefaultBootstrapOptions(connectTimeoutMillis));
+ }
+
+ /**
+ * Creates a NettyTransceiver, and attempts to connect to the given address.
+ * It is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION}
+ * option be set to a reasonable timeout value (a Long value in milliseconds)
+ * to prevent connect/disconnect attempts from hanging indefinitely. It is
+ * also recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set
+ * to true to minimize RPC latency.
* @param addr the address to connect to.
* @param channelFactory the factory to use to create a new Netty Channel.
+ * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options
+ * to use.
* @throws IOException if an error occurs connecting to the given address.
*/
- public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) \
throws IOException { + public NettyTransceiver(InetSocketAddress addr, \
ChannelFactory channelFactory, + Map<String, Object> \
nettyClientBootstrapOptions) throws IOException { if (channelFactory == null) {
throw new NullPointerException("channelFactory is null");
}
// Set up.
this.channelFactory = channelFactory;
+ this.connectTimeoutMillis = (Long)
+ nettyClientBootstrapOptions.get(NETTY_CONNECT_TIMEOUT_OPTION);
bootstrap = new ClientBootstrap(channelFactory);
remoteAddr = addr;
@@ -120,7 +187,11 @@ public class NettyTransceiver extends Tr
}
});
- bootstrap.setOption("tcpNoDelay", true);
+ if (nettyClientBootstrapOptions != null) {
+ LOG.debug("Using Netty bootstrap options: " +
+ nettyClientBootstrapOptions);
+ bootstrap.setOptions(nettyClientBootstrapOptions);
+ }
// Make a new connection.
stateLock.readLock().lock();
@@ -132,6 +203,22 @@ public class NettyTransceiver extends Tr
}
/**
+ * Creates the default options map for the Netty ClientBootstrap.
+ * @param connectTimeoutMillis connection timeout in milliseconds, or null
+ * if no timeout is desired.
+ * @return the map of Netty bootstrap options.
+ */
+ private static Map<String, Object> buildDefaultBootstrapOptions(
+ Long connectTimeoutMillis) {
+ Map<String, Object> options = new HashMap<String, Object>(2);
+ options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE);
+ options.put(NETTY_CONNECT_TIMEOUT_OPTION,
+ connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS :
+ connectTimeoutMillis);
+ return options;
+ }
+
+ /**
* Tests whether the given channel is ready for writing.
* @return true if the channel is open and ready; false otherwise.
*/
@@ -155,15 +242,16 @@ public class NettyTransceiver extends Tr
stateLock.readLock().unlock();
stateLock.writeLock().lock();
try {
- LOG.info("Connecting to " + remoteAddr);
- ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
- channelFuture.awaitUninterruptibly();
- if (!channelFuture.isSuccess()) {
- channelFuture.getCause().printStackTrace();
- throw new IOException("Error connecting to " + remoteAddr,
- channelFuture.getCause());
+ if (!isChannelReady(channel)) {
+ LOG.debug("Connecting to " + remoteAddr);
+ ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+ channelFuture.awaitUninterruptibly(connectTimeoutMillis);
+ if (!channelFuture.isSuccess()) {
+ throw new IOException("Error connecting to " + remoteAddr,
+ channelFuture.getCause());
+ }
+ channel = channelFuture.getChannel();
}
- channel = channelFuture.getChannel();
} finally {
// Downgrade to read lock:
stateLock.readLock().lock();
@@ -177,7 +265,7 @@ public class NettyTransceiver extends Tr
* Closes the connection to the remote peer if connected.
*/
private void disconnect() {
- disconnect(false, false);
+ disconnect(false, false, null);
}
/**
@@ -185,17 +273,23 @@ public class NettyTransceiver extends Tr
* @param awaitCompletion if true, will block until the close has completed.
* @param cancelPendingRequests if true, will drain the requests map and
* send an IOException to all Callbacks.
+ * @param cause if non-null and cancelPendingRequests is true, this Throwable
+ * will be passed to all Callbacks.
*/
- private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests) {
+ private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests,
+ Throwable cause) {
+ Channel channelToClose = null;
Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
stateLock.writeLock().lock();
try {
if (channel != null) {
- LOG.info("Disconnecting from " + remoteAddr);
- ChannelFuture closeFuture = channel.close();
- if (awaitCompletion) {
- closeFuture.awaitUninterruptibly();
+ if (cause != null) {
+ LOG.debug("Disconnecting from " + remoteAddr, cause);
+ }
+ else {
+ LOG.debug("Disconnecting from " + remoteAddr);
}
+ channelToClose = channel;
channel = null;
remote = null;
if (cancelPendingRequests) {
@@ -210,11 +304,21 @@ public class NettyTransceiver extends Tr
stateLock.writeLock().unlock();
}
+ // Cancel any pending requests by sending errors to the callbacks:
if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) {
- LOG.warn("Removing " + requestsToCancel.size() + " pending request(s).");
+ LOG.debug("Removing " + requestsToCancel.size() + " pending request(s).");
for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
request.handleError(
- new IOException(getClass().getSimpleName() + " closed"));
+ cause != null ? cause :
+ new IOException(getClass().getSimpleName() + " closed"));
+ }
+ }
+
+ // Close the channel:
+ if (channelToClose != null) {
+ ChannelFuture closeFuture = channelToClose.close();
+ if (awaitCompletion && (closeFuture != null)) {
+ closeFuture.awaitUninterruptibly(connectTimeoutMillis);
}
}
}
@@ -240,7 +344,7 @@ public class NettyTransceiver extends Tr
public void close() {
try {
// Close the connection:
- disconnect(true, true);
+ disconnect(true, true, null);
} finally {
// Shut down all thread pools to exit.
channelFactory.releaseExternalResources();
@@ -268,10 +372,10 @@ public class NettyTransceiver extends Tr
transceive(request, transceiverFuture);
return transceiverFuture.get();
} catch (InterruptedException e) {
- LOG.warn("failed to get the response", e);
+ LOG.debug("failed to get the response", e);
return null;
} catch (ExecutionException e) {
- LOG.warn("failed to get the response", e);
+ LOG.debug("failed to get the response", e);
return null;
}
}
@@ -353,11 +457,11 @@ public class NettyTransceiver extends Tr
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
- LOG.info(e.toString());
+ LOG.debug(e.toString());
ChannelStateEvent cse = (ChannelStateEvent)e;
if ((cse.getState() == ChannelState.OPEN) && \
(Boolean.FALSE.equals(cse.getValue()))) { // Server closed connection; disconnect \
client side
- LOG.info("Remote peer " + remoteAddr + " closed connection.");
+ LOG.debug("Remote peer " + remoteAddr + " closed connection.");
disconnect();
}
}
@@ -387,17 +491,34 @@ public class NettyTransceiver extends Tr
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOG.warn("Unexpected exception from downstream.", e.getCause());
- e.getChannel().close();
- // let the blocking waiting exit
- Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
- while(it.hasNext()) {
- it.next().handleError(e.getCause());
- it.remove();
- }
-
+ disconnect(false, true, e.getCause());
}
}
+ /**
+ * Creates threads with unique names based on a specified name prefix.
+ */
+ private static class NettyTransceiverThreadFactory implements ThreadFactory {
+ private final AtomicInteger threadId = new AtomicInteger(0);
+ private final String prefix;
+
+ /**
+ * Creates a NettyTransceiverThreadFactory that creates threads with the
+ * specified name.
+ * @param prefix the name prefix to use for all threads created by this
+ * ThreadFactory. A unique ID will be appended to this prefix to form the
+ * final thread name.
+ */
+ public NettyTransceiverThreadFactory(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName(prefix + " " + threadId.incrementAndGet());
+ return thread;
+ }
+ }
}
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java?rev=1196217&r1=1196216&r2=1196217&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java \
(original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java Tue \
Nov 1 19:09:06 2011 @@ -37,7 +37,7 @@ public class TestProtocolNetty extends T
@Override
public Transceiver createTransceiver() throws Exception{
- return new NettyTransceiver(new InetSocketAddress(server.getPort()));
+ return new NettyTransceiver(new InetSocketAddress(server.getPort()), 2000L);
}
@Override
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1196217&r1=1196216&r2=1196217&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java \
(original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java \
Tue Nov 1 19:09:06 2011 @@ -35,7 +35,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestNettyServer {
-
+ static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
private static Server server;
private static Transceiver transceiver;
private static Mail proxy;
@@ -82,7 +82,7 @@ public class TestNettyServer {
System.out.println("server port : " + serverPort);
transceiver = new NettyTransceiver(new InetSocketAddress(
- serverPort));
+ serverPort), CONNECT_TIMEOUT_MILLIS);
proxy = SpecificRequestor.getClient(Mail.class, transceiver);
}
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/av \
ro/ipc/TestNettyServerWithCallbacks.java?rev=1196217&r1=1196216&r2=1196217&view=diff \
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java \
(original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java \
Tue Nov 1 19:09:06 2011 @@ -67,7 +67,7 @@ public class TestNettyServerWithCallback
System.out.println("server port : " + serverPort);
transceiver = new NettyTransceiver(new InetSocketAddress(
- serverPort));
+ serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
simpleClient = SpecificRequestor.getClient(Simple.Callback.class, transceiver);
}
@@ -268,7 +268,7 @@ public class TestNettyServerWithCallback
System.out.println("server2 port : " + serverPort);
Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
- serverPort));
+ serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
try {
Simple.Callback simpleClient2 =
SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
@@ -337,7 +337,7 @@ public class TestNettyServerWithCallback
CallFuture<Integer> addFuture = new CallFuture<Integer>();
Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
- serverPort));
+ serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
try {
Simple.Callback simpleClient2 =
SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic