[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    [1/2] flume git commit: FLUME-2631. End to End authentication in Flume
From:       hshreedharan () apache ! org
Date:       2015-03-06 7:20:08
Message-ID: 4eb85dff3d2b45c5898f654c24c0771f () git ! apache ! org
[Download RAW message or body]

Repository: flume
Updated Branches:
  refs/heads/flume-1.6 fa7ead55c -> 1b0f051b6


http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 33f73a9..9a48841 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.flume.sink.hdfs;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Calendar;
@@ -31,7 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Channel;
@@ -41,6 +39,9 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
@@ -50,9 +51,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,12 +98,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final int defaultThreadPoolSize = 10;
   private static final int defaultRollTimerPoolSize = 1;
 
-  /**
-   * Singleton credential manager that manages static credentials for the
-   * entire JVM
-   */
-  private static final AtomicReference<KerberosUser> staticLogin
-      = new AtomicReference<KerberosUser>();
 
   private final HDFSWriterFactory writerFactory;
   private WriterLinkedHashMap sfWriters;
@@ -129,11 +121,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private ExecutorService callTimeoutPool;
   private ScheduledExecutorService timedRollerPool;
 
-  private String kerbConfPrincipal;
-  private String kerbKeytab;
-  private String proxyUserName;
-  private UserGroupInformation proxyTicket;
-
   private boolean needRounding = false;
   private int roundUnit = Calendar.SECOND;
   private int roundValue = 1;
@@ -150,6 +137,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private final Object sfWritersLock = new Object();
   private long retryInterval;
   private int tryCount;
+  private PrivilegedExecutor privExecutor;
 
 
   /*
@@ -225,9 +213,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         defaultThreadPoolSize);
     rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
         defaultRollTimerPoolSize);
-    kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
-    kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
-    proxyUserName = context.getString("hdfs.proxyUser", "");
+    String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
+    String kerbKeytab = context.getString("hdfs.kerberosKeytab");
+    String proxyUser = context.getString("hdfs.proxyUser");
     tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
     if(tryCount <= 0) {
       LOG.warn("Retry count value : " + tryCount + " is not " +
@@ -269,9 +257,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           + " when fileType is: " + fileType);
     }
 
-    if (!authenticate()) {
-      LOG.error("Failed to authenticate!");
-    }
+    // get the appropriate executor
+    this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
+            kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);
+
+
+
+
     needRounding = context.getBoolean("hdfs.round", false);
 
     if(needRounding) {
@@ -482,7 +474,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       rollSize, rollCount,
       batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
       suffix, codeC, compType, hdfsWriter, timedRollerPool,
-      proxyTicket, sinkCounter, idleTimeout, closeCallback,
+      privExecutor, sinkCounter, idleTimeout, closeCallback,
       lookupPath, callTimeout, callTimeoutPool, retryInterval,
       tryCount);
     if(mockFs != null) {
@@ -551,197 +543,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     super.start();
   }
 
-  private boolean authenticate() {
-
-    // logic for kerberos login
-    boolean useSecurity = UserGroupInformation.isSecurityEnabled();
-
-    LOG.info("Hadoop Security enabled: " + useSecurity);
-
-    if (useSecurity) {
-
-      // sanity checking
-      if (kerbConfPrincipal.isEmpty()) {
-        LOG.error("Hadoop running in secure mode, but Flume config doesn't "
-                + "specify a principal to use for Kerberos auth.");
-        return false;
-      }
-      if (kerbKeytab.isEmpty()) {
-        LOG.error("Hadoop running in secure mode, but Flume config doesn't "
-                + "specify a keytab to use for Kerberos auth.");
-        return false;
-      } else {
-        //If keytab is specified, user should want it take effect.
-        //HDFSEventSink will halt when keytab file is non-exist or unreadable
-        File kfile = new File(kerbKeytab);
-        if (!(kfile.isFile() && kfile.canRead())) {
-          throw new IllegalArgumentException("The keyTab file: "
-                  + kerbKeytab + " is nonexistent or can't read. "
-                  + "Please specify a readable keytab file for Kerberos auth.");
-        }
-      }
-
-      String principal;
-      try {
-        // resolves _HOST pattern using standard Hadoop search/replace
-        // via DNS lookup when 2nd argument is empty
-        principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, "");
-      } catch (IOException e) {
-        LOG.error("Host lookup error resolving kerberos principal ("
-                + kerbConfPrincipal + "). Exception follows.", e);
-        return false;
-      }
-
-      Preconditions.checkNotNull(principal, "Principal must not be null");
-      KerberosUser prevUser = staticLogin.get();
-      KerberosUser newUser = new KerberosUser(principal, kerbKeytab);
-
-      // be cruel and unusual when user tries to login as multiple principals
-      // this isn't really valid with a reconfigure but this should be rare
-      // enough to warrant a restart of the agent JVM
-      // TODO: find a way to interrogate the entire current config state,
-      // since we don't have to be unnecessarily protective if they switch all
-      // HDFS sinks to use a different principal all at once.
-      Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
-              "Cannot use multiple kerberos principals in the same agent. " +
-                      " Must restart agent to use new principal or keytab. " +
-                      "Previous = %s, New = %s", prevUser, newUser);
-
-      // attempt to use cached credential if the user is the same
-      // this is polite and should avoid flooding the KDC with auth requests
-      UserGroupInformation curUser = null;
-      if (prevUser != null && prevUser.equals(newUser)) {
-        try {
-          curUser = UserGroupInformation.getLoginUser();
-        } catch (IOException e) {
-          LOG.warn("User unexpectedly had no active login. Continuing with " +
-                  "authentication", e);
-        }
-      }
-
-      if (curUser == null || !curUser.getUserName().equals(principal)) {
-        try {
-          // static login
-          kerberosLogin(this, principal, kerbKeytab);
-        } catch (IOException e) {
-          LOG.error("Authentication or file read error while attempting to "
-                  + "login as kerberos principal (" + principal + ") using "
-                  + "keytab (" + kerbKeytab + "). Exception follows.", e);
-          return false;
-        }
-      } else {
-        LOG.debug("{}: Using existing principal login: {}", this, curUser);
-      }
-
-      // we supposedly got through this unscathed... so store the static user
-      staticLogin.set(newUser);
-    }
-
-    // hadoop impersonation works with or without kerberos security
-    proxyTicket = null;
-    if (!proxyUserName.isEmpty()) {
-      try {
-        proxyTicket = UserGroupInformation.createProxyUser(
-                proxyUserName, UserGroupInformation.getLoginUser());
-      } catch (IOException e) {
-        LOG.error("Unable to login as proxy user. Exception follows.", e);
-        return false;
-      }
-    }
-
-    UserGroupInformation ugi = null;
-    if (proxyTicket != null) {
-      ugi = proxyTicket;
-    } else if (useSecurity) {
-      try {
-        ugi = UserGroupInformation.getLoginUser();
-      } catch (IOException e) {
-        LOG.error("Unexpected error: Unable to get authenticated user after " +
-                "apparent successful login! Exception follows.", e);
-        return false;
-      }
-    }
-
-    if (ugi != null) {
-      // dump login information
-      AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
-      LOG.info("Auth method: {}", authMethod);
-      LOG.info(" User name: {}", ugi.getUserName());
-      LOG.info(" Using keytab: {}", ugi.isFromKeytab());
-      if (authMethod == AuthenticationMethod.PROXY) {
-        UserGroupInformation superUser;
-        try {
-          superUser = UserGroupInformation.getLoginUser();
-          LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod());
-          LOG.info(" Superuser name: {}", superUser.getUserName());
-          LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab());
-        } catch (IOException e) {
-          LOG.error("Unexpected error: unknown superuser impersonating proxy.",
-                  e);
-          return false;
-        }
-      }
-
-      LOG.info("Logged in as user {}", ugi.getUserName());
-
-      return true;
-    }
-
-    return true;
-  }
-
-  /**
-   * Static synchronized method for static Kerberos login. <br/>
-   * Static synchronized due to a thundering herd problem when multiple Sinks
-   * attempt to log in using the same principal at the same time with the
-   * intention of impersonating different users (or even the same user).
-   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
-   * attach and it returns:
-   * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
-   * In addition, since the underlying Hadoop APIs we are using for
-   * impersonation are static, we define this method as static as well.
-   *
-   * @param principal
-   *         Fully-qualified principal to use for authentication.
-   * @param keytab
-   *         Location of keytab file containing credentials for principal.
-   * @return Logged-in user
-   * @throws IOException
-   *         if login fails.
-   */
-  private static synchronized UserGroupInformation kerberosLogin(
-          HDFSEventSink sink, String principal, String keytab) throws IOException {
-
-    // if we are the 2nd user thru the lock, the login should already be
-    // available statically if login was successful
-    UserGroupInformation curUser = null;
-    try {
-      curUser = UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
-      // not a big deal but this shouldn't typically happen because it will
-      // generally fall back to the UNIX user
-      LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
-    }
-
-    // we already have logged in successfully
-    if (curUser != null && curUser.getUserName().equals(principal)) {
-      LOG.debug("{}: Using existing principal ({}): {}",
-              new Object[]{sink, principal, curUser});
-
-      // no principal found
-    } else {
-
-      LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
-              "file ({})", new Object[]{sink, principal, keytab});
-
-      // attempt static kerberos login
-      UserGroupInformation.loginUserFromKeytab(principal, keytab);
-      curUser = UserGroupInformation.getLoginUser();
-    }
-
-    return curUser;
-  }
-
   @Override
   public String toString() {
     return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 7c74b16..2581f73 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.hadoop.conf.Configuration;
@@ -53,10 +55,12 @@ public class TestBucketWriter {
   private Context ctx = new Context();
 
   private static ScheduledExecutorService timedRollerPool;
+  private static PrivilegedExecutor proxy;
 
   @BeforeClass
   public static void setup() {
     timedRollerPool = Executors.newSingleThreadScheduledExecutor();
+    proxy = FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null);
   }
 
   @AfterClass
@@ -72,7 +76,7 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, proxy,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
         null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -97,7 +101,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0,
       ctx, "/tmp", "file", "", ".tmp", null, null,
       SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool,
-      null, new SinkCounter("test-bucket-writer-" +
+      proxy, new SinkCounter("test-bucket-writer-" +
       System.currentTimeMillis()),0, null, null, 30000,
       Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -124,7 +128,7 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
       "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, null,
+      hdfsWriter, timedRollerPool, proxy,
       new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
       0, new HDFSEventSink.WriterCallback() {
       @Override
@@ -147,7 +151,7 @@ public class TestBucketWriter {
 
     bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
       "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, null,
+      hdfsWriter, timedRollerPool, proxy,
       new SinkCounter("test-bucket-writer-"
         + System.currentTimeMillis()), 0, null, null, 30000,
       Executors.newSingleThreadExecutor(), 0, 0);
@@ -230,7 +234,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
       0, ctx, path, name, "", ".tmp", null, null,
       SequenceFile.CompressionType.NONE, hdfsWriter,
-      timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+      timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
       + System.currentTimeMillis()),
       0, null, null, 30000, Executors.newSingleThreadExecutor(),
       0, 0);
@@ -255,7 +259,7 @@ public class TestBucketWriter {
       BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
         0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
         SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
         + System.currentTimeMillis()), 0, null, null, 30000,
         Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -283,7 +287,7 @@ public class TestBucketWriter {
       BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
         0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
         SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null, new SinkCounter(
+        timedRollerPool, proxy, new SinkCounter(
         "test-bucket-writer-" + System.currentTimeMillis()), 0,
         null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -316,7 +320,7 @@ public class TestBucketWriter {
       0, ctx, "/tmp", "file", "", ".tmp", suffix,
       HDFSEventSink.getCodec("gzip"),
       SequenceFile.CompressionType.BLOCK, hdfsWriter,
-      timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+      timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
       + System.currentTimeMillis()), 0, null, null, 30000,
       Executors.newSingleThreadExecutor(), 0, 0
     );
@@ -348,7 +352,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
       0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
       SequenceFile.CompressionType.NONE, hdfsWriter,
-      timedRollerPool, null, new SinkCounter(
+      timedRollerPool, proxy, new SinkCounter(
         "test-bucket-writer-" + System.currentTimeMillis()), 0,
       null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -368,7 +372,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
       0, ctx, "/tmp", "file", "", SUFFIX, null, null,
       SequenceFile.CompressionType.NONE, hdfsWriter,
-      timedRollerPool, null, new SinkCounter(
+      timedRollerPool, proxy, new SinkCounter(
         "test-bucket-writer-" + System.currentTimeMillis()), 0,
       null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
@@ -388,7 +392,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
       0, ctx, "/tmp", "file", "", SUFFIX, null, null,
       SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, null,
+      hdfsWriter, timedRollerPool, proxy,
       new SinkCounter(
         "test-bucket-writer-" + System.currentTimeMillis()), 0,
       new HDFSEventSink.WriterCallback() {
@@ -442,7 +446,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
       hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
       null, new MockDataStream(mockFs),
-      timedRollerPool, null,
+      timedRollerPool, proxy,
       new SinkCounter(
         "test-bucket-writer-" + System.currentTimeMillis()),
       0, null, null, 30000, Executors.newSingleThreadExecutor(), 1,

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 1b7a364..23862eb 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -276,7 +276,7 @@ public class TestHDFSEventSink {
       Assert.fail("no exception thrown");
     } catch (IllegalArgumentException expected) {
       Assert.assertTrue(expected.getMessage().contains(
-          "is nonexistent or can't read."));
+          "Keytab is not a readable file"));
     } finally {
       //turn security off
       conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 5de0bd5..e659ada 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -36,6 +36,8 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -54,7 +56,6 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.hbase.security.User;
 
 
 /**
@@ -103,11 +104,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private Context serializerContext;
   private String kerberosPrincipal;
   private String kerberosKeytab;
-  private User hbaseUser;
   private boolean enableWal = true;
   private boolean batchIncrements = false;
   private Method refGetFamilyMap = null;
   private SinkCounter sinkCounter;
+  private PrivilegedExecutor privilegedExecutor;
 
   // Internal hooks used for unit testing.
   private DebugIncrementsCallback debugIncrCallback = null;
@@ -132,17 +133,14 @@ public class HBaseSink extends AbstractSink implements Configurable {
     Preconditions.checkArgument(table == null, "Please call stop " +
         "before calling start on an old instance.");
     try {
-      if (HBaseSinkSecurityManager.isSecurityEnabled(config)) {
-        hbaseUser = HBaseSinkSecurityManager.login(config, null,
-          kerberosPrincipal, kerberosKeytab);
-      }
+      privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
     } catch (Exception ex) {
       sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException("Failed to login to HBase using "
         + "provided credentials.", ex);
     }
     try {
-      table = runPrivileged(new PrivilegedExceptionAction<HTable>() {
+      table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
         @Override
         public HTable run() throws Exception {
           HTable table = new HTable(config, tableName);
@@ -160,7 +158,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
           " from HBase", e);
     }
     try {
-      if (!runPrivileged(new PrivilegedExceptionAction<Boolean>() {
+      if (!privilegedExecutor.execute(new PrivilegedExceptionAction<Boolean>() {
         @Override
         public Boolean run() throws IOException {
           return table.getTableDescriptor().hasFamily(columnFamily);
@@ -233,8 +231,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
       logger.error("Could not instantiate event serializer." , e);
       Throwables.propagate(e);
     }
-    kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, "");
-    kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, "");
+    kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB);
+    kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL);
  
     enableWal = context.getBoolean(HBaseSinkConfigurationConstants
       .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
@@ -371,7 +369,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private void putEventsAndCommit(final List<Row> actions,
       final List<Increment> incs, Transaction txn) throws Exception {
 
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
+    privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
         for (Row r : actions) {
@@ -388,7 +386,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
       }
     });
 
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
+    privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
 
@@ -416,18 +414,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
     sinkCounter.addToEventDrainSuccessCount(actions.size());
   }
 
-  private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
-          throws Exception {
-    if(hbaseUser != null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Calling runAs as hbase user: " + hbaseUser.getName());
-      }
-      return hbaseUser.runAs(action);
-    } else {
-      return action.run();
-    }
-  }
-
   /**
    * The method getFamilyMap() is no longer available in Hbase 0.96.
    * We must use reflection to determine which version we may use.

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
deleted file mode 100644
index 762fce9..0000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import org.apache.flume.FlumeException;
-import org.apache.flume.sink.hdfs.KerberosUser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class to handle logging into HBase with the credentials passed in.
- */
-public class HBaseSinkSecurityManager {
-
-  /*
-   * volatile for safe publication. Since this is updated only by
-   * a single thread (configuration) and read later by the sink threads,
-   * this can just be volatile, no need of Atomic reference.
-   */
-  private volatile static KerberosUser loggedInUser;
-  private static final String FLUME_KEYTAB_KEY = "flume.keytab.key";
-  private static final String FLUME_PRINCIPAL_KEY = "flume.principal.key";
-  private static final Logger LOG =
-          LoggerFactory.getLogger(HBaseSinkSecurityManager.class);
-
-  /**
-   * Checks if security is enabled for the HBase cluster.
-   *
-   * @return - true if security is enabled on the HBase cluster and
-   * the underlying HDFS cluster.
-   */
-  public static boolean isSecurityEnabled(Configuration conf) {
-    return User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf);
-  }
-
-  /**
-   * Login the user using the configuration, and the hostname specified to use
-   * for logging in.
-   *
-   * @param conf - Configuration to use for logging the user in.
-   * @param hostname - The hostname to use for logging the user in. If no
-   * hostname is specified (null or empty string), the canonical hostname for
-   * the address returned by {@linkplain InetAddress#getLocalHost()} will be
-   * used.
-   * @return The logged in HBase {@linkplain User}.
-   * @throws IOException if login failed, or hostname lookup failed.
-   */
-  public static synchronized User login(Configuration conf, String hostname,
-          String kerberosPrincipal, String kerberosKeytab) throws IOException {
-    if (kerberosPrincipal.isEmpty()) {
-      String msg = "Login failed, since kerberos principal was not specified.";
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    }
-    if (kerberosKeytab.isEmpty()) {
-      String msg = "Login failed, since kerberos keytab was not specified.";
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    } else {
-      //If keytab is specified, user should want it take effect.
-      //HDFSEventSink will halt when keytab file is non-exist or unreadable
-      File kfile = new File(kerberosKeytab);
-      if (!(kfile.isFile() && kfile.canRead())) {
-        throw new IllegalArgumentException("The keyTab file: "
-                + kerberosKeytab + " is nonexistent or can't read. "
-                + "Please specify a readable keytab file for Kerberos auth.");
-      }
-    }
-    String principal = kerberosPrincipal;
-    try {
-      // resolves _HOST pattern using standard Hadoop search/replace
-      // via DNS lookup when 2nd argument is empty
-      principal = SecurityUtil.getServerPrincipal(kerberosPrincipal,"");
-    } catch (IOException e) {
-      LOG.error("Host lookup error resolving kerberos principal ("
-              + kerberosPrincipal + "). Exception follows.", e);
-      throw e;
-    }
-    Preconditions.checkNotNull(principal, "Principal must not be null");
-    KerberosUser newUser = new KerberosUser(principal, kerberosKeytab);
-    //The HDFS Sink does not allow login credentials to change.
-    //To be uniform, we will do the same thing here.
-    User hbaseUser = null;
-    boolean loggedIn = false;
-    if (loggedInUser != null) {
-      Preconditions.checkArgument(newUser.equals(loggedInUser),
-              "Cannot switch kerberos credentials during a reconfiguration. "
-              + "Please restart the agent to set the new credentials.");
-      try {
-        hbaseUser = User.create(UserGroupInformation.getLoginUser());
-        loggedIn = true;
-      } catch (IOException ex) {
-        LOG.warn("Previous login does not exist, "
-                + "will authenticate against KDC");
-      }
-    }
-    if (!loggedIn) {
-      if (hostname == null || hostname.isEmpty()) {
-        hostname = InetAddress.getLocalHost().getCanonicalHostName();
-      }
-      conf.set(FLUME_KEYTAB_KEY, kerberosKeytab);
-      conf.set(FLUME_PRINCIPAL_KEY, principal);
-      User.login(conf, FLUME_KEYTAB_KEY, FLUME_PRINCIPAL_KEY, hostname);
-      hbaseUser = User.create(UserGroupInformation.getLoginUser());
-      loggedInUser = newUser;
-      //TODO: Set the loggedInUser to the current user.
-      LOG.info("Logged into HBase as user: " + hbaseUser.getName());
-    }
-    return hbaseUser;
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3e40558..aad8be6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@ limitations under the License.
     <module>flume-ng-sdk</module>
     <module>flume-ng-tests</module>
     <module>flume-tools</module>
+    <module>flume-ng-auth</module>
   </modules>
 
   <profiles>
@@ -1226,6 +1227,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-auth</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-clients</groupId>
         <artifactId>flume-ng-log4jappender</artifactId>
         <version>1.6.0-SNAPSHOT</version>


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic