[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    git commit: FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient
From:       hshreedharan () apache ! org
Date:       2013-12-10 0:36:44
Message-ID: da696b902c654d389f10fb412297de0c () git ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/flume-1.5 e371d565b -> 209169bb5


FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient

(Cameron Gandevia via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/209169bb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/209169bb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/209169bb

Branch: refs/heads/flume-1.5
Commit: 209169bb561f65d58a7140d952c34163d31ab825
Parents: e371d56
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon Dec 9 16:35:33 2013 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon Dec 9 16:35:33 2013 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 35 ++++----
 .../apache/flume/api/NettyAvroRpcClient.java    | 86 +++++++++++++++-----
 .../api/RpcClientConfigurationConstants.java    |  6 ++
 .../flume/api/TestNettyAvroRpcClient.java       | 79 ++++++++++++++++--
 4 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-doc/sphinx/FlumeUserGuide.rst
                
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst \
b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 8687cb7..0737c44 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1638,25 +1638,26 @@ hostname / port pair. The events are taken from the \
configured Channel in  batches of the configured batch size.
 Required properties are in **bold**.
 
-==========================   =======  ==============================================
+==========================   =====================================================  \
===========================================================================================
  Property Name                Default  Description
-==========================   =======  ==============================================
+==========================   =====================================================  \
===========================================================================================
                
 **channel**                  --
-**type**                     --       The component type name, needs to be ``avro``.
-**hostname**                 --       The hostname or IP address to bind to.
-**port**                     --       The port # to listen on.
-batch-size                   100      number of event to batch together for send.
-connect-timeout              20000    Amount of time (ms) to allow for the first \
                (handshake) request.
-request-timeout              20000    Amount of time (ms) to allow for requests \
                after the first.
-reset-connection-interval    none     Amount of time (s) before the connection to \
the next hop is reset. This will force the Avro Sink to reconnect to the next hop. \
This will allow the sink to connect to hosts behind a hardware load-balancer when \
                news hosts are added without having to restart the agent.
-compression-type             none     This can be "none" or "deflate".  The \
                compression-type must match the compression-type of matching \
                AvroSource
-compression-level            6        The level of compression to compress event. 0 \
                = no compression and 1-9 is compression.  The higher the number the \
                more compression
-ssl                          false    Set to true to enable SSL for this AvroSink. \
When configuring SSL, you can optionally set a "truststore", "truststore-password", \
                "truststore-type", and specify whether to "trust-all-certs".
-trust-all-certs              false    If this is set to true, SSL server \
certificates for remote servers (Avro Sources) will not be checked. This should NOT \
be used in production because it makes it easier for an attacker to execute a \
                man-in-the-middle attack and "listen in" on the encrypted connection.
-truststore                   --       The path to a custom Java truststore file. \
Flume uses the certificate authority information in this file to determine whether \
the remote Avro Source's SSL authentication credentials should be trusted. If not \
specified, the default Java JSSE certificate authority files (typically "jssecacerts" \
                or "cacerts" in the Oracle JRE) will be used.
-truststore-password          --       The password for the specified truststore.
-truststore-type              JKS      The type of the Java truststore. This can be \
                "JKS" or other supported Java truststore type.
-==========================   =======  ==============================================
+**type**                     --                                                     \
The component type name, needs to be ``avro``. +**hostname**                 --       \
The hostname or IP address to bind to. +**port**                     --               \
The port # to listen on. +batch-size                   100                            \
number of event to batch together for send. +connect-timeout              20000       \
Amount of time (ms) to allow for the first (handshake) request. +request-timeout      \
20000                                                  Amount of time (ms) to allow \
for requests after the first. +reset-connection-interval    none                      \
Amount of time (s) before the connection to the next hop is reset. This will force \
the Avro Sink to reconnect to the next hop. This will allow the sink to connect to \
hosts behind a hardware load-balancer when news hosts are added without having to \
restart the agent. +compression-type             none                                 \
This can be "none" or "deflate".  The compression-type must match the \
compression-type of matching AvroSource +compression-level            6               \
The level of compression to compress event. 0 = no compression and 1-9 is \
compression.  The higher the number the more compression +ssl                         \
false                                                  Set to true to enable SSL for \
this AvroSink. When configuring SSL, you can optionally set a "truststore", \
"truststore-password", "truststore-type", and specify whether to "trust-all-certs". \
+trust-all-certs              false                                                  \
If this is set to true, SSL server certificates for remote servers (Avro Sources) \
will not be checked. This should NOT be used in production because it makes it easier \
for an attacker to execute a man-in-the-middle attack and "listen in" on the \
encrypted connection. +truststore                   --                                \
The path to a custom Java truststore file. Flume uses the certificate authority \
information in this file to determine whether the remote Avro Source's SSL \
authentication credentials should be trusted. If not specified, the default Java JSSE \
certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) \
will be used. +truststore-password          --                                        \
The password for the specified truststore. +truststore-type              JKS          \
The type of the Java truststore. This can be "JKS" or other supported Java truststore \
type. +maxIoWorkers                 2 * the number of available processors in the \
machine  The maximum number of I/O worker threads. This is configured on the \
NettyAvroRpcClient NioClientSocketChannelFactory. +==========================   \
=====================================================  \
===========================================================================================
  
 Example for agent named a1:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java index \
                9aabdd4..a2eb264 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
@@ -55,6 +56,7 @@ import org.apache.avro.ipc.CallFuture;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -99,6 +101,7 @@ implements RpcClient {
       .getLogger(NettyAvroRpcClient.class);
   private boolean enableDeflateCompression;
   private int compressionLevel;
+  private int maxIoWorkers;
 
   /**
    * This constructor is intended to be called from {@link RpcClientFactory}.
@@ -128,20 +131,34 @@ implements RpcClient {
 
     try {
 
+      ExecutorService bossExecutor =
+        Executors.newCachedThreadPool(new TransceiverThreadFactory(
+          "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
+      ExecutorService workerExecutor =
+        Executors.newCachedThreadPool(new TransceiverThreadFactory(
+          "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
+
       if (enableDeflateCompression || enableSsl) {
-        socketChannelFactory = new SSLCompressionChannelFactory(
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")),
-            enableDeflateCompression, enableSsl, trustAllCerts, compressionLevel,
-            truststore, truststorePassword, truststoreType);
+        if (maxIoWorkers >= 1) {
+          socketChannelFactory = new SSLCompressionChannelFactory(
+            bossExecutor, workerExecutor,
+            enableDeflateCompression, enableSsl, trustAllCerts,
+            compressionLevel, truststore, truststorePassword, truststoreType,
+            maxIoWorkers);
+        } else {
+          socketChannelFactory = new SSLCompressionChannelFactory(
+            bossExecutor, workerExecutor,
+            enableDeflateCompression, enableSsl, trustAllCerts,
+            compressionLevel, truststore, truststorePassword, truststoreType);
+        }
       } else {
-        socketChannelFactory = new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
+        if (maxIoWorkers >= 1) {
+          socketChannelFactory = new NioClientSocketChannelFactory(
+              bossExecutor, workerExecutor, maxIoWorkers);
+        } else {
+          socketChannelFactory = new NioClientSocketChannelFactory(
+              bossExecutor, workerExecutor);
+        }
       }
 
       transceiver = new NettyTransceiver(this.address,
@@ -587,6 +604,23 @@ implements RpcClient {
     truststoreType = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
 
+    String maxIoWorkersStr = properties.getProperty(
+      RpcClientConfigurationConstants.MAX_IO_WORKERS);
+    if (!StringUtils.isEmpty(maxIoWorkersStr)) {
+      try {
+        maxIoWorkers = Integer.parseInt(maxIoWorkersStr);
+      } catch (NumberFormatException ex) {
+        logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " +
+          "default maxIOWorkers.");
+        maxIoWorkers = -1;
+      }
+    }
+
+    if (maxIoWorkers < 1) {
+      logger.warn("Using default maxIOWorkers");
+      maxIoWorkers = -1;
+    }
+
     this.connect();
   }
 
@@ -628,13 +662,13 @@ implements RpcClient {
    */
   private static class SSLCompressionChannelFactory extends \
NioClientSocketChannelFactory {  
-    private boolean enableCompression;
-    private int compressionLevel;
-    private boolean enableSsl;
-    private boolean trustAllCerts;
-    private String truststore;
-    private String truststorePassword;
-    private String truststoreType;
+    private final boolean enableCompression;
+    private final int compressionLevel;
+    private final boolean enableSsl;
+    private final boolean trustAllCerts;
+    private final String truststore;
+    private final String truststorePassword;
+    private final String truststoreType;
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor \
                workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
@@ -650,6 +684,20 @@ implements RpcClient {
       this.truststoreType = truststoreType;
     }
 
+    public SSLCompressionChannelFactory(Executor bossExecutor, Executor \
workerExecutor, +        boolean enableCompression, boolean enableSsl, boolean \
trustAllCerts, +        int compressionLevel, String truststore, String \
truststorePassword, +        String truststoreType, int maxIOWorkers) {
+      super(bossExecutor, workerExecutor, maxIOWorkers);
+      this.enableCompression = enableCompression;
+      this.enableSsl = enableSsl;
+      this.compressionLevel = compressionLevel;
+      this.trustAllCerts = trustAllCerts;
+      this.truststore = truststore;
+      this.truststorePassword = truststorePassword;
+      this.truststoreType = truststoreType;
+    }
+
     @Override
     public SocketChannel newChannel(ChannelPipeline pipeline) {
       TrustManager[] managers;

http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
 index 7aa70cb..136c504 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
                
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
 @@ -144,6 +144,12 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
   public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
 
+  /**
+   * Configuration constants for the NettyAvroRpcClient
+   * NioClientSocketChannelFactory
+   */
+  public static final String MAX_IO_WORKERS = "maxIoWorkers";
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java \
b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java index \
                bfb1fa6..cf4f415 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
@@ -18,28 +18,22 @@
  */
 package org.apache.flume.api;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
-import org.junit.Test;
-
 import org.apache.avro.ipc.Server;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
-import org.apache.flume.event.EventBuilder;
-
 import org.apache.flume.api.RpcTestUtils.FailedAvroHandler;
 import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
 import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler;
 import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler;
+import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,4 +327,73 @@ public class TestNettyAvroRpcClient {
     RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler());
     logger.error("Throwing: I should never have gotten here!");
   }
+
+  /**
+   * configure the NettyAvroRpcClient with a non-default
+   * NioClientSocketChannelFactory number of io worker threads
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testAppendWithMaxIOWorkers() throws FlumeException, \
EventDeliveryException { +    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + \
"localhost", localhost +        + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, \
Integer.toString(2)); +    try {
+      client = new NettyAvroRpcClient();
+      client.configure(props);
+      for (int i = 0; i < 5; i++) {
+        client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * Simple request with compression on the server and client with compression
+   * level 0
+   *
+   * configure the NettyAvroRpcClient with a non-default
+   * NioClientSocketChannelFactory number of io worker threads
+   *
+   * Compression level 0 = no compression
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testAppendWithMaxIOWorkersSimpleCompressionLevel0() throws \
FlumeException, +      EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler(), 0, true);
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + \
"localhost", localhost +        + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, \
Integer.toString(2)); +    \
props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, \
"deflate"); +    props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, \
"" + 0); +
+    try {
+      client = new NettyAvroRpcClient();
+      client.configure(props);
+      for (int i = 0; i < 5; i++) {
+        client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
 }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic