List: flume-commits
Subject: [1/2] flume git commit: FLUME-2631. End to End authentication in Flume
From: hshreedharan@apache.org
Date: 2015-03-06 7:20:08
Message-ID: 4eb85dff3d2b45c5898f654c24c0771f@git.apache.org
Repository: flume
Updated Branches:
refs/heads/flume-1.6 fa7ead55c -> 1b0f051b6
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 33f73a9..9a48841 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -18,7 +18,6 @@
package org.apache.flume.sink.hdfs;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
@@ -31,7 +30,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.Channel;
@@ -41,6 +39,9 @@ import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
+import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
@@ -50,9 +51,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,12 +98,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
private static final int defaultThreadPoolSize = 10;
private static final int defaultRollTimerPoolSize = 1;
- /**
- * Singleton credential manager that manages static credentials for the
- * entire JVM
- */
- private static final AtomicReference<KerberosUser> staticLogin
- = new AtomicReference<KerberosUser>();
private final HDFSWriterFactory writerFactory;
private WriterLinkedHashMap sfWriters;
@@ -129,11 +121,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
private ExecutorService callTimeoutPool;
private ScheduledExecutorService timedRollerPool;
- private String kerbConfPrincipal;
- private String kerbKeytab;
- private String proxyUserName;
- private UserGroupInformation proxyTicket;
-
private boolean needRounding = false;
private int roundUnit = Calendar.SECOND;
private int roundValue = 1;
@@ -150,6 +137,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
private final Object sfWritersLock = new Object();
private long retryInterval;
private int tryCount;
+ private PrivilegedExecutor privExecutor;
/*
@@ -225,9 +213,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
defaultThreadPoolSize);
rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
defaultRollTimerPoolSize);
- kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
- kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
- proxyUserName = context.getString("hdfs.proxyUser", "");
+ String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
+ String kerbKeytab = context.getString("hdfs.kerberosKeytab");
+ String proxyUser = context.getString("hdfs.proxyUser");
tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
if(tryCount <= 0) {
LOG.warn("Retry count value : " + tryCount + " is not " +
@@ -269,9 +257,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
+ " when fileType is: " + fileType);
}
- if (!authenticate()) {
- LOG.error("Failed to authenticate!");
- }
+ // get the appropriate executor
+ this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
+ kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);
+
+
+
+
needRounding = context.getBoolean("hdfs.round", false);
if(needRounding) {
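
A side note on the configuration change above: the three settings no longer
default to the empty string; null is passed straight into flume-ng-auth, which
now owns the validation the sink used to do itself. For reference, they map to
per-sink agent properties along these lines (hypothetical agent/sink names,
principal, and paths; only the hdfs.kerberosPrincipal, hdfs.kerberosKeytab,
and hdfs.proxyUser keys are taken from this diff):

    # Hypothetical example values
    agent.sinks.h1.type = hdfs
    agent.sinks.h1.hdfs.path = hdfs://nn.example.com/flume/events
    agent.sinks.h1.hdfs.kerberosPrincipal = flume/_HOST@EXAMPLE.COM
    agent.sinks.h1.hdfs.kerberosKeytab = /etc/flume/conf/flume.keytab
    # Impersonation; per the removed authenticate() code, this works with
    # or without Kerberos security.
    agent.sinks.h1.hdfs.proxyUser = weblogs
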
@@ -482,7 +474,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
- proxyTicket, sinkCounter, idleTimeout, closeCallback,
+ privExecutor, sinkCounter, idleTimeout, closeCallback,
lookupPath, callTimeout, callTimeoutPool, retryInterval,
tryCount);
if(mockFs != null) {
@@ -551,197 +543,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
super.start();
}
- private boolean authenticate() {
-
- // logic for kerberos login
- boolean useSecurity = UserGroupInformation.isSecurityEnabled();
-
- LOG.info("Hadoop Security enabled: " + useSecurity);
-
- if (useSecurity) {
-
- // sanity checking
- if (kerbConfPrincipal.isEmpty()) {
- LOG.error("Hadoop running in secure mode, but Flume config doesn't "
- + "specify a principal to use for Kerberos auth.");
- return false;
- }
- if (kerbKeytab.isEmpty()) {
- LOG.error("Hadoop running in secure mode, but Flume config doesn't "
- + "specify a keytab to use for Kerberos auth.");
- return false;
- } else {
- // If a keytab is specified, the user intends for it to take effect;
- // HDFSEventSink will halt if the keytab file is nonexistent or unreadable.
- File kfile = new File(kerbKeytab);
- if (!(kfile.isFile() && kfile.canRead())) {
- throw new IllegalArgumentException("The keyTab file: "
- + kerbKeytab + " is nonexistent or can't read. "
- + "Please specify a readable keytab file for Kerberos auth.");
- }
- }
-
- String principal;
- try {
- // resolves _HOST pattern using standard Hadoop search/replace
- // via DNS lookup when 2nd argument is empty
- principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, "");
- } catch (IOException e) {
- LOG.error("Host lookup error resolving kerberos principal ("
- + kerbConfPrincipal + "). Exception follows.", e);
- return false;
- }
-
- Preconditions.checkNotNull(principal, "Principal must not be null");
- KerberosUser prevUser = staticLogin.get();
- KerberosUser newUser = new KerberosUser(principal, kerbKeytab);
-
- // be cruel and unusual when user tries to login as multiple principals
- // this isn't really valid with a reconfigure but this should be rare
- // enough to warrant a restart of the agent JVM
- // TODO: find a way to interrogate the entire current config state,
- // since we don't have to be unnecessarily protective if they switch all
- // HDFS sinks to use a different principal all at once.
- Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
- "Cannot use multiple kerberos principals in the same agent. " +
- " Must restart agent to use new principal or keytab. " +
- "Previous = %s, New = %s", prevUser, newUser);
-
- // attempt to use cached credential if the user is the same
- // this is polite and should avoid flooding the KDC with auth requests
- UserGroupInformation curUser = null;
- if (prevUser != null && prevUser.equals(newUser)) {
- try {
- curUser = UserGroupInformation.getLoginUser();
- } catch (IOException e) {
- LOG.warn("User unexpectedly had no active login. Continuing with " +
- "authentication", e);
- }
- }
-
- if (curUser == null || !curUser.getUserName().equals(principal)) {
- try {
- // static login
- kerberosLogin(this, principal, kerbKeytab);
- } catch (IOException e) {
- LOG.error("Authentication or file read error while attempting to "
- + "login as kerberos principal (" + principal + ") using "
- + "keytab (" + kerbKeytab + "). Exception follows.", e);
- return false;
- }
- } else {
- LOG.debug("{}: Using existing principal login: {}", this, curUser);
- }
-
- // we supposedly got through this unscathed... so store the static user
- staticLogin.set(newUser);
- }
-
- // hadoop impersonation works with or without kerberos security
- proxyTicket = null;
- if (!proxyUserName.isEmpty()) {
- try {
- proxyTicket = UserGroupInformation.createProxyUser(
- proxyUserName, UserGroupInformation.getLoginUser());
- } catch (IOException e) {
- LOG.error("Unable to login as proxy user. Exception follows.", e);
- return false;
- }
- }
-
- UserGroupInformation ugi = null;
- if (proxyTicket != null) {
- ugi = proxyTicket;
- } else if (useSecurity) {
- try {
- ugi = UserGroupInformation.getLoginUser();
- } catch (IOException e) {
- LOG.error("Unexpected error: Unable to get authenticated user after " +
- "apparent successful login! Exception follows.", e);
- return false;
- }
- }
-
- if (ugi != null) {
- // dump login information
- AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
- LOG.info("Auth method: {}", authMethod);
- LOG.info(" User name: {}", ugi.getUserName());
- LOG.info(" Using keytab: {}", ugi.isFromKeytab());
- if (authMethod == AuthenticationMethod.PROXY) {
- UserGroupInformation superUser;
- try {
- superUser = UserGroupInformation.getLoginUser();
- LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod());
- LOG.info(" Superuser name: {}", superUser.getUserName());
- LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab());
- } catch (IOException e) {
- LOG.error("Unexpected error: unknown superuser impersonating proxy.",
- e);
- return false;
- }
- }
-
- LOG.info("Logged in as user {}", ugi.getUserName());
-
- return true;
- }
-
- return true;
- }
-
- /**
- * Static synchronized method for static Kerberos login. <br/>
- * Static synchronized due to a thundering herd problem when multiple Sinks
- * attempt to log in using the same principal at the same time with the
- * intention of impersonating different users (or even the same user).
- * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
- * attack and it returns:
- * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
- * In addition, since the underlying Hadoop APIs we are using for
- * impersonation are static, we define this method as static as well.
- *
- * @param principal
- * Fully-qualified principal to use for authentication.
- * @param keytab
- * Location of keytab file containing credentials for principal.
- * @return Logged-in user
- * @throws IOException
- * if login fails.
- */
- private static synchronized UserGroupInformation kerberosLogin(
- HDFSEventSink sink, String principal, String keytab) throws IOException {
-
- // if we are the 2nd user thru the lock, the login should already be
- // available statically if login was successful
- UserGroupInformation curUser = null;
- try {
- curUser = UserGroupInformation.getLoginUser();
- } catch (IOException e) {
- // not a big deal but this shouldn't typically happen because it will
- // generally fall back to the UNIX user
- LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
- }
-
- // we already have logged in successfully
- if (curUser != null && curUser.getUserName().equals(principal)) {
- LOG.debug("{}: Using existing principal ({}): {}",
- new Object[]{sink, principal, curUser});
-
- // no principal found
- } else {
-
- LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
- "file ({})", new Object[]{sink, principal, keytab});
-
- // attempt static kerberos login
- UserGroupInformation.loginUserFromKeytab(principal, keytab);
- curUser = UserGroupInformation.getLoginUser();
- }
-
- return curUser;
- }
-
@Override
public String toString() {
return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 7c74b16..2581f73 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.hadoop.conf.Configuration;
@@ -53,10 +55,12 @@ public class TestBucketWriter {
private Context ctx = new Context();
private static ScheduledExecutorService timedRollerPool;
+ private static PrivilegedExecutor proxy;
@BeforeClass
public static void setup() {
timedRollerPool = Executors.newSingleThreadScheduledExecutor();
+ proxy = FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null);
}
@AfterClass
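
Note that the test authenticator above is built with null principal, keytab,
and proxy user; presumably getAuthenticator(null, null) falls back to simple
(non-Kerberos) authentication so these tests need no KDC. A hedged sketch of
that assumed behavior, inside a test method declared throws Exception:

    PrivilegedExecutor proxy =
        FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs(null);

    // Assumption: with no credentials configured, execute() runs the
    // action inline as the current OS user.
    String user = proxy.execute(new PrivilegedExceptionAction<String>() {
      @Override
      public String run() {
        return System.getProperty("user.name");
      }
    });
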
@@ -72,7 +76,7 @@ public class TestBucketWriter {
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, null,
+ hdfsWriter, timedRollerPool, proxy,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
@@ -97,7 +101,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0,
ctx, "/tmp", "file", "", ".tmp", null, null,
SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool,
- null, new SinkCounter("test-bucket-writer-" +
+ proxy, new SinkCounter("test-bucket-writer-" +
System.currentTimeMillis()),0, null, null, 30000,
Executors.newSingleThreadExecutor(), 0, 0);
@@ -124,7 +128,7 @@ public class TestBucketWriter {
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, null,
+ hdfsWriter, timedRollerPool, proxy,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
0, new HDFSEventSink.WriterCallback() {
@Override
@@ -147,7 +151,7 @@ public class TestBucketWriter {
bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, null,
+ hdfsWriter, timedRollerPool, proxy,
new SinkCounter("test-bucket-writer-"
+ System.currentTimeMillis()), 0, null, null, 30000,
Executors.newSingleThreadExecutor(), 0, 0);
@@ -230,7 +234,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
0, ctx, path, name, "", ".tmp", null, null,
SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+ timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
+ System.currentTimeMillis()),
0, null, null, 30000, Executors.newSingleThreadExecutor(),
0, 0);
@@ -255,7 +259,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+ timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
+ System.currentTimeMillis()), 0, null, null, 30000,
Executors.newSingleThreadExecutor(), 0, 0);
@@ -283,7 +287,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, null, new SinkCounter(
+ timedRollerPool, proxy, new SinkCounter(
"test-bucket-writer-" + System.currentTimeMillis()), 0,
null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
@@ -316,7 +320,7 @@ public class TestBucketWriter {
0, ctx, "/tmp", "file", "", ".tmp", suffix,
HDFSEventSink.getCodec("gzip"),
SequenceFile.CompressionType.BLOCK, hdfsWriter,
- timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+ timedRollerPool, proxy, new SinkCounter("test-bucket-writer-"
+ System.currentTimeMillis()), 0, null, null, 30000,
Executors.newSingleThreadExecutor(), 0, 0
);
@@ -348,7 +352,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, null, new SinkCounter(
+ timedRollerPool, proxy, new SinkCounter(
"test-bucket-writer-" + System.currentTimeMillis()), 0,
null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
@@ -368,7 +372,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
0, ctx, "/tmp", "file", "", SUFFIX, null, null,
SequenceFile.CompressionType.NONE, hdfsWriter,
- timedRollerPool, null, new SinkCounter(
+ timedRollerPool, proxy, new SinkCounter(
"test-bucket-writer-" + System.currentTimeMillis()), 0,
null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
@@ -388,7 +392,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
0, ctx, "/tmp", "file", "", SUFFIX, null, null,
SequenceFile.CompressionType.NONE,
- hdfsWriter, timedRollerPool, null,
+ hdfsWriter, timedRollerPool, proxy,
new SinkCounter(
"test-bucket-writer-" + System.currentTimeMillis()), 0,
new HDFSEventSink.WriterCallback() {
@@ -442,7 +446,7 @@ public class TestBucketWriter {
BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
null, new MockDataStream(mockFs),
- timedRollerPool, null,
+ timedRollerPool, proxy,
new SinkCounter(
"test-bucket-writer-" + System.currentTimeMillis()),
0, null, null, 30000, Executors.newSingleThreadExecutor(), 1,
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 1b7a364..23862eb 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -276,7 +276,7 @@ public class TestHDFSEventSink {
Assert.fail("no exception thrown");
} catch (IllegalArgumentException expected) {
Assert.assertTrue(expected.getMessage().contains(
- "is nonexistent or can't read."));
+ "Keytab is not a readable file"));
} finally {
//turn security off
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index 5de0bd5..e659ada 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -36,6 +36,8 @@ import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
@@ -54,7 +56,6 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.hbase.security.User;
/**
@@ -103,11 +104,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
private Context serializerContext;
private String kerberosPrincipal;
private String kerberosKeytab;
- private User hbaseUser;
private boolean enableWal = true;
private boolean batchIncrements = false;
private Method refGetFamilyMap = null;
private SinkCounter sinkCounter;
+ private PrivilegedExecutor privilegedExecutor;
// Internal hooks used for unit testing.
private DebugIncrementsCallback debugIncrCallback = null;
@@ -132,17 +133,14 @@ public class HBaseSink extends AbstractSink implements Configurable {
Preconditions.checkArgument(table == null, "Please call stop " +
"before calling start on an old instance.");
try {
- if (HBaseSinkSecurityManager.isSecurityEnabled(config)) {
- hbaseUser = HBaseSinkSecurityManager.login(config, null,
- kerberosPrincipal, kerberosKeytab);
- }
+ privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+     kerberosPrincipal, kerberosKeytab);
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Failed to login to HBase using "
+ "provided credentials.", ex);
}
try {
- table = runPrivileged(new PrivilegedExceptionAction<HTable>() {
+ table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
@Override
public HTable run() throws Exception {
HTable table = new HTable(config, tableName);
@@ -160,7 +158,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
" from HBase", e);
}
try {
- if (!runPrivileged(new PrivilegedExceptionAction<Boolean>() {
+ if (!privilegedExecutor.execute(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return table.getTableDescriptor().hasFamily(columnFamily);
@@ -233,8 +231,8 @@ public class HBaseSink extends AbstractSink implements Configurable {
logger.error("Could not instantiate event serializer." , e);
Throwables.propagate(e);
}
- kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB, "");
- kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL, "");
+ kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB);
+ kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL);
enableWal = context.getBoolean(HBaseSinkConfigurationConstants
.CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
@@ -371,7 +369,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
private void putEventsAndCommit(final List<Row> actions,
final List<Increment> incs, Transaction txn) throws Exception {
- runPrivileged(new PrivilegedExceptionAction<Void>() {
+ privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
for (Row r : actions) {
@@ -388,7 +386,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
}
});
- runPrivileged(new PrivilegedExceptionAction<Void>() {
+ privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
@@ -416,18 +414,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
sinkCounter.addToEventDrainSuccessCount(actions.size());
}
- private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
- throws Exception {
- if(hbaseUser != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Calling runAs as hbase user: " + hbaseUser.getName());
- }
- return hbaseUser.runAs(action);
- } else {
- return action.run();
- }
- }
-
/**
* The method getFamilyMap() is no longer available in Hbase 0.96.
* We must use reflection to determine which version we may use.
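
The deleted runPrivileged() helper made the security decision locally:
hbaseUser.runAs(action) when a Kerberos user was logged in, action.run()
otherwise. That branch presumably now lives behind the PrivilegedExecutor
interface, so the insecure case would be served by a trivial pass-through
implementation roughly like the one below. This is a sketch only, not the
actual flume-ng-auth source, and it assumes the interface also offers a
PrivilegedAction overload, though only the PrivilegedExceptionAction form
appears in this diff:

    package org.apache.flume.auth;

    import java.security.PrivilegedAction;
    import java.security.PrivilegedExceptionAction;

    // Hypothetical pass-through executor for the no-security case.
    public class InlineExecutorSketch implements PrivilegedExecutor {
      @Override
      public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception {
        return action.run(); // no UGI doAs(); mirrors the old else-branch
      }

      @Override
      public <T> T execute(PrivilegedAction<T> action) {
        return action.run();
      }
    }
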
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
deleted file mode 100644
index 762fce9..0000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkSecurityManager.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import org.apache.flume.FlumeException;
-import org.apache.flume.sink.hdfs.KerberosUser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class to handle logging into HBase with the credentials passed in.
- */
-public class HBaseSinkSecurityManager {
-
- /*
- * volatile for safe publication. Since this is updated only by
- * a single thread (configuration) and read later by the sink threads,
- * this can just be volatile, no need of Atomic reference.
- */
- private volatile static KerberosUser loggedInUser;
- private static final String FLUME_KEYTAB_KEY = "flume.keytab.key";
- private static final String FLUME_PRINCIPAL_KEY = "flume.principal.key";
- private static final Logger LOG =
- LoggerFactory.getLogger(HBaseSinkSecurityManager.class);
-
- /**
- * Checks if security is enabled for the HBase cluster.
- *
- * @return - true if security is enabled on the HBase cluster and
- * the underlying HDFS cluster.
- */
- public static boolean isSecurityEnabled(Configuration conf) {
- return User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf);
- }
-
- /**
- * Login the user using the configuration, and the hostname specified to use
- * for logging in.
- *
- * @param conf - Configuration to use for logging the user in.
- * @param hostname - The hostname to use for logging the user in. If no
- * hostname is specified (null or empty string), the canonical hostname for
- * the address returned by {@linkplain InetAddress#getLocalHost()} will be
- * used.
- * @return The logged in HBase {@linkplain User}.
- * @throws IOException if login failed, or hostname lookup failed.
- */
- public static synchronized User login(Configuration conf, String hostname,
- String kerberosPrincipal, String kerberosKeytab) throws IOException {
- if (kerberosPrincipal.isEmpty()) {
- String msg = "Login failed, since kerberos principal was not specified.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- if (kerberosKeytab.isEmpty()) {
- String msg = "Login failed, since kerberos keytab was not specified.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- } else {
- // If a keytab is specified, the user intends for it to take effect;
- // HDFSEventSink will halt if the keytab file is nonexistent or unreadable.
- File kfile = new File(kerberosKeytab);
- if (!(kfile.isFile() && kfile.canRead())) {
- throw new IllegalArgumentException("The keyTab file: "
- + kerberosKeytab + " is nonexistent or can't read. "
- + "Please specify a readable keytab file for Kerberos auth.");
- }
- }
- String principal = kerberosPrincipal;
- try {
- // resolves _HOST pattern using standard Hadoop search/replace
- // via DNS lookup when 2nd argument is empty
- principal = SecurityUtil.getServerPrincipal(kerberosPrincipal,"");
- } catch (IOException e) {
- LOG.error("Host lookup error resolving kerberos principal ("
- + kerberosPrincipal + "). Exception follows.", e);
- throw e;
- }
- Preconditions.checkNotNull(principal, "Principal must not be null");
- KerberosUser newUser = new KerberosUser(principal, kerberosKeytab);
- //The HDFS Sink does not allow login credentials to change.
- //To be uniform, we will do the same thing here.
- User hbaseUser = null;
- boolean loggedIn = false;
- if (loggedInUser != null) {
- Preconditions.checkArgument(newUser.equals(loggedInUser),
- "Cannot switch kerberos credentials during a reconfiguration. "
- + "Please restart the agent to set the new credentials.");
- try {
- hbaseUser = User.create(UserGroupInformation.getLoginUser());
- loggedIn = true;
- } catch (IOException ex) {
- LOG.warn("Previous login does not exist, "
- + "will authenticate against KDC");
- }
- }
- if (!loggedIn) {
- if (hostname == null || hostname.isEmpty()) {
- hostname = InetAddress.getLocalHost().getCanonicalHostName();
- }
- conf.set(FLUME_KEYTAB_KEY, kerberosKeytab);
- conf.set(FLUME_PRINCIPAL_KEY, principal);
- User.login(conf, FLUME_KEYTAB_KEY, FLUME_PRINCIPAL_KEY, hostname);
- hbaseUser = User.create(UserGroupInformation.getLoginUser());
- loggedInUser = newUser;
- //TODO: Set the loggedInUser to the current user.
- LOG.info("Logged into HBase as user: " + hbaseUser.getName());
- }
- return hbaseUser;
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3e40558..aad8be6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@ limitations under the License.
<module>flume-ng-sdk</module>
<module>flume-ng-tests</module>
<module>flume-tools</module>
+ <module>flume-ng-auth</module>
</modules>
<profiles>
@@ -1226,6 +1227,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-auth</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.6.0-SNAPSHOT</version>
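
With the version pinned here (presumably under the parent's
dependencyManagement, given the surrounding entries also carry explicit
versions), modules such as flume-hdfs-sink and flume-ng-hbase-sink that now
import org.apache.flume.auth would declare just the unversioned coordinates
in their own pom.xml:

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-auth</artifactId>
    </dependency>
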