List: flume-commits
Subject: git commit: FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient
From: hshreedharan () apache ! org
Date: 2013-12-10 0:36:44
Message-ID: da696b902c654d389f10fb412297de0c () git ! apache ! org
Updated Branches:
refs/heads/flume-1.5 e371d565b -> 209169bb5
FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient
(Cameron Gandevia via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/209169bb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/209169bb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/209169bb
Branch: refs/heads/flume-1.5
Commit: 209169bb561f65d58a7140d952c34163d31ab825
Parents: e371d56
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon Dec 9 16:35:33 2013 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon Dec 9 16:35:33 2013 -0800
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 35 ++++----
.../apache/flume/api/NettyAvroRpcClient.java | 86 +++++++++++++++-----
.../api/RpcClientConfigurationConstants.java | 6 ++
.../flume/api/TestNettyAvroRpcClient.java | 79 ++++++++++++++++--
4 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst \
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8687cb7..0737c44 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1638,25 +1638,26 @@ hostname / port pair. The events are taken from the \
configured Channel in batches of the configured batch size.
Required properties are in **bold**.
-========================== ======= ==============================================
+========================== ===================================================== \
===========================================================================================
Property Name Default Description
-========================== ======= ==============================================
+========================== ===================================================== \
===========================================================================================
**channel** --
-**type** -- The component type name, needs to be ``avro``.
-**hostname** -- The hostname or IP address to bind to.
-**port** -- The port # to listen on.
-batch-size 100 number of event to batch together for send.
-connect-timeout 20000 Amount of time (ms) to allow for the first \
(handshake) request.
-request-timeout 20000 Amount of time (ms) to allow for requests \
after the first.
-reset-connection-interval none Amount of time (s) before the connection to \
the next hop is reset. This will force the Avro Sink to reconnect to the next hop. \
This will allow the sink to connect to hosts behind a hardware load-balancer when \
news hosts are added without having to restart the agent.
-compression-type none This can be "none" or "deflate". The \
compression-type must match the compression-type of matching \
AvroSource
-compression-level 6 The level of compression to compress event. 0 \
= no compression and 1-9 is compression. The higher the number the \
more compression
-ssl false Set to true to enable SSL for this AvroSink. \
When configuring SSL, you can optionally set a "truststore", "truststore-password", \
"truststore-type", and specify whether to "trust-all-certs".
-trust-all-certs false If this is set to true, SSL server \
certificates for remote servers (Avro Sources) will not be checked. This should NOT \
be used in production because it makes it easier for an attacker to execute a \
man-in-the-middle attack and "listen in" on the encrypted connection.
-truststore -- The path to a custom Java truststore file. \
Flume uses the certificate authority information in this file to determine whether \
the remote Avro Source's SSL authentication credentials should be trusted. If not \
specified, the default Java JSSE certificate authority files (typically "jssecacerts" \
or "cacerts" in the Oracle JRE) will be used.
-truststore-password -- The password for the specified truststore.
-truststore-type JKS The type of the Java truststore. This can be \
"JKS" or other supported Java truststore type.
-========================== ======= ==============================================
+**type** -- The component type name, needs to be ``avro``.
+**hostname** -- The hostname or IP address to bind to.
+**port** -- The port # to listen on.
+batch-size 100 number of event to batch together for send.
+connect-timeout 20000 Amount of time (ms) to allow for the first \
(handshake) request.
+request-timeout 20000 Amount of time (ms) to allow for requests \
after the first.
+reset-connection-interval none Amount of time (s) before the connection to \
the next hop is reset. This will force the Avro Sink to reconnect to the next hop. \
This will allow the sink to connect to hosts behind a hardware load-balancer when \
news hosts are added without having to restart the agent.
+compression-type none This can be "none" or "deflate". The \
compression-type must match the compression-type of matching \
AvroSource
+compression-level 6 The level of compression to compress event. 0 \
= no compression and 1-9 is compression. The higher the number the \
more compression
+ssl false Set to true to enable SSL for this AvroSink. \
When configuring SSL, you can optionally set a "truststore", "truststore-password", \
"truststore-type", and specify whether to "trust-all-certs".
+trust-all-certs false If this is set to true, SSL server \
certificates for remote servers (Avro Sources) will not be checked. This should NOT \
be used in production because it makes it easier for an attacker to execute a \
man-in-the-middle attack and "listen in" on the encrypted connection.
+truststore -- The path to a custom Java truststore file. \
Flume uses the certificate authority information in this file to determine whether \
the remote Avro Source's SSL authentication credentials should be trusted. If not \
specified, the default Java JSSE certificate authority files (typically "jssecacerts" \
or "cacerts" in the Oracle JRE) will be used.
+truststore-password -- The password for the specified truststore.
+truststore-type JKS The type of the Java truststore. This can be \
"JKS" or other supported Java truststore type.
+maxIoWorkers 2 * the number of available processors in the \
machine The maximum number of I/O worker threads. This is configured on the \
NettyAvroRpcClient NioClientSocketChannelFactory.
+========================== ===================================================== \
===========================================================================================
Example for agent named a1:
http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 9aabdd4..a2eb264 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
@@ -55,6 +56,7 @@ import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
@@ -99,6 +101,7 @@ implements RpcClient {
.getLogger(NettyAvroRpcClient.class);
private boolean enableDeflateCompression;
private int compressionLevel;
+ private int maxIoWorkers;
/**
* This constructor is intended to be called from {@link RpcClientFactory}.
@@ -128,20 +131,34 @@ implements RpcClient {
try {
+ ExecutorService bossExecutor =
+ Executors.newCachedThreadPool(new TransceiverThreadFactory(
+ "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
+ ExecutorService workerExecutor =
+ Executors.newCachedThreadPool(new TransceiverThreadFactory(
+ "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
+
if (enableDeflateCompression || enableSsl) {
- socketChannelFactory = new SSLCompressionChannelFactory(
- Executors.newCachedThreadPool(new TransceiverThreadFactory(
- "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
- Executors.newCachedThreadPool(new TransceiverThreadFactory(
- "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")),
- enableDeflateCompression, enableSsl, trustAllCerts, compressionLevel,
- truststore, truststorePassword, truststoreType);
+ if (maxIoWorkers >= 1) {
+ socketChannelFactory = new SSLCompressionChannelFactory(
+ bossExecutor, workerExecutor,
+ enableDeflateCompression, enableSsl, trustAllCerts,
+ compressionLevel, truststore, truststorePassword, truststoreType,
+ maxIoWorkers);
+ } else {
+ socketChannelFactory = new SSLCompressionChannelFactory(
+ bossExecutor, workerExecutor,
+ enableDeflateCompression, enableSsl, trustAllCerts,
+ compressionLevel, truststore, truststorePassword, truststoreType);
+ }
} else {
- socketChannelFactory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(new TransceiverThreadFactory(
- "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
- Executors.newCachedThreadPool(new TransceiverThreadFactory(
- "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
+ if (maxIoWorkers >= 1) {
+ socketChannelFactory = new NioClientSocketChannelFactory(
+ bossExecutor, workerExecutor, maxIoWorkers);
+ } else {
+ socketChannelFactory = new NioClientSocketChannelFactory(
+ bossExecutor, workerExecutor);
+ }
}
transceiver = new NettyTransceiver(this.address,
@@ -587,6 +604,23 @@ implements RpcClient {
truststoreType = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
+ String maxIoWorkersStr = properties.getProperty(
+ RpcClientConfigurationConstants.MAX_IO_WORKERS);
+ if (!StringUtils.isEmpty(maxIoWorkersStr)) {
+ try {
+ maxIoWorkers = Integer.parseInt(maxIoWorkersStr);
+ } catch (NumberFormatException ex) {
+ logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " +
+ "default maxIOWorkers.");
+ maxIoWorkers = -1;
+ }
+ }
+
+ if (maxIoWorkers < 1) {
+ logger.warn("Using default maxIOWorkers");
+ maxIoWorkers = -1;
+ }
+
this.connect();
}
@@ -628,13 +662,13 @@ implements RpcClient {
*/
private static class SSLCompressionChannelFactory extends \
NioClientSocketChannelFactory {
- private boolean enableCompression;
- private int compressionLevel;
- private boolean enableSsl;
- private boolean trustAllCerts;
- private String truststore;
- private String truststorePassword;
- private String truststoreType;
+ private final boolean enableCompression;
+ private final int compressionLevel;
+ private final boolean enableSsl;
+ private final boolean trustAllCerts;
+ private final String truststore;
+ private final String truststorePassword;
+ private final String truststoreType;
public SSLCompressionChannelFactory(Executor bossExecutor, Executor \
workerExecutor,
boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
@@ -650,6 +684,20 @@ implements RpcClient {
this.truststoreType = truststoreType;
}
+ public SSLCompressionChannelFactory(Executor bossExecutor, Executor \
workerExecutor,
+ boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
+ int compressionLevel, String truststore, String truststorePassword,
+ String truststoreType, int maxIOWorkers) {
+ super(bossExecutor, workerExecutor, maxIOWorkers);
+ this.enableCompression = enableCompression;
+ this.enableSsl = enableSsl;
+ this.compressionLevel = compressionLevel;
+ this.trustAllCerts = trustAllCerts;
+ this.truststore = truststore;
+ this.truststorePassword = truststorePassword;
+ this.truststoreType = truststoreType;
+ }
+
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) {
TrustManager[] managers;
http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 7aa70cb..136c504 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -144,6 +144,12 @@ public final class RpcClientConfigurationConstants {
public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
+ /**
+ * Configuration constants for the NettyAvroRpcClient
+ * NioClientSocketChannelFactory
+ */
+ public static final String MAX_IO_WORKERS = "maxIoWorkers";
+
private RpcClientConfigurationConstants() {
// disable explicit object creation
}
http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java \
b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
index bfb1fa6..cf4f415 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
@@ -18,28 +18,22 @@
*/
package org.apache.flume.api;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import org.junit.Test;
-
import org.apache.avro.ipc.Server;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
-import org.apache.flume.event.EventBuilder;
-
import org.apache.flume.api.RpcTestUtils.FailedAvroHandler;
import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler;
import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler;
+import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -333,4 +327,73 @@ public class TestNettyAvroRpcClient {
RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler());
logger.error("Throwing: I should never have gotten here!");
}
+
+ /**
+ * configure the NettyAvroRpcClient with a non-default
+ * NioClientSocketChannelFactory number of io worker threads
+ *
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testAppendWithMaxIOWorkers() throws FlumeException, \
EventDeliveryException {
+ NettyAvroRpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler());
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + \
"localhost", localhost
+ + ":" + server.getPort());
+ props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, \
Integer.toString(2));
+ try {
+ client = new NettyAvroRpcClient();
+ client.configure(props);
+ for (int i = 0; i < 5; i++) {
+ client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+ }
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+
+ /**
+ * Simple request with compression on the server and client with compression
+ * level 0
+ *
+ * configure the NettyAvroRpcClient with a non-default
+ * NioClientSocketChannelFactory number of io worker threads
+ *
+ * Compression level 0 = no compression
+ *
+ * @throws FlumeException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testAppendWithMaxIOWorkersSimpleCompressionLevel0() throws \
FlumeException,
+ EventDeliveryException {
+ NettyAvroRpcClient client = null;
+ Server server = RpcTestUtils.startServer(new OKAvroHandler(), 0, true);
+ Properties props = new Properties();
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + \
"localhost", localhost
+ + ":" + server.getPort());
+ props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, \
Integer.toString(2));
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, \
"deflate");
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, \
"" + 0);
+
+ try {
+ client = new NettyAvroRpcClient();
+ client.configure(props);
+ for (int i = 0; i < 5; i++) {
+ client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+ }
+ } finally {
+ RpcTestUtils.stopServer(server);
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
}
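
The handling of the new maxIoWorkers property in configure() can be illustrated outside of Flume. The following is only a minimal sketch that mirrors the fallback rules visible in the patch above (unset, non-numeric, or values below 1 all collapse to -1, which the client treats as "use the Netty default"). The class MaxIoWorkersSketch and the helper resolveMaxIoWorkers are hypothetical names invented for this illustration; they are not part of the commit, and logging is omitted.

```java
import java.util.Properties;

// Hypothetical stand-alone illustration; not part of the Flume code base.
class MaxIoWorkersSketch {

  // Same key the patch introduces as RpcClientConfigurationConstants.MAX_IO_WORKERS.
  static final String MAX_IO_WORKERS = "maxIoWorkers";

  // Returns the configured worker count, or -1 when the default should be used.
  static int resolveMaxIoWorkers(Properties props) {
    String raw = props.getProperty(MAX_IO_WORKERS);
    int workers = -1;
    if (raw != null && !raw.isEmpty()) {
      try {
        workers = Integer.parseInt(raw);
      } catch (NumberFormatException ex) {
        // The patch logs a warning here and falls back to the default.
        workers = -1;
      }
    }
    // Values below 1 are rejected, as in the patch.
    return workers < 1 ? -1 : workers;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(resolveMaxIoWorkers(props)); // prints -1 (property unset)

    props.setProperty(MAX_IO_WORKERS, "4");
    System.out.println(resolveMaxIoWorkers(props)); // prints 4

    props.setProperty(MAX_IO_WORKERS, "abc");
    System.out.println(resolveMaxIoWorkers(props)); // prints -1 (not a number)

    props.setProperty(MAX_IO_WORKERS, "0");
    System.out.println(resolveMaxIoWorkers(props)); // prints -1 (below 1)
  }
}
```

In the patch itself, a result of 1 or more selects the NioClientSocketChannelFactory (or SSLCompressionChannelFactory) constructor that takes a worker count, and -1 selects the constructor without it.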