[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: [2/2] flume git commit: FLUME-2631. End to End authentication in Flume
From: hshreedharan () apache ! org
Date: 2015-03-06 7:20:09
Message-ID: b4bbae6a32ed4aafb200a2ada79df058 () git ! apache ! org
[Download RAW message or body]
FLUME-2631. End to End authentication in Flume
(Johny Rufus via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1b0f051b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1b0f051b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1b0f051b
Branch: refs/heads/flume-1.6
Commit: 1b0f051b610b5a102c869a9d06254258a3de898f
Parents: fa7ead5
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Mar 5 23:19:13 2015 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Mar 5 23:19:54 2015 -0800
----------------------------------------------------------------------
flume-ng-auth/pom.xml | 88 +++++++
.../flume/api/SecureRpcClientFactory.java | 40 ++++
.../apache/flume/api/SecureThriftRpcClient.java | 113 +++++++++
.../flume/auth/FlumeAuthenticationUtil.java | 99 ++++++++
.../apache/flume/auth/FlumeAuthenticator.java | 45 ++++
.../flume/auth/KerberosAuthenticator.java | 233 +++++++++++++++++++
.../apache/flume/auth/PrivilegedExecutor.java | 52 +++++
.../apache/flume/auth/SecurityException.java | 40 ++++
.../apache/flume/auth/SimpleAuthenticator.java | 88 +++++++
.../java/org/apache/flume/auth/UGIExecutor.java | 80 +++++++
.../flume/auth/TestFlumeAuthenticator.java | 128 ++++++++++
flume-ng-core/pom.xml | 5 +
.../java/org/apache/flume/sink/ThriftSink.java | 14 +-
.../org/apache/flume/source/ThriftSource.java | 67 +++++-
flume-ng-dist/pom.xml | 4 +
flume-ng-dist/src/main/assembly/bin.xml | 1 +
flume-ng-dist/src/main/assembly/src.xml | 1 +
.../api/RpcClientConfigurationConstants.java | 2 +
.../org/apache/flume/api/ThriftRpcClient.java | 30 ++-
flume-ng-sinks/flume-dataset-sink/pom.xml | 7 -
.../org/apache/flume/sink/kite/DatasetSink.java | 39 ++--
.../apache/flume/sink/kite/KerberosUtil.java | 187 ---------------
.../flume/sink/kite/TestKerberosUtil.java | 121 ----------
.../apache/flume/sink/hdfs/BucketWriter.java | 37 +--
.../apache/flume/sink/hdfs/HDFSEventSink.java | 229 ++----------------
.../flume/sink/hdfs/TestBucketWriter.java | 28 ++-
.../flume/sink/hdfs/TestHDFSEventSink.java | 2 +-
.../org/apache/flume/sink/hbase/HBaseSink.java | 34 +--
.../sink/hbase/HBaseSinkSecurityManager.java | 134 -----------
pom.xml | 7 +
30 files changed, 1185 insertions(+), 770 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-auth/pom.xml b/flume-ng-auth/pom.xml
new file mode 100644
index 0000000..292731d
--- /dev/null
+++ b/flume-ng-auth/pom.xml
@@ -0,0 +1,88 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" \
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + \
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 \
http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flume-parent</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.6.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flume-ng-auth</artifactId>
+ <name>Flume Auth</name>
+ <description>Flume Authentication</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>2.3.7</version>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>${hadoop.common.artifact.id}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <version>${hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
new file mode 100644
index 0000000..c976458
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import java.util.Properties;
+
+/**
+ * Factory class to construct secure Flume {@link RpcClient} implementations.
+ */
+public class SecureRpcClientFactory {
+
+ /**
+ * Returns a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift
+ * for communicating with the next hop. A new SecureThriftRpcClient is created
+ * and configured with the supplied properties on every call.
+ * @param props the properties handed to the client's configure() call
+ * @return an {@linkplain org.apache.flume.api.RpcClient} which uses Thrift,
+ * configured with the given parameters.
+ */
+ public static RpcClient getThriftInstance(Properties props) {
+ ThriftRpcClient client = new SecureThriftRpcClient();
+ client.configure(props);
+ return client;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
new file mode 100644
index 0000000..7316e1b
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.FlumeException;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
+import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.thrift.transport.*;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class SecureThriftRpcClient extends ThriftRpcClient {
+
+ private static final String CLIENT_PRINCIPAL = "client-principal";
+ private static final String CLIENT_KEYTAB = "client-keytab";
+ private static final String SERVER_PRINCIPAL = "server-principal";
+
+ private String serverPrincipal;
+ private FlumeAuthenticator privilegedExecutor;
+
+ @Override
+ protected void configure(Properties properties) throws FlumeException {
+ super.configure(properties);
+ serverPrincipal = properties.getProperty(SERVER_PRINCIPAL);
+ if (serverPrincipal == null || serverPrincipal.isEmpty()) {
+ throw new IllegalArgumentException("Flume in secure mode, but Flume config \
doesn't " + + "specify a server principal to use for Kerberos auth.");
+ }
+ String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL);
+ String keytab = properties.getProperty(CLIENT_KEYTAB);
+ this.privilegedExecutor = \
FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab); + \
if(!privilegedExecutor.isAuthenticated()) { + throw new \
FlumeException("Authentication failed in Kerberos mode for " + + \
"principal " + clientPrincipal + " keytab " + keytab); + }
+ }
+
+ @Override
+ protected TTransport getTransport(TSocket tsocket) throws Exception {
+ Map<String, String> saslProperties = new HashMap<String, String>();
+ saslProperties.put(Sasl.QOP, "auth");
+ String[] names;
+ try {
+ names = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal);
+ } catch (IOException e) {
+ throw new FlumeException(
+ "Error while trying to resolve Principal name - " + serverPrincipal, \
e); + }
+ return new UgiSaslClientTransport(
+ "GSSAPI", null, names[0], names[1], saslProperties, null, tsocket, \
privilegedExecutor); + }
+
+ /**
+ * This transport wraps the Sasl transports to set up the right UGI context for open().
+ */
+ public static class UgiSaslClientTransport extends TSaslClientTransport {
+ PrivilegedExecutor privilegedExecutor;
+ public UgiSaslClientTransport(String mechanism, String authorizationId,
+ String protocol, String serverName, Map<String, String> props,
+ CallbackHandler cbh, TTransport transport, PrivilegedExecutor \
privilegedExecutor) throws IOException { + super(mechanism, authorizationId, \
protocol, serverName, props, cbh, + transport);
+ this.privilegedExecutor = privilegedExecutor;
+ }
+
+ // Open the SASL transport using the current UserGroupInformation.
+ // This is needed to get the current login context stored
+ @Override
+ public void open() throws FlumeException {
+ try {
+ this.privilegedExecutor.execute(
+ new PrivilegedExceptionAction<Void>() {
+ public Void run() throws FlumeException {
+ try {
+ UgiSaslClientTransport.super.open();
+ } catch (TTransportException e) {
+ throw new FlumeException("Failed to open SASL transport", e);
+ }
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new FlumeException(
+ "Interrupted while opening underlying transport", e);
+ } catch (Exception e) {
+ throw new FlumeException("Failed to open SASL transport", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
new file mode 100644
index 0000000..02afc0d
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+
+import javax.security.auth.callback.CallbackHandler;
+import java.io.IOException;
+
+/**
+ * FlumeAuthentication utility class that provides methods to get an
+ * Authenticator. If proper credentials are provided, a KerberosAuthenticator is
+ * returned which can be used to execute as the authenticated principal,
+ * or else a SimpleAuthenticator which executes without any authentication
+ */
+public class FlumeAuthenticationUtil {
+
+ private FlumeAuthenticationUtil() {}
+
+ private static KerberosAuthenticator kerbAuthenticator;
+
+ /**
+ * If principal and keytab are both null, this method returns a SimpleAuthenticator
+ * which executes without authentication. If valid credentials are
+ * provided, a KerberosAuthenticator is returned which can be used to execute as
+ * the authenticated principal. Invalid credentials result in
+ * IllegalArgumentException and failure to authenticate results in SecurityException.
+ * A single KerberosAuthenticator instance is created on first use and is
+ * shared by all subsequent calls.
+ *
+ * @param principal the kerberos principal, or null together with keytab
+ * @param keytab path of the keytab file, or null together with principal
+ * @return FlumeAuthenticator
+ *
+ * @throws org.apache.flume.auth.SecurityException if authentication fails
+ */
+ public synchronized static FlumeAuthenticator getAuthenticator(
+ String principal, String keytab) throws SecurityException {
+
+ if(principal == null && keytab == null) {
+ return SimpleAuthenticator.getSimpleAuthenticator();
+ }
+
+ Preconditions.checkArgument(principal != null,
+ "Principal can not be null when keytab is provided");
+ Preconditions.checkArgument(keytab != null,
+ "Keytab can not be null when Principal is provided");
+
+ if(kerbAuthenticator == null) {
+ kerbAuthenticator = new KerberosAuthenticator();
+ }
+ kerbAuthenticator.authenticate(principal, keytab);
+
+ return kerbAuthenticator;
+ }
+
+ /**
+ * Returns a new instance of the standard SaslGssCallbackHandler from the
+ * hadoop common module
+ *
+ * @return CallbackHandler
+ */
+ public static CallbackHandler getSaslGssCallbackHandler() {
+ return new SaslRpcServer.SaslGssCallbackHandler();
+ }
+
+ /**
+ * Resolves the principal using Hadoop common's SecurityUtil and splits
+ * the kerberos principal into three parts: user name, host and kerberos realm
+ *
+ * @param principal the kerberos principal to resolve and split
+ * @return String[] of username, hostname and kerberos realm
+ * @throws IOException if resolving the principal fails
+ */
+ public static String[] splitKerberosName(String principal) throws IOException {
+ String resolvedPrinc = SecurityUtil.getServerPrincipal(principal, "");
+ return SaslRpcServer.splitKerberosName(resolvedPrinc);
+ }
+}
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
new file mode 100644
index 0000000..dbe241d
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+/**
+ * FlumeAuthenticator extends PrivilegedExecutor, adding the capability to
+ * proxy as a different user
+ */
+public interface FlumeAuthenticator extends PrivilegedExecutor {
+ /**
+ * Returns the current instance if proxyUserName is null or
+ * returns the proxied Executor if proxyUserName is valid
+ * @param proxyUserName name of the user to proxy as; may be null
+ * @return PrivilegedExecutor
+ */
+ public PrivilegedExecutor proxyAs(String proxyUserName);
+
+ /**
+ * Returns true, if the underlying Authenticator was obtained by
+ * successful kerberos authentication
+ * @return boolean
+ */
+ public boolean isAuthenticated();
+
+ /**
+ * For Authenticators backed by credentials, this method starts the
+ * periodic refreshing of those credentials
+ */
+ public void startCredentialRefresher();
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
new file mode 100644
index 0000000..3244046
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
+/**
+ * A kerberos authenticator, which authenticates using the supplied principal
+ * and keytab and executes with authenticated privileges
+ */
+class KerberosAuthenticator implements FlumeAuthenticator {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(KerberosAuthenticator.class);
+
+ private volatile UserGroupInformation ugi;
+ private volatile PrivilegedExecutor privilegedExecutor;
+ private Map<String, PrivilegedExecutor> proxyCache = new HashMap<String, \
PrivilegedExecutor>(); +
+
+ @Override
+ public <T> T execute(PrivilegedAction<T> action) {
+ return privilegedExecutor.execute(action);
+ }
+
+ @Override
+ public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception {
+ return privilegedExecutor.execute(action);
+ }
+
+ @Override
+ public synchronized PrivilegedExecutor proxyAs(String proxyUserName) {
+ if(proxyUserName == null || proxyUserName.isEmpty()) {
+ return this;
+ }
+ if(proxyCache.get(proxyUserName) == null) {
+ UserGroupInformation proxyUgi;
+ proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, ugi);
+ printUGI(proxyUgi);
+ proxyCache.put(proxyUserName, new UGIExecutor(proxyUgi));
+ }
+ return proxyCache.get(proxyUserName);
+ }
+
+ @Override
+ public boolean isAuthenticated() {
+ return true;
+ }
+
+ /**
+ * When valid principal and keytab are provided and if authentication has
+ * not yet been done for this object, this method authenticates the
+ * credentials and populates the ugi. In case of null or invalid credentials
+ * IllegalArgumentException is thrown. In case of failure to authenticate,
+ * SecurityException is thrown. If authentication has already happened on
+ * this KerberosAuthenticator object, then this method checks to see if the current
+ * credentials passed are the same as the validated credentials. If not, it throws
+ * an exception as this authenticator can represent only one Principal.
+ *
+ * @param principal
+ * @param keytab
+ */
+ public synchronized void authenticate(String principal, String keytab) {
+ // sanity checking
+
+ Preconditions.checkArgument(principal != null && !principal.isEmpty(),
+ "Invalid Kerberos principal: " + String.valueOf(principal));
+ Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
+ "Invalid Kerberos keytab: " + String.valueOf(keytab));
+ File keytabFile = new File(keytab);
+ Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
+ "Keytab is not a readable file: " + String.valueOf(keytab));
+
+
+ // resolve the requested principal
+ String resolvedPrincipal;
+ try {
+ // resolves _HOST pattern using standard Hadoop search/replace
+ // via DNS lookup when 2nd argument is empty
+ resolvedPrincipal = SecurityUtil.getServerPrincipal(principal, "");
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Host lookup error resolving kerberos \
principal (" + + principal + "). Exception follows.", e);
+ }
+ Preconditions.checkNotNull(resolvedPrincipal,
+ "Resolved Principal must not be null");
+
+
+ // be cruel and unusual when user tries to login as multiple principals
+ // this isn't really valid with a reconfigure but this should be rare
+ // enough to warrant a restart of the agent JVM
+ // TODO: find a way to interrogate the entire current config state,
+ // since we don't have to be unnecessarily protective if they switch all
+ // HDFS sinks to use a different principal all at once.
+
+ Preconditions.checkState(ugi == null || \
ugi.getUserName().equals(resolvedPrincipal), + "Cannot use multiple kerberos \
principals in the same agent. " + + " Must restart agent to use new principal or \
keytab. " + + "Previous = %s, New = %s", ugi, resolvedPrincipal);
+
+
+ // enable the kerberos mode of UGI, before doing anything else
+ if(!UserGroupInformation.isSecurityEnabled()) {
+ Configuration conf = new Configuration(false);
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ // We are interested in currently logged in user with kerberos creds
+ UserGroupInformation curUser = null;
+ try {
+ curUser = UserGroupInformation.getLoginUser();
+ if(curUser != null && !curUser.hasKerberosCredentials()) {
+ curUser = null;
+ }
+ } catch (IOException e) {
+ LOG.warn("User unexpectedly had no active login. Continuing with " +
+ "authentication", e);
+ }
+
+ /*
+ * if ugi is not null,
+ * if ugi matches currently logged in kerberos user, we are good
+ * else we are logged out, so relogin our ugi
+ * else if ugi is null, login and populate state
+ */
+ try {
+ if (ugi != null) {
+ if (curUser != null && curUser.getUserName().equals(ugi.getUserName())) {
+ LOG.debug("Using existing principal login: {}", ugi);
+ } else {
+ LOG.info("Attempting kerberos Re-login as principal ({}) "
+ , new Object[] { ugi.getUserName() } );
+ ugi.reloginFromKeytab();
+ }
+ } else {
+ LOG.info("Attempting kerberos login as principal ({}) from keytab " +
+ "file ({})", new Object[] { resolvedPrincipal, keytab } );
+ UserGroupInformation.loginUserFromKeytab(resolvedPrincipal, keytab);
+ this.ugi = UserGroupInformation.getLoginUser();
+ this.privilegedExecutor = new UGIExecutor(this.ugi);
+ }
+ } catch (IOException e) {
+ throw new SecurityException("Authentication error while attempting to "
+ + "login as kerberos principal (" + resolvedPrincipal + ") using "
+ + "keytab (" + keytab + "). Exception follows.", e);
+ }
+
+ printUGI(this.ugi);
+ }
+
+ private void printUGI(UserGroupInformation ugi) {
+ if (ugi != null) {
+ // dump login information
+ AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
+ LOG.info("\n{} \nUser: {} \nAuth method: {} \nKeytab: {} \n",
+ new Object[]{ authMethod.equals(AuthenticationMethod.PROXY) ?
+ "Proxy as: " : "Logged as: ", ugi.getUserName(), authMethod,
+ ugi.isFromKeytab() }
+ );
+ }
+ }
+
+ /**
+ * startCredentialRefresher should be used only for long running
+ * methods like Thrift source. For all privileged methods that use a UGI, the
+ * credentials are checked automatically and refreshed before the
+ * privileged method is executed in the UGIExecutor
+ */
+ @Override
+ public void startCredentialRefresher() {
+ int CHECK_TGT_INTERVAL = 120; // seconds
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ scheduler.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ugi.checkTGTAndReloginFromKeytab();
+ } catch (IOException e) {
+ LOG.warn("Error occured during checkTGTAndReloginFromKeytab() for user " +
+ ugi.getUserName(), e);
+ }
+ }
+ }, CHECK_TGT_INTERVAL, CHECK_TGT_INTERVAL, TimeUnit.SECONDS);
+ }
+
+ @VisibleForTesting
+ String getUserName() {
+ if(ugi != null) {
+ return ugi.getUserName();
+ } else {
+ return null;
+ }
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
new file mode 100644
index 0000000..0aa321a
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/PrivilegedExecutor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+
+/**
+ * PrivilegedExecutor provides the ability to execute a PrivilegedAction
+ * or a PrivilegedExceptionAction. Implementors of this interface can choose to
+ * execute in normal mode or secure authenticated mode
+ */
+public interface PrivilegedExecutor {
+ /**
+ * This method is used to execute a privileged action, the implementor can
+ * choose to execute the action using the appropriate privileges
+ *
+ * @param action A PrivilegedExceptionAction to perform as the desired user
+ * @param <T> The return type of the action
+ * @return T the T value returned by action.run()
+ * @throws Exception if executing the action fails
+ */
+ public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception;
+
+ /**
+ * This method is used to execute a privileged action, the implementor can
+ * choose to execute the action using the appropriate privileges
+ *
+ * @param action A PrivilegedAction to perform as the desired user
+ * @param <T> The return type of the action
+ * @return T the T value returned by action.run()
+ */
+ public <T> T execute(PrivilegedAction<T> action);
+}
+
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
new file mode 100644
index 0000000..5760481
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SecurityException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+/**
+ * SecurityException thrown in the Flume security module. It is an unchecked
+ * exception. Note that it shares its simple name with
+ * java.lang.SecurityException, so code outside this package has to import
+ * org.apache.flume.auth.SecurityException explicitly to refer to it.
+ */
+public class SecurityException extends RuntimeException {
+ public SecurityException(String message) {
+ super(message);
+ }
+
+ public SecurityException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SecurityException(Throwable cause) {
+ super(cause);
+ }
+}
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
new file mode 100644
index 0000000..f7b5bea
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A no-op authenticator, which does not authenticate and executes
+ * without any authenticated privileges.
+ *
+ * <p>A single shared instance is handed out by
+ * {@link #getSimpleAuthenticator()}; the constructor is private.
+ */
+class SimpleAuthenticator implements FlumeAuthenticator {
+  /** Executors already created by {@link #proxyAs}, keyed by user name. */
+  private final Map<String, PrivilegedExecutor> proxyCache =
+      new HashMap<String, PrivilegedExecutor>();
+
+  private SimpleAuthenticator() {}
+
+  /** Holds the shared instance, created when this holder is initialized. */
+  private static class SimpleAuthenticatorHolder {
+    // final: the shared instance must never be replaced after creation
+    static final SimpleAuthenticator authenticator =
+        new SimpleAuthenticator();
+  }
+
+  /**
+   * Returns the shared authenticator instance.
+   *
+   * @return the one instance of this class
+   */
+  public static SimpleAuthenticator getSimpleAuthenticator() {
+    return SimpleAuthenticatorHolder.authenticator;
+  }
+
+  /**
+   * Runs the action directly on the calling thread.
+   *
+   * @param action the action to run
+   * @return whatever the action returns
+   * @throws Exception whatever the action throws
+   */
+  @Override
+  public <T> T execute(PrivilegedExceptionAction<T> action)
+      throws Exception {
+    return action.run();
+  }
+
+  /**
+   * Runs the action directly on the calling thread.
+   *
+   * @param action the action to run
+   * @return whatever the action returns
+   */
+  @Override
+  public <T> T execute(PrivilegedAction<T> action) {
+    return action.run();
+  }
+
+  /**
+   * Returns an executor that runs actions as the given proxy user on top
+   * of the current user. Executors are cached per user name, so repeated
+   * calls with the same name return the same executor.
+   *
+   * @param proxyUserName user to impersonate; {@code null} or empty means
+   *        no proxying
+   * @return this authenticator when no proxy user is given, otherwise the
+   *         cached or newly created executor for that user
+   * @throws SecurityException if the proxy user cannot be created
+   */
+  @Override
+  public synchronized PrivilegedExecutor proxyAs(String proxyUserName) {
+    if (proxyUserName == null || proxyUserName.isEmpty()) {
+      return this;
+    }
+    // Look the key up once and reuse the result.
+    PrivilegedExecutor executor = proxyCache.get(proxyUserName);
+    if (executor == null) {
+      UserGroupInformation proxyUgi;
+      try {
+        proxyUgi = UserGroupInformation.createProxyUser(proxyUserName,
+            UserGroupInformation.getCurrentUser());
+      } catch (IOException e) {
+        throw new SecurityException("Unable to create proxy User", e);
+      }
+      executor = new UGIExecutor(proxyUgi);
+      proxyCache.put(proxyUserName, executor);
+    }
+    return executor;
+  }
+
+  /**
+   * Always reports that no authentication has taken place.
+   *
+   * @return {@code false}
+   */
+  @Override
+  public boolean isAuthenticated() {
+    return false;
+  }
+
+  /** Does nothing; this authenticator holds no credentials to refresh. */
+  @Override
+  public void startCredentialRefresher() {
+    // no-op
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java \
b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java new file mode \
100644 index 0000000..a5aeef2
--- /dev/null
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A {@link PrivilegedExecutor} that runs actions through
+ * {@code UserGroupInformation.doAs} for a fixed user, attempting a keytab
+ * re-login before every call.
+ */
+class UGIExecutor implements PrivilegedExecutor {
+  private final UserGroupInformation ugi;
+
+  /**
+   * @param ugi the user every action submitted to this executor runs as
+   */
+  UGIExecutor(UserGroupInformation ugi) {
+    this.ugi = ugi;
+  }
+
+  /**
+   * Runs the action as this executor's user.
+   *
+   * @param action the action to run
+   * @return whatever the action returns
+   * @throws SecurityException if the re-login attempt fails
+   */
+  @Override
+  public <T> T execute(PrivilegedAction<T> action) {
+    ensureValidAuth();
+    return ugi.doAs(action);
+  }
+
+  /**
+   * Runs the action as this executor's user.
+   *
+   * @param action the action to run
+   * @return whatever the action returns
+   * @throws SecurityException if the re-login attempt fails, if
+   *         {@code doAs} throws an {@link IOException}, or if the calling
+   *         thread is interrupted
+   * @throws Exception any other exception that escapes {@code doAs}
+   */
+  @Override
+  public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception {
+    ensureValidAuth();
+    try {
+      return ugi.doAs(action);
+    } catch (IOException ex) {
+      throw new SecurityException("Privileged action failed", ex);
+    } catch (InterruptedException ex) {
+      // Restore the interrupt flag so callers further up can still see it.
+      // Thread.interrupted() would clear the flag instead.
+      Thread.currentThread().interrupt();
+      throw new SecurityException(ex);
+    }
+  }
+
+  /**
+   * Attempts a re-login for this executor's user and, when that user is a
+   * proxy user, for the real user behind it as well.
+   */
+  private void ensureValidAuth() {
+    reloginUGI(ugi);
+    if (ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) {
+      reloginUGI(ugi.getRealUser());
+    }
+  }
+
+  /**
+   * Calls {@code checkTGTAndReloginFromKeytab} on the given user when it
+   * holds Kerberos credentials; otherwise does nothing.
+   *
+   * @param target the user to check
+   * @throws SecurityException if the re-login attempt throws an
+   *         {@link IOException}
+   */
+  private void reloginUGI(UserGroupInformation target) {
+    try {
+      if (target.hasKerberosCredentials()) {
+        target.checkTGTAndReloginFromKeytab();
+      }
+    } catch (IOException e) {
+      throw new SecurityException("Error trying to relogin from keytab for user "
+          + target.getUserName(), e);
+    }
+  }
+
+  /**
+   * @return the user name of the wrapped user, or {@code null} when this
+   *         executor was built without one
+   */
+  @VisibleForTesting
+  String getUserName() {
+    if (ugi != null) {
+      return ugi.getUserName();
+    } else {
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java \
b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java new \
file mode 100644 index 0000000..45ba2b0
--- /dev/null
+++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.auth;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the authenticators returned by
+ * {@code FlumeAuthenticationUtil.getAuthenticator}, run against a
+ * {@link MiniKdc} that is started once for the whole class.
+ */
+public class TestFlumeAuthenticator {
+
+  private static MiniKdc kdc;
+  private static File workDir;
+  private static File flumeKeytab;
+  private static String flumePrincipal = "flume/localhost";
+  private static File aliceKeytab;
+  private static String alicePrincipal = "alice";
+  private static Properties conf;
+
+  /**
+   * Starts the KDC in a work directory under {@code test.dir} (default
+   * {@code target}), creates the flume and alice principals with their
+   * keytabs, and appends the KDC realm to both principal names.
+   */
+  @BeforeClass
+  public static void startMiniKdc() throws Exception {
+    workDir = new File(System.getProperty("test.dir", "target"),
+        TestFlumeAuthenticator.class.getSimpleName());
+    flumeKeytab = new File(workDir, "flume.keytab");
+    aliceKeytab = new File(workDir, "alice.keytab");
+    conf = MiniKdc.createConf();
+
+    kdc = new MiniKdc(conf, workDir);
+    kdc.start();
+
+    kdc.createPrincipal(flumeKeytab, flumePrincipal);
+    flumePrincipal = flumePrincipal + "@" + kdc.getRealm();
+
+    kdc.createPrincipal(aliceKeytab, alicePrincipal);
+    alicePrincipal = alicePrincipal + "@" + kdc.getRealm();
+  }
+
+  /** Stops the KDC if it was created. */
+  @AfterClass
+  public static void stopMiniKdc() {
+    if (kdc != null) {
+      kdc.stop();
+    }
+  }
+
+  /** A null principal and keytab must yield an unauthenticated result. */
+  @Test
+  public void testNullLogin() throws IOException {
+    String principal = null;
+    String keytab = null;
+
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
+        principal, keytab);
+    assertFalse(authenticator.isAuthenticated());
+  }
+
+  /**
+   * Logs in as the flume principal, asks for the authenticator a second
+   * time with the same credentials, and then expects a request with a
+   * different principal to be rejected.
+   */
+  @Test
+  public void testFlumeLogin() throws IOException {
+    String principal = flumePrincipal;
+    String keytab = flumeKeytab.getAbsolutePath();
+    String expResult = principal;
+
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
+        principal, keytab);
+    assertTrue(authenticator.isAuthenticated());
+
+    String result = ((KerberosAuthenticator)authenticator).getUserName();
+    assertEquals("Initial login failed", expResult, result);
+
+    authenticator = FlumeAuthenticationUtil.getAuthenticator(
+        principal, keytab);
+    result = ((KerberosAuthenticator)authenticator).getUserName();
+    assertEquals("Re-login failed", expResult, result);
+
+    principal = alicePrincipal;
+    keytab = aliceKeytab.getAbsolutePath();
+    try {
+      authenticator = FlumeAuthenticationUtil.getAuthenticator(
+          principal, keytab);
+      result = ((KerberosAuthenticator)authenticator).getUserName();
+      fail("Login should have failed with a new principal: " + result);
+    } catch (Exception ex) {
+      // Guard against a null message so an unexpected exception produces a
+      // readable assertion failure rather than a NullPointerException.
+      String message = ex.getMessage();
+      assertTrue("Login with a new principal failed, but for an unexpected "
+          + "reason: " + message,
+          message != null
+              && message.contains("Cannot use multiple kerberos principals"));
+    }
+  }
+
+  /**
+   * proxyAs must produce an executor for the requested user both without
+   * a login and after logging in as the flume principal.
+   */
+  @Test
+  public void testProxyAs() throws IOException {
+    String username = "alice";
+
+    String expResult = username;
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
+        null, null);
+    String result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName();
+    assertEquals("Proxy as didn't generate the expected username",
+        expResult, result);
+
+    authenticator = FlumeAuthenticationUtil.getAuthenticator(
+        flumePrincipal, flumeKeytab.getAbsolutePath());
+
+    String login = ((KerberosAuthenticator)authenticator).getUserName();
+    assertEquals("Login succeeded, but the principal doesn't match",
+        flumePrincipal, login);
+
+    result = ((UGIExecutor)(authenticator.proxyAs(username))).getUserName();
+    assertEquals("Proxy as didn't generate the expected username",
+        expResult, result);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 8992414..fe34c03 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -264,6 +264,11 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-auth</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java \
b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java index \
baa60d0..32021d3 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
@@ -23,6 +23,8 @@ import java.util.Properties;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.api.SecureRpcClientFactory;
+
/**
* <p>
* A {@link org.apache.flume.Sink} implementation that can send events to an RPC \
server (such as @@ -102,12 +104,18 @@ import org.apache.flume.api.RpcClientFactory;
public class ThriftSink extends AbstractRpcSink {
@Override
protected RpcClient initializeRpcClient(Properties props) {
- props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
- RpcClientFactory.ClientType.THRIFT.name());
// Only one thread is enough, since only one sink thread processes
// transactions at any given time. Each sink owns its own Rpc client.
props.setProperty(RpcClientConfigurationConstants
.CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1));
- return RpcClientFactory.getInstance(props);
+ boolean enableKerberos = Boolean.parseBoolean(props.getProperty(
+ RpcClientConfigurationConstants.KERBEROS_KEY, "false"));
+ if(enableKerberos) {
+ return SecureRpcClientFactory.getThriftInstance(props);
+ } else {
+ props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
+ RpcClientFactory.ClientType.THRIFT.name());
+ return RpcClientFactory.getInstance(props);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java \
b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index \
06bb604..1d8bb33 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -26,6 +26,8 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.FlumeAuthenticator;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
@@ -45,12 +47,16 @@ import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLServerSocket;
+import javax.security.sasl.Sasl;
import java.io.FileInputStream;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -60,10 +66,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedAction;
public class ThriftSource extends AbstractSource implements Configurable,
EventDrivenSource {
@@ -97,6 +106,10 @@ public class ThriftSource extends AbstractSource implements \
Configurable, private static final String KEYMANAGER_TYPE = "keymanager-type";
private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
+ private static final String KERBEROS_KEY = "kerberos";
+ private static final String AGENT_PRINCIPAL = "agent-principal";
+ private static final String AGENT_KEYTAB = "agent-keytab";
+
private Integer port;
private String bindAddress;
private int maxThreads = 0;
@@ -110,6 +123,9 @@ public class ThriftSource extends AbstractSource implements \
Configurable, private String keyManagerType;
private final List<String> excludeProtocols = new LinkedList<String>();
private boolean enableSsl = false;
+ private boolean enableKerberos = false;
+ private String principal;
+ private FlumeAuthenticator flumeAuth;
@Override
public void configure(Context context) {
@@ -171,6 +187,18 @@ public class ThriftSource extends AbstractSource implements \
Configurable,
"Thrift source configured with invalid keystore: " + keystore, ex);
}
}
+
+ principal = context.getString(AGENT_PRINCIPAL);
+ String keytab = context.getString(AGENT_KEYTAB);
+ enableKerberos = context.getBoolean(KERBEROS_KEY, false);
+ this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(principal, keytab);
+ if(enableKerberos) {
+ if(!flumeAuth.isAuthenticated()) {
+ throw new FlumeException("Authentication failed in Kerberos mode for " +
+ "principal " + principal + " keytab " + keytab);
+ }
+ flumeAuth.startCredentialRefresher();
+ }
}
@Override
@@ -195,7 +223,15 @@ public class ThriftSource extends AbstractSource implements \
Configurable, servingExecutor.submit(new Runnable() {
@Override
public void run() {
- server.serve();
+ flumeAuth.execute(
+ new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ server.serve();
+ return null;
+ }
+ }
+ );
}
});
@@ -263,7 +299,7 @@ public class ThriftSource extends AbstractSource implements \
Configurable, }
private TServer getTThreadedSelectorServer() {
- if(enableSsl) {
+ if(enableSsl || enableKerberos) {
return null;
}
Class<?> serverClass;
@@ -277,6 +313,7 @@ public class ThriftSource extends AbstractSource implements \
Configurable,
TServerTransport serverTransport = new TNonblockingServerSocket(
new InetSocketAddress(bindAddress, port));
+
ExecutorService sourceService;
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
"Flume Thrift IPC Thread %d").build();
@@ -328,14 +365,35 @@ public class ThriftSource extends AbstractSource implements \
Configurable, args.protocolFactory(getProtocolFactory());
//populate the transportFactory
- args.inputTransportFactory(new TFastFramedTransport.Factory());
- args.outputTransportFactory(new TFastFramedTransport.Factory());
+ if(enableKerberos) {
+ args.transportFactory(getSASLTransportFactory());
+ } else {
+ args.transportFactory(new TFastFramedTransport.Factory());
+ }
// populate the Processor
args.processor(new ThriftSourceProtocol
.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
}
+ private TTransportFactory getSASLTransportFactory() {
+ String[] names;
+ try {
+ names = FlumeAuthenticationUtil.splitKerberosName(principal);
+ } catch (IOException e) {
+ throw new FlumeException(
+ "Error while trying to resolve Principal name - " + principal, e);
+ }
+ Map<String, String> saslProperties = new HashMap<String, String>();
+ saslProperties.put(Sasl.QOP, "auth");
+ TSaslServerTransport.Factory saslTransportFactory =
+ new TSaslServerTransport.Factory();
+ saslTransportFactory.addServerDefinition(
+ "GSSAPI", names[0], names[1], saslProperties,
+ FlumeAuthenticationUtil.getSaslGssCallbackHandler());
+ return saslTransportFactory;
+ }
+
@Override
public void stop() {
if(server != null && server.isServing()) {
@@ -402,5 +460,4 @@ public class ThriftSource extends AbstractSource implements \
Configurable, return Status.OK;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index a083fe2..9f7c4f6 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -202,6 +202,10 @@
<groupId>org.apache.flume</groupId>
<artifactId>flume-tools</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-auth</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-dist/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/src/main/assembly/bin.xml \
b/flume-ng-dist/src/main/assembly/bin.xml index 5aa7cc6..a61180d 100644
--- a/flume-ng-dist/src/main/assembly/bin.xml
+++ b/flume-ng-dist/src/main/assembly/bin.xml
@@ -68,6 +68,7 @@
<exclude>flume-ng-clients/**</exclude>
<exclude>flume-ng-embedded-agent/**</exclude>
<exclude>flume-tools/**</exclude>
+ <exclude>flume-ng-auth/**</exclude>
<exclude>**/target/**</exclude>
<exclude>**/.classpath</exclude>
<exclude>**/.project</exclude>
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-dist/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/src/main/assembly/src.xml \
b/flume-ng-dist/src/main/assembly/src.xml index b1e79a2..e5f4156 100644
--- a/flume-ng-dist/src/main/assembly/src.xml
+++ b/flume-ng-dist/src/main/assembly/src.xml
@@ -49,6 +49,7 @@
<include>org.apache.flume:flume-ng-clients</include>
<include>org.apache.flume:flume-ng-embedded-agent</include>
<include>org.apache.flume:flume-tools</include>
+ <include>org.apache.flume:flume-ng-auth</include>
</includes>
<sources>
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 33a2330..343e07b 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -145,6 +145,8 @@ public final class RpcClientConfigurationConstants {
public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols";
+ public static final String KERBEROS_KEY = "kerberos";
+
/**
* Configuration constants for the NettyAvroRpcClient
* NioClientSocketChannelFactory
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index \
4f75a2b..5c4cc41 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -28,6 +28,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +74,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
public static final String CONFIG_PROTOCOL = "protocol";
public static final String BINARY_PROTOCOL = "binary";
public static final String COMPACT_PROTOCOL = "compact";
-
+
private int batchSize;
private long requestTimeout;
private final Lock stateLock;
@@ -83,7 +84,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
private ConnectionPoolManager connectionManager;
private final ExecutorService callTimeoutPool;
private final AtomicLong threadCounter;
- private int connectionPoolSize;
private final Random random = new Random();
private String protocol;
@@ -95,7 +95,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
private static final String TRUSTMANAGER_TYPE = "trustmanager-type";
private final List<String> excludeProtocols = new LinkedList<String>();
-
public ThriftRpcClient() {
stateLock = new ReentrantLock(true);
connState = State.INIT;
@@ -319,7 +318,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
requestTimeout =
RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
}
- connectionPoolSize = Integer.parseInt(properties.getProperty(
+ int connectionPoolSize = Integer.parseInt(properties.getProperty(
RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
String.valueOf(RpcClientConfigurationConstants
.DEFAULT_CONNECTION_POOL_SIZE)));
@@ -352,6 +351,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
}
}
}
+
connectionManager = new ConnectionPoolManager(connectionPoolSize);
connState = State.READY;
} catch (Throwable ex) {
@@ -372,33 +372,41 @@ public class ThriftRpcClient extends AbstractRpcClient {
INIT, READY, DEAD
}
+ /**
+  * Wraps the given socket in the transport this client talks over.
+  * This implementation uses a fast framed transport; the method is
+  * protected, so a subclass can supply a different one.
+  *
+  * @param tsocket the socket to wrap
+  * @return the transport layered on top of the socket
+  * @throws Exception not thrown by this implementation
+  */
+ protected TTransport getTransport(TSocket tsocket) throws Exception {
+   TTransport framed = new TFastFramedTransport(tsocket);
+   return framed;
+ }
+
/**
* Wrapper around a client and transport, so we can clean up when this
* client gets closed.
*/
private class ClientWrapper {
public final ThriftSourceProtocol.Client client;
- public final TFastFramedTransport transport;
+ public final TTransport transport;
private final int hashCode;
public ClientWrapper() throws Exception{
TSocket tsocket;
if(enableSsl) {
- // JDK6's factory doesn't appear to pass the protocol onto the Socket \
properly so we have
- // to do some magic to make sure that happens. Not an issue in JDK7
- // Lifted from thrift-0.9.1 to make the SSLContext
- SSLContext sslContext = createSSLContext(truststore, truststorePassword, \
trustManagerType, truststoreType); + // JDK6's factory doesn't appear to pass \
the protocol onto the Socket + // properly so we have to do some magic to make \
sure that happens. + // Not an issue in JDK7 Lifted from thrift-0.9.1 to make \
the SSLContext + SSLContext sslContext = createSSLContext(truststore, \
truststorePassword, + trustManagerType, truststoreType);
// Create the factory from it
SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
// Create the TSocket from that
- tsocket = createSSLSocket(sslSockFactory, hostname, port, 120000, \
excludeProtocols); + tsocket = createSSLSocket(
+ sslSockFactory, hostname, port, 120000, excludeProtocols);
} else {
tsocket = new TSocket(hostname, port);
}
- transport = new TFastFramedTransport(tsocket);
+
+ transport = getTransport(tsocket);
+
// The transport is already open for SSL as part of \
TSSLTransportFactory.getClientSocket if(!transport.isOpen()) {
transport.open();
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml \
b/flume-ng-sinks/flume-dataset-sink/pom.xml index ad3f603..92f7021 100644
--- a/flume-ng-sinks/flume-dataset-sink/pom.xml
+++ b/flume-ng-sinks/flume-dataset-sink/pom.xml
@@ -150,13 +150,6 @@ limitations under the License.
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minikdc</artifactId>
- <version>${hadoop2.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java \
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index fd9f991..a9f42b8 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -17,6 +17,8 @@
*/
package org.apache.flume.sink.kite;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.sink.kite.parser.EntityParserFactory;
import org.apache.flume.sink.kite.parser.EntityParser;
import org.apache.flume.sink.kite.policy.FailurePolicy;
@@ -25,8 +27,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
+import java.security.PrivilegedAction;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
@@ -40,7 +43,6 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
-import org.apache.hadoop.security.UserGroupInformation;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
@@ -72,7 +74,7 @@ public class DatasetSink extends AbstractSink implements \
Configurable { private static final Logger LOG = \
LoggerFactory.getLogger(DatasetSink.class);
private Context context = null;
- private UserGroupInformation login = null;
+ private PrivilegedExecutor privilegedExecutor;
private String datasetName = null;
private URI datasetUri = null;
@@ -159,15 +161,12 @@ public class DatasetSink extends AbstractSink implements \
Configurable { public void configure(Context context) {
this.context = context;
- // initialize login credentials
- this.login = KerberosUtil.login(
- context.getString(AUTH_PRINCIPAL),
- context.getString(AUTH_KEYTAB));
- String effectiveUser
- = context.getString(AUTH_PROXY_USER);
- if (effectiveUser != null) {
- this.login = KerberosUtil.proxyAs(effectiveUser, login);
- }
+ String principal = context.getString(AUTH_PRINCIPAL);
+ String keytab = context.getString(AUTH_KEYTAB);
+ String effectiveUser = context.getString(AUTH_PROXY_USER);
+
+ this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+ principal, keytab).proxyAs(effectiveUser);
// Get the dataset URI and name from the context
String datasetURI = context.getString(CONFIG_KITE_DATASET_URI);
@@ -395,13 +394,15 @@ public class DatasetSink extends AbstractSink implements \
Configurable { // reset the commited flag whenver a new writer is created
committedBatch = false;
try {
- View<GenericRecord> view = KerberosUtil.runPrivileged(login,
- new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
- @Override
- public Dataset<GenericRecord> run() {
- return Datasets.load(datasetUri);
- }
- });
+ View<GenericRecord> view;
+
+ view = privilegedExecutor.execute(
+ new PrivilegedAction<Dataset<GenericRecord>>() {
+ @Override
+ public Dataset<GenericRecord> run() {
+ return Datasets.load(datasetUri);
+ }
+ });
DatasetDescriptor descriptor = view.getDataset().getDescriptor();
Format format = descriptor.getFormat();
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java \
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
deleted file mode 100644
index c0dbffb..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/KerberosUtil.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.kitesdk.data.DatasetException;
-import org.kitesdk.data.DatasetIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KerberosUtil {
-
- private static final Logger LOG = LoggerFactory.getLogger(KerberosUtil.class);
-
- public static class SecurityException extends RuntimeException {
- private SecurityException(String message) {
- super(message);
- }
-
- private SecurityException(String message, Throwable cause) {
- super(message, cause);
- }
-
- private SecurityException(Throwable cause) {
- super(cause);
- }
- }
-
- public static UserGroupInformation proxyAs(String username,
- UserGroupInformation login) {
- Preconditions.checkArgument(username != null && !username.isEmpty(),
- "Invalid username: " + String.valueOf(username));
- Preconditions.checkArgument(login != null,
- "Cannot proxy without an authenticated user");
-
- // hadoop impersonation works with or without kerberos security
- return UserGroupInformation.createProxyUser(username, login);
- }
-
- /**
- * Static synchronized method for static Kerberos login. <br/>
- * Static synchronized due to a thundering herd problem when multiple Sinks
- * attempt to log in using the same principal at the same time with the
- * intention of impersonating different users (or even the same user).
- * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
- * attach and it returns:
- * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
- * In addition, since the underlying Hadoop APIs we are using for
- * impersonation are static, we define this method as static as well.
- *
- * @param principal
- * Fully-qualified principal to use for authentication.
- * @param keytab
- * Location of keytab file containing credentials for principal.
- * @return Logged-in user
- * @throws SecurityException
- * if login fails.
- * @throws IllegalArgumentException
- * if the principal or the keytab is not usable
- */
- public static synchronized UserGroupInformation login(String principal,
- String keytab) {
- // If the principal or keytab isn't set, get the current (Linux) user
- if (principal == null || keytab == null) {
- try {
- return UserGroupInformation.getCurrentUser();
- } catch (IOException ex) {
- LOG.error("Can't get current user: {}", ex.getMessage());
- throw new RuntimeException(ex);
- }
- }
-
- // resolve the requested principal, if it is present
- String finalPrincipal = null;
- if (principal != null && !principal.isEmpty()) {
- try {
- // resolves _HOST pattern using standard Hadoop search/replace
- // via DNS lookup when 2nd argument is empty
- finalPrincipal = SecurityUtil.getServerPrincipal(principal, "");
- } catch (IOException e) {
- throw new SecurityException(
- "Failed to resolve Kerberos principal", e);
- }
- }
-
- // check if there is a user already logged in
- UserGroupInformation currentUser = null;
- try {
- currentUser = UserGroupInformation.getLoginUser();
- } catch (IOException e) {
- // not a big deal but this shouldn't typically happen because it will
- // generally fall back to the UNIX user
- LOG.debug("Unable to get login user before Kerberos auth attempt", e);
- }
-
- // if the current user is valid (matches the given principal and has a TGT)
- // then use it
- if (currentUser != null && currentUser.hasKerberosCredentials()) {
- if (finalPrincipal == null ||
- finalPrincipal.equals(currentUser.getUserName())) {
- LOG.debug("Using existing login for {}: {}",
- finalPrincipal, currentUser);
- return currentUser;
- } else {
- // be cruel and unusual when user tries to login as multiple principals
- // this isn't really valid with a reconfigure but this should be rare
- // enough to warrant a restart of the agent JVM
- // TODO: find a way to interrogate the entire current config state,
- // since we don't have to be unnecessarily protective if they switch all
- // HDFS sinks to use a different principal all at once.
- throw new SecurityException(
- "Cannot use multiple Kerberos principals: " + finalPrincipal +
- " would replace " + currentUser.getUserName());
- }
- }
-
- // prepare for a new login
- Preconditions.checkArgument(principal != null && !principal.isEmpty(),
- "Invalid Kerberos principal: " + String.valueOf(principal));
- Preconditions.checkNotNull(finalPrincipal,
- "Resolved principal must not be null");
- Preconditions.checkArgument(keytab != null && !keytab.isEmpty(),
- "Invalid Kerberos keytab: " + String.valueOf(keytab));
- File keytabFile = new File(keytab);
- Preconditions.checkArgument(keytabFile.isFile() && keytabFile.canRead(),
- "Keytab is not a readable file: " + String.valueOf(keytab));
-
- try {
- // attempt static kerberos login
- LOG.debug("Logging in as {} with {}", finalPrincipal, keytab);
- UserGroupInformation.loginUserFromKeytab(principal, keytab);
- return UserGroupInformation.getLoginUser();
- } catch (IOException e) {
- throw new SecurityException("Kerberos login failed", e);
- }
- }
-
- /**
- * Allow methods to act with the privileges of a login.
- *
- * If the login is null, the current privileges will be used.
- *
- * @param <T> The return type of the action
- * @param login UserGroupInformation credentials to use for action
- * @param action A PrivilegedExceptionAction to perform as another user
- * @return the T value returned by action.run()
- */
- public static <T> T runPrivileged(UserGroupInformation login,
- PrivilegedExceptionAction<T> action) {
- try {
- if (login == null) {
- return action.run();
- } else {
- return login.doAs(action);
- }
- } catch (IOException ex) {
- throw new DatasetIOException("Privileged action failed", ex);
- } catch (InterruptedException ex) {
- Thread.interrupted();
- throw new DatasetException(ex);
- } catch (Exception ex) {
- throw Throwables.propagate(ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java
deleted file mode 100644
index f53ef73..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestKerberosUtil.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.kite;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestKerberosUtil {
-
- private static MiniKdc kdc;
- private static File workDir;
- private static File flumeKeytab;
- private static String flumePrincipal = "flume/localhost";
- private static File aliceKeytab;
- private static String alicePrincipal = "alice";
- private static Properties conf;
-
- @BeforeClass
- public static void startMiniKdc() throws Exception {
- URL resource = Thread.currentThread()
- .getContextClassLoader().getResource("enable-kerberos.xml");
- Configuration.addDefaultResource("enable-kerberos.xml");
-
- workDir = new File(System.getProperty("test.dir", "target"),
- TestKerberosUtil.class.getSimpleName());
- flumeKeytab = new File(workDir, "flume.keytab");
- aliceKeytab = new File(workDir, "alice.keytab");
- conf = MiniKdc.createConf();
-
- kdc = new MiniKdc(conf, workDir);
- kdc.start();
-
- kdc.createPrincipal(flumeKeytab, flumePrincipal);
- flumePrincipal = flumePrincipal + "@" + kdc.getRealm();
-
- kdc.createPrincipal(aliceKeytab, alicePrincipal);
- alicePrincipal = alicePrincipal + "@" + kdc.getRealm();
- }
-
- @AfterClass
- public static void stopMiniKdc() {
- if (kdc != null) {
- kdc.stop();
- }
- }
-
- @Test
- public void testNullLogin() throws IOException {
- String principal = null;
- String keytab = null;
- UserGroupInformation expResult = UserGroupInformation.getCurrentUser();
- UserGroupInformation result = KerberosUtil.login(principal, keytab);
- assertEquals(expResult, result);
- }
-
- @Test
- public void testFlumeLogin() throws IOException {
- String principal = flumePrincipal;
- String keytab = flumeKeytab.getAbsolutePath();
- String expResult = principal;
-
- String result = KerberosUtil.login(principal, keytab).getUserName();
- assertEquals("Initial login failed", expResult, result);
-
- result = KerberosUtil.login(principal, keytab).getUserName();
- assertEquals("Re-login failed", expResult, result);
-
- principal = alicePrincipal;
- keytab = aliceKeytab.getAbsolutePath();
- try {
- result = KerberosUtil.login(principal, keytab).getUserName();
- fail("Login should have failed with a new principal: " + result);
- } catch (KerberosUtil.SecurityException ex) {
- assertTrue("Login with a new principal failed, but for an unexpected "
- + "reason: " + ex.getMessage(),
- ex.getMessage().contains("Cannot use multiple Kerberos principals: "));
- }
- }
-
- @Test
- public void testProxyAs() throws IOException {
- String username = "alice";
-
- UserGroupInformation login = UserGroupInformation.getCurrentUser();
- String expResult = username;
- String result = KerberosUtil.proxyAs(username, login).getUserName();
- assertEquals("Proxy as didn't generate the expected username", expResult, result);
-
- login = KerberosUtil.login(flumePrincipal, flumeKeytab.getAbsolutePath());
- assertEquals("Login succeeded, but the principal doesn't match",
- flumePrincipal, login.getUserName());
-
- result = KerberosUtil.proxyAs(username, login).getUserName();
- assertEquals("Proxy as didn't generate the expected username", expResult, result);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/1b0f051b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 62f4eee..6b97de6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -38,6 +38,7 @@ import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.SystemClock;
+import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +75,7 @@ class BucketWriter {
private final CompressionCodec codeC;
private final CompressionType compType;
private final ScheduledExecutorService timedRollerPool;
- private final UserGroupInformation user;
+ private final PrivilegedExecutor proxyUser;
private final AtomicLong fileExtensionCounter;
@@ -120,7 +120,7 @@ class BucketWriter {
Context context, String filePath, String fileName, String inUsePrefix,
String inUseSuffix, String fileSuffix, CompressionCodec codeC,
CompressionType compType, HDFSWriter writer,
- ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+ ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
String onCloseCallbackPath, long callTimeout,
ExecutorService callTimeoutPool, long retryInterval,
@@ -138,7 +138,7 @@ class BucketWriter {
this.compType = compType;
this.writer = writer;
this.timedRollerPool = timedRollerPool;
- this.user = user;
+ this.proxyUser = proxyUser;
this.sinkCounter = sinkCounter;
this.idleTimeout = idleTimeout;
this.onCloseCallback = onCloseCallback;
@@ -165,33 +165,6 @@ class BucketWriter {
this.writer = dataWriter;
}
- /**
- * Allow methods to act as another user (typically used for HDFS Kerberos)
- * @param <T>
- * @param action
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
- throws IOException, InterruptedException {
-
- if (user != null) {
- return user.doAs(action);
- } else {
- try {
- return action.run();
- } catch (IOException ex) {
- throw ex;
- } catch (InterruptedException ex) {
- throw ex;
- } catch (RuntimeException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new RuntimeException("Unexpected exception.", ex);
- }
- }
- }
/**
* Clear the class counters
@@ -700,7 +673,7 @@ class BucketWriter {
Future<T> future = callTimeoutPool.submit(new Callable<T>() {
@Override
public T call() throws Exception {
- return runPrivileged(new PrivilegedExceptionAction<T>() {
+ return proxyUser.execute(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return callRunner.call();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic