List: flume-commits
Subject: [flume] 01/01: FLUME-3335 - Support configuration via HTTP(S)
From: rgoers () apache ! org
Date: 2021-10-27 6:54:37
Message-ID: 20211027065436.E7DD781FFA () gitbox ! apache ! org
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch FLUME-3335
in repository https://gitbox.apache.org/repos/asf/flume.git
commit 2b4d546e41f1f8256633dfa3f62810aa2ebffdf3
Author: Ralph Goers <rgoers@apache.org>
AuthorDate: Tue Oct 26 23:54:12 2021 -0700
FLUME-3335 - Support configuration via HTTP(S)
---
flume-ng-doc/sphinx/FlumeUserGuide.rst | 99 ++++++++-
flume-ng-node/pom.xml | 24 ++-
.../flume/node/AbstractConfigurationProvider.java | 3 +
.../java/org/apache/flume/node/Application.java | 237 ++++++++++++++++-----
.../flume/node/ClasspathConfigurationSource.java | 71 ++++++
.../node/ClasspathConfigurationSourceFactory.java | 41 ++++
.../org/apache/flume/node/ConfigurationSource.java | 56 +++++
.../flume/node/ConfigurationSourceFactory.java | 50 +++++
.../flume/node/EnvVarResolverProperties.java | 5 +
.../apache/flume/node/FileConfigurationSource.java | 110 ++++++++++
.../flume/node/FileConfigurationSourceFactory.java | 42 ++++
.../apache/flume/node/HttpConfigurationSource.java | 150 +++++++++++++
.../flume/node/HttpConfigurationSourceFactory.java | 42 ++++
.../java/org/apache/flume/node/MapResolver.java | 70 ++++++
.../flume/node/MaterializedConfiguration.java | 12 +-
...PollingPropertiesFileConfigurationProvider.java | 143 +------------
.../node/PropertiesFileConfigurationProvider.java | 58 +----
...Provider.java => UriConfigurationProvider.java} | 237 ++++++++++++++++-----
.../flume/node/net/AuthorizationProvider.java | 27 +++
.../flume/node/net/BasicAuthorizationProvider.java | 45 ++++
.../apache/flume/node/net/LaxHostnameVerifier.java | 38 ++++
.../flume/node/net/UrlConnectionFactory.java | 104 +++++++++
...rg.apache.flume.node.ConfigurationSourceFactory | 17 ++
.../node/TestClasspathConfigurationSource.java | 73 +++++++
.../java/org/apache/flume/node/TestEnvLookup.java | 58 +++++
.../flume/node/TestHttpConfigurationSource.java | 141 ++++++++++++
.../org/apache/flume/node/TestOverrideFile.java | 62 ++++++
...PollingPropertiesFileConfigurationProvider.java | 8 +-
.../TestPropertiesFileConfigurationProvider.java | 8 +-
.../org/apache/flume/node/TestRecursiveLookup.java | 58 +++++
.../test/resources/flume-conf-override.properties | 21 ++
.../resources/flume-conf-with-envLookup.properties | 35 +++
.../flume-conf-with-recursiveLookup.properties | 37 ++++
pom.xml | 33 ++-
34 files changed, 1898 insertions(+), 317 deletions(-)
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3fdfc67..315e5c4 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -118,12 +118,11 @@ Setup
Setting up an agent
-------------------
-Flume agent configuration is stored in a local configuration file. This is a
-text file that follows the Java properties file format.
-Configurations for one or more agents can be specified in the same
-configuration file. The configuration file includes properties of each source,
-sink and channel in an agent and how they are wired together to form data
-flows.
+Flume agent configuration is stored in one or more configuration files that
+follow the Java properties file format. Configurations for one or more agents
+can be specified in these configuration files. The configuration includes
+properties of each source, sink and channel in an agent and how they are wired
+together to form data flows.
Configuring individual components
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -225,25 +224,105 @@ The original Flume terminal will output the event in a log message.
Congratulations - you've successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail.
-Using environment variables in configuration files
+Configuration from URIs
+~~~~~~~~~~~~~~~~~~~~~~~
+As of version 1.10.0 Flume supports being configured using URIs instead of just from local files. Direct support
+for HTTP(S), file, and classpath URIs is included. The HTTP support includes support for authentication using
+basic authorization but other authorization mechanisms may be supported by specifying the fully qualified name
+of the class that implements the AuthorizationProvider interface using the --auth-provider option. HTTP also
+supports reloading of configuration files using polling if the target server properly responds to the If-Modified-Since
+header.
+
+To specify credentials for HTTP authentication add::
+
+ --conf-user userid --conf-password password
+
+to the startup command.
+
+Multiple Configuration Files
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+As of version 1.10.0 Flume supports being configured from multiple configuration files instead of just one.
+This more easily allows values to be overridden or added based on specific environments. Each file should
+be configured using its own --conf-file or --conf-uri option. However, all files should either be provided
+with --conf-file or with --conf-uri. If --conf-file and --conf-uri appear together as options all --conf-uri
+configurations will be processed before any of the --conf-file configurations are merged.
+
+For example, a configuration of::
+
+ $ bin/flume-ng agent --conf conf --conf-file example.conf --conf-uri http://localhost:80/flume.conf --conf-uri http://localhost:80/override.conf --name a1 -Dflume.root.logger=INFO,console
+
+will cause flume.conf to be read first, override.conf to be merged with it and finally example.conf would be
+merged last. If it is desired to have example.conf be the base configuration it should be specified using the
+--conf-uri option either as::
+
+ --conf-uri classpath://example.conf
+ or
+ --conf-uri file:///example.conf
+
+depending on how it should be accessed.
+
+Using environment variables, system properties, or other properties configuration files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Flume has the ability to substitute environment variables in the configuration. For example::
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = ${NC_PORT}
+ a1.sources.r1.port = ${env:NC_PORT}
a1.sources.r1.channels = c1
NB: it currently works for values only, not for keys. (Ie. only on the "right side" of the `=` mark of the config lines.)
-This can be enabled via Java system properties on agent invocation by setting `propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties`.
+As of version 1.10.0 Flume resolves configuration values using Apache Commons Text's StringSubstitutor
+class using the default set of Lookups along with a lookup that uses the configuration files as a
+source for replacement values.
For example::
- $ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties
+ $ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Note the above is just an example, environment variables can be configured in other ways, including being set in `conf/flume-env.sh`.
+As noted, system properties are also supported, so the configuration::
+
+ a1.sources = r1
+ a1.sources.r1.type = netcat
+ a1.sources.r1.bind = 0.0.0.0
+ a1.sources.r1.port = ${sys:NC_PORT}
+ a1.sources.r1.channels = c1
+
+could be used and the startup command could be::
+
+ $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DNC_PORT=44444
+
+Furthermore, because multiple configuration files are allowed the first file could contain::
+
+ a1.sources = r1
+ a1.sources.r1.type = netcat
+ a1.sources.r1.bind = 0.0.0.0
+ a1.sources.r1.port = ${NC_PORT}
+ a1.sources.r1.channels = c1
+
+and the override file could contain::
+
+ NC_PORT = 44444
+
+In this case the startup command could be::
+
+ $ bin/flume-ng agent --conf conf --conf-file example.conf --conf-file override.conf --name a1 -Dflume.root.logger=INFO,console
+
+Note that the method for specifying environment variables as was done in prior versions will still work
+but has been deprecated in favor of using ${env:varName}.
+
+Using a command options file
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Instead of specifying all the command options on the command line as of version 1.10.0 command
+options may be placed in either /etc/flume/flume.opts or flume.opts on the classpath. An example
+might be::
+
+ conf-file = example.conf
+ conf-file = override.conf
+ name = a1
+
Logging raw data
~~~~~~~~~~~~~~~~
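[Editor's sketch, not part of the commit] The "Multiple Configuration Files" text above says later files are merged over earlier ones. The class that actually performs the merge (UriConfigurationProvider) is not shown in this part of the message, so the following is only a minimal illustration of last-one-wins merging with java.util.Properties. The class name MergeOrderSketch and the sample keys and values are assumptions made for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class MergeOrderSketch {

  // Loads each source, in order, into one Properties object. A key that
  // appears in a later source replaces the value loaded from an earlier one.
  static Properties merge(String... sources) {
    Properties merged = new Properties();
    for (String source : sources) {
      try {
        merged.load(new StringReader(source));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Hypothetical file contents standing in for a base and an override file.
    String base = "a1.sources = r1\na1.sources.r1.port = 44444\n";
    String override = "a1.sources.r1.port = 55555\n";
    Properties merged = merge(base, override);
    System.out.println(merged.getProperty("a1.sources.r1.port")); // 55555
    System.out.println(merged.getProperty("a1.sources"));         // r1
  }
}
```

Keys present only in the base file survive the merge, which is what makes a small environment-specific override file practical.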
diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml
index 65fcb50..1db864a 100644
--- a/flume-ng-node/pom.xml
+++ b/flume-ng-node/pom.xml
@@ -106,6 +106,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -134,15 +139,26 @@
</exclusions>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
- </dependency>
+ </dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
- </dependency>
+ </dependency>
+
+ <dependency>
+ <groupId>net.jcip</groupId>
+ <artifactId>jcip-annotations</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ <optional>true</optional>
+ </dependency>
</dependencies>
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index caf4522..2f0b643 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -69,6 +69,9 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings("REC_CATCH_EXCEPTION")
public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class);
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 406bb7d..1f4df59 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -19,19 +19,29 @@
package org.apache.flume.node;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
-import org.apache.flume.Constants;
import org.apache.flume.Context;
import org.apache.flume.SinkRunner;
import org.apache.flume.SourceRunner;
@@ -41,19 +51,16 @@ import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.BasicAuthorizationProvider;
import org.apache.flume.util.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
public class Application {
@@ -63,6 +70,8 @@ public class Application {
public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+ private static final int DEFAULT_INTERVAL = 300;
+ private static final int DEFAULT_FILE_INTERVAL = 30;
private final List<LifecycleAware> components;
private final LifecycleSupervisor supervisor;
private MaterializedConfiguration materializedConfiguration;
@@ -109,8 +118,8 @@ public class Application {
public void stop() {
lifecycleLock.lock();
- stopAllComponents();
try {
+ stopAllComponents();
supervisor.stop();
if (monitorServer != null) {
monitorServer.stop();
@@ -231,7 +240,7 @@ public class Application {
//Not a known type, use FQCN
klass = (Class<? extends MonitorService>) Class.forName(monitorType);
}
- this.monitorServer = klass.newInstance();
+ this.monitorServer = klass.getConstructor().newInstance();
Context context = new Context();
for (String key : keys) {
if (key.startsWith(CONF_MONITOR_PREFIX)) {
@@ -242,14 +251,14 @@ public class Application {
monitorServer.configure(context);
monitorServer.start();
}
- } catch (Exception e) {
+ } catch (ReflectiveOperationException e) {
logger.warn("Error starting monitoring. "
+ "Monitoring might not be available.", e);
}
-
}
public static void main(String[] args) {
+ Properties initProps = loadConfigOpts();
try {
SSLUtil.initGlobalSSLParameters();
@@ -261,7 +270,40 @@ public class Application {
options.addOption(option);
option = new Option("f", "conf-file", true,
- "specify a config file (required if -z missing)");
+ "specify a config file (required if -c, -u, and -z are missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("u", "conf-uri", true,
+ "specify a config uri (required if -c, -f and -z are missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("a", "auth-provider", true,
+ "specify an authorization provider class");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("c", "conf-provider", true,
+ "specify a configuration provider class (required if -f, -u, and -z are missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("n", "conf-user", true, "user name to access configuration uri");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("p", "conf-password", true, "password to access configuration uri");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("i", "poll-interval", true,
+ "number of seconds between checks for a configuration change");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("b", "backup-directory", true,
+ "directory in which to store the backup configuration file");
option.setRequired(false);
options.addOption(option);
@@ -271,7 +313,7 @@ public class Application {
// Options for Zookeeper
option = new Option("z", "zkConnString", true,
- "specify the ZooKeeper connection to use (required if -f missing)");
+ "specify the ZooKeeper connection to use (required if -c, -f, and -u are missing)");
option.setRequired(false);
options.addOption(option);
@@ -283,8 +325,8 @@ public class Application {
option = new Option("h", "help", false, "display help text");
options.addOption(option);
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = parser.parse(options, args);
+ DefaultParser parser = new DefaultParser();
+ CommandLine commandLine = parser.parse(options, args, initProps);
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
@@ -299,8 +341,41 @@ public class Application {
isZkConfigured = true;
}
+ List<URI> confUri = null;
+ ConfigurationProvider provider = null;
+ int defaultInterval = DEFAULT_FILE_INTERVAL;
+ if (commandLine.hasOption('u') || commandLine.hasOption("conf-uri")) {
+ confUri = new ArrayList<>();
+ for (String uri : commandLine.getOptionValues("conf-uri")) {
+ if (uri.toLowerCase(Locale.ROOT).startsWith("http")) {
+ defaultInterval = DEFAULT_INTERVAL;
+ }
+ confUri.add(new URI(uri));
+ }
+ } else if (commandLine.hasOption("f") || commandLine.hasOption("conf-file")) {
+ confUri = new ArrayList<>();
+ for (String filePath : commandLine.getOptionValues("conf-file")) {
+ confUri.add(new File(filePath).toURI());
+ }
+ }
+
+ if (commandLine.hasOption("c") || commandLine.hasOption("conf-provider")) {
+ String className = commandLine.getOptionValue("conf-provider");
+ try {
+ Class<?> clazz = Application.class.getClassLoader().loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(String[].class);
+ provider = (ConfigurationProvider) constructor.newInstance((Object[]) args);
+ } catch (ReflectiveOperationException ex) {
+ logger.error("Error creating ConfigurationProvider {}", className, ex);
+ }
+ }
+
Application application;
- if (isZkConfigured) {
+ if (provider != null) {
+ List<LifecycleAware> components = Lists.newArrayList();
+ application = new Application(components);
+ application.handleConfigurationEvent(provider.getConfiguration());
+ } else if (isZkConfigured) {
// get options
String zkConnectionStr = commandLine.getOptionValue('z');
String baseZkPath = commandLine.getOptionValue('p');
@@ -321,44 +396,65 @@ public class Application {
application = new Application();
application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
}
- } else {
- File configurationFile = new File(commandLine.getOptionValue('f'));
-
- /*
- * The following is to ensure that by default the agent will fail on
- * startup if the file does not exist.
- */
- if (!configurationFile.exists()) {
- // If command line invocation, then need to fail fast
- if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
- null) {
- String path = configurationFile.getPath();
- try {
- path = configurationFile.getCanonicalPath();
- } catch (IOException ex) {
- logger.error("Failed to read canonical path for file: " + path,
- ex);
+ } else if (confUri != null) {
+ String confUser = commandLine.getOptionValue("conf-user");
+ String confPassword = commandLine.getOptionValue("conf-password");
+ String pollInterval = commandLine.getOptionValue("poll-interval");
+ String backupDirectory = commandLine.getOptionValue("backup-directory");
+ int interval = StringUtils.isNotEmpty(pollInterval) ? Integer.parseInt(pollInterval) : 0;
+ String verify = commandLine.getOptionValue("verify-host", "true");
+ boolean verifyHost = Boolean.parseBoolean(verify);
+ AuthorizationProvider authorizationProvider = null;
+ String authProviderClass = commandLine.getOptionValue("auth-provider");
+ if (authProviderClass != null) {
+ try {
+ Class<?> clazz = Class.forName(authProviderClass);
+ Object obj = clazz.getDeclaredConstructor(String[].class)
+ .newInstance((Object[]) args);
+ if (obj instanceof AuthorizationProvider) {
+ authorizationProvider = (AuthorizationProvider) obj;
+ } else {
+ logger.error(
+ "The supplied authorization provider does not implement AuthorizationProvider");
+ return;
}
- throw new ParseException(
- "The specified configuration file does not exist: " + path);
+ } catch (ReflectiveOperationException ex) {
+ logger.error("Unable to create authorization provider: {}", ex.getMessage());
+ return;
+ }
+ }
+ if (authorizationProvider == null && StringUtils.isNotEmpty(confUser)
+ && StringUtils.isNotEmpty(confPassword)) {
+ authorizationProvider = new BasicAuthorizationProvider(confUser, confPassword);
+ }
+ EventBus eventBus = null;
+ if (reload) {
+ eventBus = new EventBus(agentName + "-event-bus");
+ if (interval == 0) {
+ interval = defaultInterval;
+ }
+ }
+ List<ConfigurationSource> configurationSources = new ArrayList<>();
+ for (URI uri : confUri) {
+ ConfigurationSource configurationSource =
+ ConfigurationSourceFactory.getConfigurationSource(uri, authorizationProvider,
+ verifyHost);
+ if (configurationSource != null) {
+ configurationSources.add(configurationSource);
}
}
List<LifecycleAware> components = Lists.newArrayList();
+ UriConfigurationProvider configurationProvider = new UriConfigurationProvider(agentName,
+ configurationSources, backupDirectory, eventBus, interval);
+ components.add(configurationProvider);
- if (reload) {
- EventBus eventBus = new EventBus(agentName + "-event-bus");
- PollingPropertiesFileConfigurationProvider configurationProvider =
- new PollingPropertiesFileConfigurationProvider(
- agentName, configurationFile, eventBus, 30);
- components.add(configurationProvider);
- application = new Application(components);
+ application = new Application(components);
+ if (eventBus != null) {
eventBus.register(application);
- } else {
- PropertiesFileConfigurationProvider configurationProvider =
- new PropertiesFileConfigurationProvider(agentName, configurationFile);
- application = new Application();
- application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
+ application.handleConfigurationEvent(configurationProvider.getConfiguration());
+ } else {
+ throw new ParseException("No configuration was provided");
}
application.start();
@@ -370,8 +466,35 @@ public class Application {
}
});
- } catch (Exception e) {
+ } catch (ParseException | URISyntaxException | RuntimeException e) {
logger.error("A fatal error occurred while running. Exception follows.", e);
}
}
+ @SuppressWarnings("PMD")
+ private static Properties loadConfigOpts() {
+ Properties initProps = new Properties();
+ InputStream is = null;
+ try {
+ is = new FileInputStream("/etc/flume/flume.opts");
+ } catch (IOException ex) {
+ // Ignore the exception.
+ }
+ if (is == null) {
+ is = Application.class.getClassLoader().getResourceAsStream("flume.opts");
+ }
+ if (is != null) {
+ try {
+ initProps.load(is);
+ } catch (Exception ex) {
+ logger.warn("Unable to load options file due to: {}", ex.getMessage());
+ } finally {
+ try {
+ is.close();
+ } catch (IOException ex) {
+ // Ignore this error.
+ }
+ }
+ }
+ return initProps;
+ }
}
\ No newline at end of file
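[Editor's sketch, not part of the commit] loadConfigOpts() above reads flume.opts with java.util.Properties.load, and a Properties object holds one value per key. The sketch below suggests that an options file repeating a key, as the user-guide example does with conf-file, retains only the last value at this layer. The class name OptsFileSketch is hypothetical, and how DefaultParser subsequently treats the loaded properties is outside what the sketch demonstrates.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class OptsFileSketch {

  // Parses options-file text the same way loadConfigOpts() parses the stream:
  // with a plain java.util.Properties load.
  static Properties load(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  public static void main(String[] args) {
    // Same shape as the flume.opts example in the user guide changes.
    String opts = "conf-file = example.conf\nconf-file = override.conf\nname = a1\n";
    Properties props = load(opts);
    System.out.println(props.size());                   // 2
    System.out.println(props.getProperty("conf-file")); // override.conf
  }
}
```

If several configuration files need to be supplied through an options file, it may be worth verifying how repeated keys behave in the released version before relying on that layout.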
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java
new file mode 100644
index 0000000..1e456b5
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.conf.ConfigurationException;
+
+public class ClasspathConfigurationSource implements ConfigurationSource {
+
+ private final String path;
+ private final URI uri;
+
+ public ClasspathConfigurationSource(URI uri) {
+ this.uri = uri;
+ if (StringUtils.isNotEmpty(uri.getPath())) {
+ // classpath:///filename && classpath:/filename
+ this.path = uri.getPath().substring(1);
+ } else if (StringUtils.isNotEmpty(uri.getAuthority())) {
+ // classpath://filename
+ this.path = uri.getAuthority();
+ } else if (StringUtils.isNotEmpty(uri.getSchemeSpecificPart())) {
+ // classpath:filename
+ this.path = uri.getSchemeSpecificPart();
+ } else {
+ throw new ConfigurationException("Invalid uri: " + uri);
+ }
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return this.getClass().getClassLoader().getResourceAsStream(path);
+ }
+
+ @Override
+ public String getUri() {
+ return this.uri.toString();
+ }
+
+ @Override
+ public String getExtension() {
+ int length = uri.getPath().indexOf(".");
+ if (length <= 1) {
+ return PROPERTIES;
+ }
+ return uri.getPath().substring(length + 1);
+ }
+
+ @Override
+ public String toString() {
+ return "{ classpath: " + path + "}";
+ }
+
+}
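[Editor's sketch, not part of the commit] The ClasspathConfigurationSource constructor above accepts four spellings of a classpath URI. The following stand-alone sketch mirrors its branch order using only java.net.URI to show which URI component each spelling populates. The class name ClasspathUriSketch and the notEmpty helper (standing in for StringUtils.isNotEmpty) are assumptions made for the example.

```java
import java.net.URI;

final class ClasspathUriSketch {

  // Stand-in for StringUtils.isNotEmpty so the sketch needs no dependencies.
  static boolean notEmpty(String s) {
    return s != null && !s.isEmpty();
  }

  // Mirrors the branch order of the constructor in the diff above.
  static String resourcePath(URI uri) {
    if (notEmpty(uri.getPath())) {
      // classpath:///filename and classpath:/filename
      return uri.getPath().substring(1);
    } else if (notEmpty(uri.getAuthority())) {
      // classpath://filename
      return uri.getAuthority();
    } else if (notEmpty(uri.getSchemeSpecificPart())) {
      // classpath:filename
      return uri.getSchemeSpecificPart();
    }
    throw new IllegalArgumentException("Invalid uri: " + uri);
  }

  public static void main(String[] args) {
    String[] forms = {
      "classpath:///example.conf",
      "classpath:/example.conf",
      "classpath://example.conf",
      "classpath:example.conf"
    };
    for (String form : forms) {
      // Each form resolves to the resource name example.conf.
      System.out.println(form + " -> " + resourcePath(URI.create(form)));
    }
  }
}
```

For the opaque form classpath:filename, java.net.URI reports a null path, and for classpath://filename the path is empty. Code that dereferences getPath() without a guard, as getExtension() above appears to do, may therefore need a null check for the opaque form.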
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java
new file mode 100644
index 0000000..2921f35
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates a ConfigurationSource from a file on the classpath.
+ */
+public class ClasspathConfigurationSourceFactory implements ConfigurationSourceFactory {
+
+ private static final List<String> SCHEMES = Lists.newArrayList("classpath");
+
+ public List<String> getSchemes() {
+ return SCHEMES;
+ }
+
+ public ConfigurationSource createConfigurationSource(URI uri,
+ AuthorizationProvider authorizationProvider, boolean verifyHost) {
+ return new ClasspathConfigurationSource(uri);
+ }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java
new file mode 100644
index 0000000..ccd849e
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.io.InputStream;
+
+/**
+ * Interface for retrieving configuration data.
+ */
+public interface ConfigurationSource {
+
+ static final String PROPERTIES = "properties";
+ static final String JSON = "json";
+ static final String YAML = "yaml";
+ static final String XML = "xml";
+
+ /**
+ * Returns the InputStream if it hasn't already been processed.
+ * @return The InputStream or null.
+ */
+ InputStream getInputStream();
+
+ /**
+ * Returns the URI string.
+ * @return The string URI.
+ */
+ String getUri();
+
+ /**
+ * Determine if the configuration data source has been modified since it was last checked.
+ * @return true if the data was modified.
+ */
+ default boolean isModified() {
+ return false;
+ }
+
+ /**
+ * Return the "file" extension for the specified uri.
+ * @return The file extension.
+ */
+ String getExtension();
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java
new file mode 100644
index 0000000..1d57949
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+/**
+ * Creates ConfigurationSources.
+ */
+public interface ConfigurationSourceFactory {
+
+ static ConfigurationSource getConfigurationSource(URI uri,
+ AuthorizationProvider authorizationProvider, boolean verifyHost) {
+
+ String protocol = uri.getScheme();
+ final ServiceLoader<ConfigurationSourceFactory> serviceLoader =
+ ServiceLoader.load(ConfigurationSourceFactory.class,
+ ConfigurationSourceFactory.class.getClassLoader());
+ for (final ConfigurationSourceFactory configurationSourceFactory : serviceLoader) {
+ if (configurationSourceFactory.getSchemes().contains(protocol)) {
+ return configurationSourceFactory.createConfigurationSource(uri, authorizationProvider,
+ verifyHost);
+ }
+ }
+ return null;
+ }
+
+ List<String> getSchemes();
+
+ ConfigurationSource createConfigurationSource(URI uri,
+ AuthorizationProvider authorizationProvider, boolean verifyHost);
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java b/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
index e0b0d22..ce64de8 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
@@ -28,8 +28,13 @@ import java.util.regex.Pattern;
* A class that extends the Java built-in Properties overriding
* {@link java.util.Properties#getProperty(String)} to allow ${ENV_VAR_NAME}-style environment
* variable inclusions
+ * @deprecated Use ${env:key} instead.
*/
+@Deprecated
public class EnvVarResolverProperties extends Properties {
+
+ private static final long serialVersionUID = -9134232469049352862L;
+
/**
* @param input The input string with ${ENV_VAR_NAME}-style environment variable names
* @return The output string with ${ENV_VAR_NAME} replaced with their environment variable values
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java
new file mode 100644
index 0000000..4db66a0
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.flume.CounterGroup;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileConfigurationSource implements ConfigurationSource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileConfigurationSource.class);
+
+ private final Path path;
+ private final URI uri;
+ private final CounterGroup counterGroup;
+ private byte[] data;
+ private long lastChange;
+
+ public FileConfigurationSource(URI uri) {
+ this.uri = uri;
+ this.path = Paths.get(uri);
+ counterGroup = new CounterGroup();
+ try {
+ this.lastChange = path.toFile().lastModified();
+ data = Files.readAllBytes(this.path);
+ } catch (IOException ioe) {
+ LOGGER.error("Unable to read {}: {}", path.toString(), ioe.getMessage());
+ throw new ConfigurationException("Unable to read file " + path.toString(), ioe);
+ }
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return new ByteArrayInputStream(data);
+ }
+
+ @Override
+ public String getUri() {
+ return this.uri.toString();
+ }
+
+ @Override
+ public String getExtension() {
+ int length = uri.getPath().lastIndexOf(".");
+ if (length <= 1) {
+ return PROPERTIES;
+ }
+ return uri.getPath().substring(length + 1);
+ }
+
+ @Override
+ public boolean isModified() {
+ LOGGER.debug("Checking file:{} for changes", path.toString());
+
+ counterGroup.incrementAndGet("file.checks");
+
+ long lastModified = path.toFile().lastModified();
+
+ if (lastModified > lastChange) {
+ LOGGER.info("Reloading configuration file:{}", path.toString());
+
+ counterGroup.incrementAndGet("file.loads");
+
+ lastChange = lastModified;
+
+ try {
+ data = Files.readAllBytes(path);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failed to load configuration data. Exception follows.", e);
+ } catch (NoClassDefFoundError e) {
+ LOGGER.error("Failed to start agent because dependencies were not found in classpath. " +
+ "Error follows.", e);
+ } catch (Throwable t) {
+ // caught because the caller does not handle or log Throwables
+ LOGGER.error("Unhandled error", t);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "{ file:" + path.toString() + "}";
+ }
+}
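
Note on FileConfigurationSource.isModified(): the source caches the file's bytes and re-reads them only when the file's last-modified timestamp has moved past the value recorded at the previous load. The sketch below illustrates that polling idea in isolation, using java.nio.file.Files instead of File.lastModified(). MtimePollSketch, demo() and the temporary file are hypothetical names for illustration, and the timestamp is pushed forward explicitly because file systems differ in timestamp resolution.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

public class MtimePollSketch {

  private final Path path;
  private long lastChange;
  private byte[] data;

  public MtimePollSketch(Path path) throws IOException {
    this.path = path;
    this.lastChange = Files.getLastModifiedTime(path).toMillis();
    this.data = Files.readAllBytes(path);
  }

  /** Re-reads the file only when its timestamp moved forward. */
  public boolean isModified() throws IOException {
    long lastModified = Files.getLastModifiedTime(path).toMillis();
    if (lastModified <= lastChange) {
      return false;
    }
    lastChange = lastModified;
    data = Files.readAllBytes(path);
    return true;
  }

  public String content() {
    return new String(data, StandardCharsets.UTF_8);
  }

  /** Runs one unchanged poll and one changed poll against a temporary file. */
  public static String demo() {
    try {
      Path tmp = Files.createTempFile("flume-sketch", ".properties");
      try {
        Files.write(tmp, "a1.sources = r1\n".getBytes(StandardCharsets.UTF_8));
        MtimePollSketch source = new MtimePollSketch(tmp);
        boolean first = source.isModified();
        Files.write(tmp, "a1.sources = r2\n".getBytes(StandardCharsets.UTF_8));
        // Force a later timestamp so the change is visible on coarse-grained file systems.
        Files.setLastModifiedTime(tmp, FileTime.fromMillis(System.currentTimeMillis() + 5000));
        boolean second = source.isModified();
        return first + "," + second + "," + source.content().trim();
      } finally {
        Files.deleteIfExists(tmp);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A timestamp comparison of this kind cannot see an edit that leaves the modification time unchanged; comparing content hashes would be one alternative, at the cost of reading the file on every poll.
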
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java
new file mode 100644
index 0000000..6325fe8
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates a FileConfigurationSource.
+ */
+public class FileConfigurationSourceFactory implements ConfigurationSourceFactory {
+
+ @SuppressWarnings(value = {"EI_EXPOSE_REP"})
+ private static final List<String> SCHEMES = Lists.newArrayList("file");
+
+ public List<String> getSchemes() {
+ return SCHEMES;
+ }
+
+ public ConfigurationSource createConfigurationSource(URI uri,
+ AuthorizationProvider authorizationProvider, boolean verifyHost) {
+ return new FileConfigurationSource(uri);
+ }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java
new file mode 100644
index 0000000..6beafe3
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.UrlConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HttpConfigurationSource implements ConfigurationSource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpConfigurationSource.class);
+ private static final int NOT_MODIFIED = 304;
+ private static final int OK = 200;
+ private static final int BUF_SIZE = 1024;
+
+ private final URI uri;
+ private final CounterGroup counterGroup;
+ private final AuthorizationProvider authorizationProvider;
+ private final boolean verifyHost;
+ private long lastModified = 0;
+ private byte[] data = null;
+
+ public HttpConfigurationSource(URI uri, AuthorizationProvider authorizationProvider,
+ boolean verifyHost) {
+ this.authorizationProvider = authorizationProvider;
+ this.uri = uri;
+ this.verifyHost = verifyHost;
+ counterGroup = new CounterGroup();
+ readInputStream();
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return new ByteArrayInputStream(data);
+ }
+
+ @Override
+ public String getUri() {
+ return this.uri.toString();
+ }
+
+ @Override
+ public String getExtension() {
+ int length = uri.getPath().lastIndexOf(".");
+ if (length <= 1) {
+ return PROPERTIES;
+ }
+ return uri.getPath().substring(length + 1);
+ }
+
+ @Override
+ public boolean isModified() {
+ LOGGER.debug("Checking {} for changes", uri);
+
+ counterGroup.incrementAndGet("uri.checks");
+ try {
+ LOGGER.info("Reloading configuration from:{}", uri);
+ if (readInputStream()) {
+ counterGroup.incrementAndGet("uri.loads");
+ return true;
+ }
+ } catch (ConfigurationException ex) {
+ LOGGER.error("Unable to access configuration: {}", ex.getMessage());
+ }
+ return false;
+ }
+
+ private boolean readInputStream() {
+ try {
+ HttpURLConnection connection = UrlConnectionFactory.createConnection(uri.toURL(),
+ authorizationProvider, lastModified, verifyHost);
+ connection.connect();
+
+ int code = connection.getResponseCode();
+ switch (code) {
+ case NOT_MODIFIED: {
+ LOGGER.debug("Configuration Not Modified");
+ return false;
+ }
+ case OK: {
+ try (InputStream is = connection.getInputStream()) {
+ lastModified = connection.getLastModified();
+ LOGGER.debug("Content was modified for {}", uri.toString());
+ data = IOUtils.toByteArray(is);
+ return true;
+ } catch (final IOException e) {
+ try (InputStream es = connection.getErrorStream()) {
+ LOGGER.info("Error accessing configuration at {}: {}", uri, readStream(es));
+ } catch (final IOException ioe) {
+ LOGGER.error("Error accessing configuration at {}: {}", uri, e.getMessage());
+ }
+ throw new ConfigurationException("Unable to access " + uri.toString(), e);
+ }
+ }
+ default: {
+ if (code < 0) {
+ LOGGER.info("Invalid response code returned");
+ } else {
+ LOGGER.info("Unexpected response code returned {}", code);
+ }
+ return false;
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Error accessing {}: {}", uri.toString(), e.getMessage());
+ throw new ConfigurationException("Unable to access " + uri.toString(), e);
+ }
+ }
+
+ private byte[] readStream(InputStream is) throws IOException {
+ ByteArrayOutputStream result = new ByteArrayOutputStream();
+ byte[] buffer = new byte[BUF_SIZE];
+ int length;
+ while ((length = is.read(buffer)) != -1) {
+ result.write(buffer, 0, length);
+ }
+ return result.toByteArray();
+ }
+
+ @Override
+ public String toString() {
+ return "{ uri:" + uri + "}";
+ }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java
new file mode 100644
index 0000000..10725f0
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates an HttpConfigurationSource.
+ */
+public class HttpConfigurationSourceFactory implements ConfigurationSourceFactory {
+
+ @SuppressWarnings(value = {"EI_EXPOSE_REP"})
+ private static final List<String> SCHEMES = Lists.newArrayList("http", "https");
+
+ public List<String> getSchemes() {
+ return SCHEMES;
+ }
+
+ public ConfigurationSource createConfigurationSource(URI uri,
+ AuthorizationProvider authorizationProvider, boolean verifyHost) {
+ return new HttpConfigurationSource(uri, authorizationProvider, verifyHost);
+ }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java b/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java
new file mode 100644
index 0000000..f7571b9
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.commons.text.lookup.StringLookup;
+import org.apache.commons.text.lookup.StringLookupFactory;
+
+/**
+ * Resolves replaceable tokens to create a Map.
+ * <p>
+ * Needs org.apache.commons:commons-text on classpath
+ */
+final class MapResolver {
+
+ private static final String PROPS_IMPL_KEY = "propertiesImplementation";
+ private static final String ENV_VAR_PROPERTY = "org.apache.flume.node.EnvVarResolverProperties";
+
+ public static Map<String, String> resolveProperties(Properties properties) {
+ Map<String, String> map = new HashMap<>();
+ boolean useEnvVars = ENV_VAR_PROPERTY.equals(System.getProperty(PROPS_IMPL_KEY));
+ StringLookup defaultLookup = useEnvVars ? new DefaultLookup(map) :
+ StringLookupFactory.INSTANCE.mapStringLookup(map);
+ StringLookup lookup = StringLookupFactory.INSTANCE.interpolatorStringLookup(defaultLookup);
+ StringSubstitutor substitutor = new StringSubstitutor(lookup);
+ substitutor.setEnableSubstitutionInVariables(true);
+ properties.stringPropertyNames().forEach((k) -> map.put(k,
+ substitutor.replace(properties.getProperty(k))));
+ return map;
+ }
+
+ private static class DefaultLookup implements StringLookup {
+ private final Map<String, String> properties;
+
+ DefaultLookup(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Provide compatibility with EnvVarResolverProperties.
+ *
+ * @param key The key.
+ * @return The value associated with the key or null.
+ */
+ @Override
+ public String lookup(String key) {
+ return properties.containsKey(key) ?
+ properties.get(key) : System.getenv(key);
+ }
+ }
+}
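
Note on MapResolver: every property value is passed through a substitutor, so a value may contain ${key} tokens that refer to other properties, and prefixed tokens such as ${env:KEY} (the form the deprecation note on EnvVarResolverProperties recommends). The sketch below imitates only those two cases with java.util.regex as a stand-in for Commons Text's StringSubstitutor. TokenResolverSketch, its single-pass behaviour and its choice to leave unresolved tokens untouched are assumptions of the sketch, not a description of the library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TokenResolverSketch {

  private static final Pattern TOKEN = Pattern.compile("\\$\\{([^}]+)\\}");

  /**
   * Replaces ${key} from props and ${env:KEY} from env in a single pass.
   * Tokens that cannot be resolved are left as they are.
   */
  public static String resolve(String value, Map<String, String> props,
      Function<String, String> env) {
    Matcher m = TOKEN.matcher(value);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String key = m.group(1);
      String replacement = key.startsWith("env:")
          ? env.apply(key.substring("env:".length()))
          : props.get(key);
      if (replacement == null) {
        replacement = m.group(); // keep the unresolved token
      }
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("port", "4141");
    // System::getenv reads the real environment; PATH is only an example name.
    System.out.println(resolve("bind=0.0.0.0:${port}", props, System::getenv));
    System.out.println(resolve("path=${env:PATH}", props, System::getenv));
  }
}
```

Passing the environment as a Function keeps the sketch testable without touching the process environment.
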
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
index fa3ef55..80d5497 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
@@ -32,16 +32,16 @@ import java.util.Map;
*/
public interface MaterializedConfiguration {
- public void addSourceRunner(String name, SourceRunner sourceRunner);
+ void addSourceRunner(String name, SourceRunner sourceRunner);
- public void addSinkRunner(String name, SinkRunner sinkRunner);
+ void addSinkRunner(String name, SinkRunner sinkRunner);
- public void addChannel(String name, Channel channel);
+ void addChannel(String name, Channel channel);
- public Map<String, SourceRunner> getSourceRunners();
+ Map<String, SourceRunner> getSourceRunners();
- public Map<String, SinkRunner> getSinkRunners();
+ Map<String, SinkRunner> getSinkRunners();
- public Map<String, Channel> getChannels();
+ Map<String, Channel> getChannels();
}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
index 13cb38f..db040b5 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -18,143 +18,20 @@
package org.apache.flume.node;
import java.io.File;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.flume.CounterGroup;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class PollingPropertiesFileConfigurationProvider
- extends PropertiesFileConfigurationProvider
- implements LifecycleAware {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
-
- private final EventBus eventBus;
- private final File file;
- private final int interval;
- private final CounterGroup counterGroup;
- private LifecycleState lifecycleState;
-
- private ScheduledExecutorService executorService;
-
- public PollingPropertiesFileConfigurationProvider(String agentName,
- File file, EventBus eventBus, int interval) {
- super(agentName, file);
- this.eventBus = eventBus;
- this.file = file;
- this.interval = interval;
- counterGroup = new CounterGroup();
- lifecycleState = LifecycleState.IDLE;
- }
-
- @Override
- public void start() {
- LOGGER.info("Configuration provider starting");
-
- Preconditions.checkState(file != null,
- "The parameter file must not be null");
-
- executorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
- .build());
-
- FileWatcherRunnable fileWatcherRunnable =
- new FileWatcherRunnable(file, counterGroup);
-
- executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
- TimeUnit.SECONDS);
-
- lifecycleState = LifecycleState.START;
-
- LOGGER.debug("Configuration provider started");
- }
-
- @Override
- public void stop() {
- LOGGER.info("Configuration provider stopping");
-
- executorService.shutdown();
- try {
- if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
- LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
- executorService.shutdownNow();
- while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
- LOGGER.debug("Waiting for file watcher to terminate");
- }
- }
- } catch (InterruptedException e) {
- LOGGER.debug("Interrupted while waiting for file watcher to terminate");
- Thread.currentThread().interrupt();
- }
- lifecycleState = LifecycleState.STOP;
- LOGGER.debug("Configuration provider stopped");
- }
-
- @Override
- public synchronized LifecycleState getLifecycleState() {
- return lifecycleState;
- }
-
-
- @Override
- public String toString() {
- return "{ file:" + file + " counterGroup:" + counterGroup + " provider:"
- + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
- }
-
- public class FileWatcherRunnable implements Runnable {
-
- private final File file;
- private final CounterGroup counterGroup;
-
- private long lastChange;
- public FileWatcherRunnable(File file, CounterGroup counterGroup) {
- super();
- this.file = file;
- this.counterGroup = counterGroup;
- this.lastChange = 0L;
- }
-
- @Override
- public void run() {
- LOGGER.debug("Checking file:{} for changes", file);
-
- counterGroup.incrementAndGet("file.checks");
-
- long lastModified = file.lastModified();
-
- if (lastModified > lastChange) {
- LOGGER.info("Reloading configuration file:{}", file);
-
- counterGroup.incrementAndGet("file.loads");
-
- lastChange = lastModified;
+/**
+ * @deprecated Use UriConfigurationProvider instead.
+ */
+@Deprecated
+public class PollingPropertiesFileConfigurationProvider extends UriConfigurationProvider {
- try {
- eventBus.post(getConfiguration());
- } catch (Exception e) {
- LOGGER.error("Failed to load configuration data. Exception follows.",
- e);
- } catch (NoClassDefFoundError e) {
- LOGGER.error("Failed to start agent because dependencies were not " +
- "found in classpath. Error follows.", e);
- } catch (Throwable t) {
- // caught because the caller does not handle or log Throwables
- LOGGER.error("Unhandled error", t);
- }
- }
- }
+ public PollingPropertiesFileConfigurationProvider(String agentName, File file, EventBus eventBus,
+ int interval) {
+ super(agentName, Lists.newArrayList(new FileConfigurationSource(file.toURI())), null,
+ eventBus, interval);
}
}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
index 75f45db..a2b0409 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
@@ -17,16 +17,9 @@
*/
package org.apache.flume.node;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Properties;
-import org.apache.flume.conf.FlumeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
/**
* <p>
@@ -165,52 +158,13 @@ import org.slf4j.LoggerFactory;
* </p>
*
* @see java.util.Properties#load(java.io.Reader)
+ * @deprecated Use UriConfigurationProvider.
*/
-public class PropertiesFileConfigurationProvider extends
- AbstractConfigurationProvider {
-
- private static final Logger LOGGER = LoggerFactory
- .getLogger(PropertiesFileConfigurationProvider.class);
- private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties";
-
- private final File file;
+@Deprecated
+public class PropertiesFileConfigurationProvider extends UriConfigurationProvider {
public PropertiesFileConfigurationProvider(String agentName, File file) {
- super(agentName);
- this.file = file;
- }
-
- @Override
- public FlumeConfiguration getFlumeConfiguration() {
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(file));
- String resolverClassName = System.getProperty("propertiesImplementation",
- DEFAULT_PROPERTIES_IMPLEMENTATION);
- Class<? extends Properties> propsclass = Class.forName(resolverClassName)
- .asSubclass(Properties.class);
- Properties properties = propsclass.newInstance();
- properties.load(reader);
- return new FlumeConfiguration(toMap(properties));
- } catch (IOException ex) {
- LOGGER.error("Unable to load file:" + file
- + " (I/O failure) - Exception follows.", ex);
- } catch (ClassNotFoundException e) {
- LOGGER.error("Configuration resolver class not found", e);
- } catch (InstantiationException e) {
- LOGGER.error("Instantiation exception", e);
- } catch (IllegalAccessException e) {
- LOGGER.error("Illegal access exception", e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException ex) {
- LOGGER.warn(
- "Unable to close file reader for file: " + file, ex);
- }
- }
- }
- return new FlumeConfiguration(new HashMap<String, String>());
+ super(agentName, Lists.newArrayList(new FileConfigurationSource(file.toURI())), null, null, 0);
+ super.start();
}
}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java
similarity index 52%
copy from flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
copy to flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java
index 75f45db..e4c74c6 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,27 +17,41 @@
*/
package org.apache.flume.node;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.HashMap;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* <p>
- * A configuration provider that uses properties file for specifying
- * configuration. The configuration files follow the Java properties file syntax
- * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
- * configuration value specified in the properties file is prefixed by an
+ * A configuration provider that uses properties for specifying
+ * configuration. The configurations follow the Java properties file syntax
+ * rules specified at {@link Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties is prefixed by an
* <em>Agent Name</em> which helps isolate an individual agent's namespace.
* </p>
* <p>
- * Valid configuration files must observe the following rules for every agent
+ * Valid configurations must observe the following rules for every agent
* namespace.
* <ul>
* <li>For every <agent name> there must be three lists specified that
@@ -101,7 +115,7 @@ import org.slf4j.LoggerFactory;
* <li>Sinks not assigned to a group will be assigned to default single sink
* groups.</li>
* </ul>
- *
+ * <p>
* Apart from the above required configuration values, each source, sink or
* channel can have its own set of arbitrary configuration as required by the
* implementation. Each of these configuration values are expressed by fully
@@ -164,53 +178,180 @@ import org.slf4j.LoggerFactory;
*
* </p>
*
- * @see java.util.Properties#load(java.io.Reader)
+ * @see Properties#load(java.io.Reader)
*/
-public class PropertiesFileConfigurationProvider extends
- AbstractConfigurationProvider {
+public class UriConfigurationProvider extends AbstractConfigurationProvider
+ implements LifecycleAware {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(PropertiesFileConfigurationProvider.class);
- private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties";
+ private static final Logger LOGGER = LoggerFactory.getLogger(UriConfigurationProvider.class);
- private final File file;
+ private final List<ConfigurationSource> configurationSources;
+ private final File backupDirectory;
+ private final EventBus eventBus;
+ private final int interval;
+ private final CounterGroup counterGroup;
+ private LifecycleState lifecycleState = LifecycleState.IDLE;
+ private ScheduledExecutorService executorService;
- public PropertiesFileConfigurationProvider(String agentName, File file) {
+ public UriConfigurationProvider(String agentName, List<ConfigurationSource> sourceList,
+ String backupDirectory, EventBus eventBus, int pollInterval) {
super(agentName);
- this.file = file;
+ this.configurationSources = sourceList;
+ this.backupDirectory = backupDirectory != null ? new File(backupDirectory) : null;
+ this.eventBus = eventBus;
+ this.interval = pollInterval;
+ counterGroup = new CounterGroup();
+ }
+
+ @Override
+ public void start() {
+ if (eventBus != null && interval > 0) {
+ executorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
+ .build());
+
+ WatcherRunnable watcherRunnable = new WatcherRunnable(configurationSources, counterGroup,
+ eventBus);
+
+ executorService.scheduleWithFixedDelay(watcherRunnable, 0, interval,
+ TimeUnit.SECONDS);
+ }
+ lifecycleState = LifecycleState.START;
+ }
+
+ @Override
+ public void stop() {
+ if (executorService != null) {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+ LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
+ executorService.shutdownNow();
+ while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+ LOGGER.debug("Waiting for file watcher to terminate");
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.debug("Interrupted while waiting for file watcher to terminate");
+ Thread.currentThread().interrupt();
+ }
+ }
+ lifecycleState = LifecycleState.STOP;
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ protected List<ConfigurationSource> getConfigurationSources() {
+ return configurationSources;
}
@Override
public FlumeConfiguration getFlumeConfiguration() {
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(file));
- String resolverClassName = System.getProperty("propertiesImplementation",
- DEFAULT_PROPERTIES_IMPLEMENTATION);
- Class<? extends Properties> propsclass = Class.forName(resolverClassName)
- .asSubclass(Properties.class);
- Properties properties = propsclass.newInstance();
- properties.load(reader);
- return new FlumeConfiguration(toMap(properties));
- } catch (IOException ex) {
- LOGGER.error("Unable to load file:" + file
- + " (I/O failure) - Exception follows.", ex);
- } catch (ClassNotFoundException e) {
- LOGGER.error("Configuration resolver class not found", e);
- } catch (InstantiationException e) {
- LOGGER.error("Instantiation exception", e);
- } catch (IllegalAccessException e) {
- LOGGER.error("Illegal access exception", e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException ex) {
- LOGGER.warn(
- "Unable to close file reader for file: " + file, ex);
+ Map<String, String> configMap = null;
+ Properties properties = new Properties();
+ for (ConfigurationSource configurationSource : configurationSources) {
+ try (InputStream is = configurationSource.getInputStream()) {
+ if (is != null) {
+ switch (configurationSource.getExtension()) {
+ case ConfigurationSource.JSON: case ConfigurationSource.YAML:
+ case ConfigurationSource.XML: {
+ LOGGER.warn("File extension type {} is unsupported",
+ configurationSource.getExtension());
+ break;
+ }
+ default: {
+ properties.load(is);
+ break;
+ }
+ }
+ }
+ } catch (IOException ioe) {
+ LOGGER.warn("Unable to load properties from {}: {}", configurationSource.getUri(),
+ ioe.getMessage());
+ }
+ if (properties.size() > 0) {
+ configMap = MapResolver.resolveProperties(properties);
+ }
+ }
+ if (configMap != null) {
+ Properties props = new Properties();
+ props.putAll(configMap);
+ if (backupDirectory != null) {
+ if (backupDirectory.mkdirs()) {
+ // This is only being logged to keep Spotbugs happy. We can't ignore the result of mkdirs.
+ LOGGER.debug("Created directories for {}", backupDirectory.toString());
+ }
+ File backupFile = getBackupFile(backupDirectory, getAgentName());
+ try (OutputStream os = new FileOutputStream(backupFile)) {
+ props.store(os, "Backup created at " + LocalDateTime.now().toString());
+ } catch (IOException ioe) {
+ LOGGER.warn("Unable to create backup properties file: {}", ioe.getMessage());
+ }
+ }
+ } else {
+ if (backupDirectory != null) {
+ File backup = getBackupFile(backupDirectory, getAgentName());
+ if (backup.exists()) {
+ Properties props = new Properties();
+ try (InputStream is = new FileInputStream(backup)) {
+ LOGGER.warn("Unable to access primary configuration. Trying backup");
+ props.load(is);
+ configMap = MapResolver.resolveProperties(props);
+ } catch (IOException ex) {
+ LOGGER.warn("Error reading backup file: {}", ex.getMessage());
+ }
+ }
+ }
+ }
+ if (configMap != null) {
+ return new FlumeConfiguration(configMap);
+ } else {
+ LOGGER.error("No configuration could be found");
+ return null;
+ }
+ }
+
+ private File getBackupFile(File backupDirectory, String agentName) {
+ if (backupDirectory != null) {
+ return new File(backupDirectory, "." + agentName + ".properties");
+ }
+ return null;
+ }
+
+ private class WatcherRunnable implements Runnable {
+
+ private List<ConfigurationSource> configurationSources;
+ private final CounterGroup counterGroup;
+ private final EventBus eventBus;
+
+ public WatcherRunnable(List<ConfigurationSource> sources, CounterGroup counterGroup,
+ EventBus eventBus) {
+ this.configurationSources = sources;
+ this.counterGroup = counterGroup;
+ this.eventBus = eventBus;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.debug("Checking for changes to sources");
+
+ counterGroup.incrementAndGet("uri.checks");
+ try {
+ boolean isModified = false;
+ for (ConfigurationSource source : configurationSources) {
+ if (source.isModified()) {
+ isModified = true;
+ }
+ }
+ if (isModified) {
+ eventBus.post(getConfiguration());
}
+ } catch (ConfigurationException ex) {
+ LOGGER.warn("Unable to update configuration: {}", ex.getMessage());
}
}
- return new FlumeConfiguration(new HashMap<String, String>());
}
}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java
new file mode 100644
index 0000000..24542d4
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import java.net.URLConnection;
+
+/**
+ * Interface to be implemented to add an Authorization header to an HTTP request.
+ */
+public interface AuthorizationProvider {
+
+ void addAuthorization(URLConnection urlConnection);
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java
new file mode 100644
index 0000000..aa1fc9a
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Provides the Basic Authorization header to a request.
+ */
+public class BasicAuthorizationProvider implements AuthorizationProvider {
+
+ private static final Base64.Encoder encoder = Base64.getEncoder();
+
+ private String authString = null;
+
+ public BasicAuthorizationProvider(String userName, String password) {
+ if (userName != null && password != null) {
+ String toEncode = userName + ":" + password;
+ authString = "Basic " + encoder.encodeToString(toEncode.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ @Override
+ public void addAuthorization(URLConnection urlConnection) {
+ if (authString != null) {
+ urlConnection.setRequestProperty("Authorization", authString);
+ }
+ }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java
new file mode 100644
index 0000000..d1cbaaf
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLSession;
+
+/**
+ * A HostnameVerifier that accepts every host name.
+ */
+public final class LaxHostnameVerifier implements HostnameVerifier {
+ /**
+ * Singleton instance.
+ */
+ public static final HostnameVerifier INSTANCE = new LaxHostnameVerifier();
+
+ private LaxHostnameVerifier() {
+ }
+
+ @Override
+ public boolean verify(final String s, final SSLSession sslSession) {
+ return true;
+ }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java
new file mode 100644
index 0000000..21e4f26
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import javax.net.ssl.HttpsURLConnection;
+
+/**
+ * Constructs an HttpURLConnection.
+ */
+public class UrlConnectionFactory {
+
+ private static int DEFAULT_TIMEOUT = 60000;
+ private static int connectTimeoutMillis = DEFAULT_TIMEOUT;
+ private static int readTimeoutMillis = DEFAULT_TIMEOUT;
+ private static final String XML = "application/xml";
+ private static final String YAML = "application/yaml";
+ private static final String JSON = "application/json";
+ private static final String PROPERTIES = "text/x-java-properties";
+ private static final String TEXT = "text/plain";
+ public static final String HTTP = "http";
+ public static final String HTTPS = "https";
+
+ public static HttpURLConnection createConnection(URL url,
+ AuthorizationProvider authorizationProvider, long lastModifiedMillis, boolean verifyHost)
+ throws IOException {
+ final HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
+ if (HTTPS.equals(url.getProtocol()) && !verifyHost) {
+ ((HttpsURLConnection) urlConnection).setHostnameVerifier(LaxHostnameVerifier.INSTANCE);
+ }
+ if (authorizationProvider != null) {
+ authorizationProvider.addAuthorization(urlConnection);
+ }
+ urlConnection.setAllowUserInteraction(false);
+ urlConnection.setDoOutput(true);
+ urlConnection.setDoInput(true);
+ urlConnection.setRequestMethod("GET");
+ if (connectTimeoutMillis > 0) {
+ urlConnection.setConnectTimeout(connectTimeoutMillis);
+ }
+ if (readTimeoutMillis > 0) {
+ urlConnection.setReadTimeout(readTimeoutMillis);
+ }
+ urlConnection.setRequestProperty("Content-Type", getContentType(url));
+ if (lastModifiedMillis > 0) {
+ urlConnection.setIfModifiedSince(lastModifiedMillis);
+ }
+ return urlConnection;
+ }
+
+ public static URLConnection createConnection(URL url) throws IOException {
+ return createConnection(url, null, 0, true);
+ }
+
+ public static URLConnection createConnection(URL url, AuthorizationProvider authorizationProvider)
+ throws IOException {
+ URLConnection urlConnection = null;
+ if (url.getProtocol().equals(HTTPS) || url.getProtocol().equals(HTTP)) {
+ urlConnection = createConnection(url, authorizationProvider, 0, true);
+ } else {
+ urlConnection = url.openConnection();
+ }
+ return urlConnection;
+ }
+
+ private static String getContentType(URL url) {
+ String[] fileParts = url.getFile().split("\\.");
+ String type = fileParts[fileParts.length - 1].trim();
+ switch (type) {
+ case "properties": {
+ return PROPERTIES;
+ }
+ case "json": {
+ return JSON;
+ }
+ case "yaml": case "yml": {
+ return YAML;
+ }
+ case "xml": {
+ return XML;
+ }
+ default: {
+ return TEXT;
+ }
+ }
+ }
+}
diff --git a/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory b/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory
new file mode 100644
index 0000000..c49998c
--- /dev/null
+++ b/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache license, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the license for the specific language governing permissions and
+# limitations under the license.
+org.apache.flume.node.ClasspathConfigurationSourceFactory
+org.apache.flume.node.FileConfigurationSourceFactory
+org.apache.flume.node.HttpConfigurationSourceFactory
\ No newline at end of file
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java
new file mode 100644
index 0000000..5fb208f
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that files can be loaded from the Classpath.
+ */
+public class TestClasspathConfigurationSource {
+
+ @Test
+ public void testClasspath() throws Exception {
+ URI confFile = new URI("classpath:///flume-conf.properties");
+ ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+ Assert.assertNotNull("No configuration returned", source);
+ Properties props = new Properties();
+ props.load(source.getInputStream());
+ String value = props.getProperty("host1.sources");
+ Assert.assertNotNull("Missing key", value);
+ }
+
+ @Test
+ public void testOddClasspath() throws Exception {
+ URI confFile = new URI("classpath:/flume-conf.properties");
+ ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+ Assert.assertNotNull("No configuration returned", source);
+ Properties props = new Properties();
+ props.load(source.getInputStream());
+ String value = props.getProperty("host1.sources");
+ Assert.assertNotNull("Missing key", value);
+ }
+
+ @Test
+ public void testImproperClasspath() throws Exception {
+ URI confFile = new URI("classpath://flume-conf.properties");
+ ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+ Assert.assertNotNull("No configuration returned", source);
+ Properties props = new Properties();
+ props.load(source.getInputStream());
+ String value = props.getProperty("host1.sources");
+ Assert.assertNotNull("Missing key", value);
+ }
+
+ @Test
+ public void testShorthandClasspath() throws Exception {
+ URI confFile = new URI("classpath:flume-conf.properties");
+ ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+ Assert.assertNotNull("No configuration returned", source);
+ Properties props = new Properties();
+ props.load(source.getInputStream());
+ String value = props.getProperty("host1.sources");
+ Assert.assertNotNull("Missing key", value);
+ }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java
new file mode 100644
index 0000000..36f95b1
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestEnvLookup {
+ private static final File TESTFILE = new File(
+ TestEnvLookup.class.getClassLoader()
+ .getResource("flume-conf-with-envLookup.properties").getFile());
+ private static final String NC_PORT = "6667";
+
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+ private UriConfigurationProvider provider;
+
+ @Before
+ public void setUp() throws Exception {
+ environmentVariables.set("NC_PORT", NC_PORT);
+ List<ConfigurationSource> sourceList =
+ Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()));
+ provider = new UriConfigurationProvider("a1", sourceList, null,
+ null, 0);
+ }
+
+ @Test
+ public void getProperty() throws Exception {
+
+ Assert.assertEquals(NC_PORT, provider.getFlumeConfiguration()
+ .getConfigurationFor("a1")
+ .getSourceContext().get("r1").getParameters().get("port"));
+ }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
new file mode 100644
index 0000000..b128116
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.Base64;
+import java.util.Enumeration;
+import java.util.Properties;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.BasicAuthorizationProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that files can be loaded via http.
+ */
+public class TestHttpConfigurationSource {
+
+ private static final String BASIC = "Basic ";
+ private static final String expectedCreds = "flume:flume";
+ private static Server server;
+ private static Base64.Decoder decoder = Base64.getDecoder();
+
+ @BeforeClass
+ public static void startServer() throws Exception {
+ try {
+ server = new Server(1080);
+ ServletContextHandler context = new ServletContextHandler();
+ ServletHolder defaultServ = new ServletHolder("default", TestServlet.class);
+ defaultServ.setInitParameter("resourceBase", System.getProperty("user.dir"));
+ defaultServ.setInitParameter("dirAllowed", "true");
+ context.addServlet(defaultServ, "/");
+ server.setHandler(context);
+
+ // Start Server
+ server.start();
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ throw ex;
+ }
+ }
+
+ @AfterClass
+ public static void stopServer() throws Exception {
+ server.stop();
+ }
+
+
+ @Test(expected = ConfigurationException.class)
+ public void testBadCredentials() throws Exception {
+ URI confFile = new URI("http://localhost/flume-conf.properties");
+ AuthorizationProvider authProvider = new BasicAuthorizationProvider("foo", "bar");
+ ConfigurationSource source = new HttpConfigurationSource(confFile, authProvider, true);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ URI confFile = new URI("http://localhost:1080/flume-conf.properties");
+ AuthorizationProvider authProvider = new BasicAuthorizationProvider("flume", "flume");
+ ConfigurationSource source = new HttpConfigurationSource(confFile, authProvider, true);
+ Assert.assertNotNull("No configuration returned", source);
+ InputStream is = source.getInputStream();
+ Assert.assertNotNull("No data returned", is);
+ Properties props = new Properties();
+ props.load(is);
+ String value = props.getProperty("host1.sources");
+ Assert.assertNotNull("Missing key", value);
+ Assert.assertFalse(source.isModified());
+ File file = new File("target/test-classes/flume-conf.properties");
+ if (file.setLastModified(System.currentTimeMillis())) {
+ Assert.assertTrue(source.isModified());
+ }
+ }
+
+ public static class TestServlet extends DefaultServlet {
+
+ private static final long serialVersionUID = -2885158530511450659L;
+
+ @Override
+ protected void doGet(HttpServletRequest request,
+ HttpServletResponse response) throws ServletException, IOException {
+ Enumeration<String> headers = request.getHeaders(HttpHeader.AUTHORIZATION.toString());
+ if (headers == null) {
+ response.sendError(401, "No Auth header");
+ return;
+ }
+ while (headers.hasMoreElements()) {
+ String authData = headers.nextElement();
+ Assert.assertTrue("Not a Basic auth header", authData.startsWith(BASIC));
+ String credentials = new String(decoder.decode(authData.substring(BASIC.length())));
+ Assert.assertEquals(expectedCreds, credentials);
+ }
+ if (request.getServletPath().equals("/flume-conf.properties")) {
+ File file = new File("target/test-classes/flume-conf.properties");
+ long modifiedSince = request.getDateHeader(HttpHeader.IF_MODIFIED_SINCE.toString());
+ long lastModified = file.lastModified();
+ if (modifiedSince > 0 && lastModified <= modifiedSince) {
+ response.setStatus(304);
+ return;
+ }
+ response.setDateHeader(HttpHeader.LAST_MODIFIED.toString(), lastModified);
+ response.setContentLengthLong(file.length());
+ Files.copy(file.toPath(), response.getOutputStream());
+ response.getOutputStream().flush();
+ response.setStatus(200);
+ } else {
+ response.sendError(400, "Unsupported request");
+ }
+ }
+ }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java
new file mode 100644
index 0000000..0b5e5e0
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestOverrideFile {
+ private static final File TESTFILE = new File(
+ TestOverrideFile.class.getClassLoader()
+ .getResource("flume-conf-with-recursiveLookup.properties").getFile());
+ private static final File OVERRIDEFILE = new File(
+ TestOverrideFile.class.getClassLoader()
+ .getResource("flume-conf-override.properties").getFile());
+ private static final String BIND = "192.168.13.101";
+
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+ private UriConfigurationProvider provider;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("env", "DEV");
+ List<ConfigurationSource> sourceList =
+ Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()),
+ new FileConfigurationSource(OVERRIDEFILE.toURI()));
+ provider = new UriConfigurationProvider("a1", sourceList, null,
+ null, 0);
+ }
+
+ @Test
+ public void getProperty() throws Exception {
+
+ Assert.assertEquals(BIND, provider.getFlumeConfiguration()
+ .getConfigurationFor("a1")
+ .getSourceContext().get("r1").getParameters().get("bind"));
+ }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
index 480f6a5..d20fa35 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
@@ -40,7 +40,7 @@ public class TestPollingPropertiesFileConfigurationProvider {
TestPollingPropertiesFileConfigurationProvider.class.getClassLoader()
.getResource("flume-conf.properties").getFile());
- private PollingPropertiesFileConfigurationProvider provider;
+ private UriConfigurationProvider provider;
private File baseDir;
private File configFile;
private EventBus eventBus;
@@ -54,9 +54,9 @@ public class TestPollingPropertiesFileConfigurationProvider {
Files.copy(TESTFILE, configFile);
eventBus = new EventBus("test");
- provider =
- new PollingPropertiesFileConfigurationProvider("host1",
- configFile, eventBus, 1);
+ ConfigurationSource source = new FileConfigurationSource(configFile.toURI());
+ provider = new UriConfigurationProvider("host1", Lists.newArrayList(source), null,
+ eventBus, 1);
provider.start();
LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR);
}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
index 4875c56..95c7777 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
@@ -43,11 +43,15 @@ public class TestPropertiesFileConfigurationProvider {
TestPropertiesFileConfigurationProvider.class.getClassLoader()
.getResource("flume-conf.properties").getFile());
- private PropertiesFileConfigurationProvider provider;
+ private UriConfigurationProvider provider;
+ private List<ConfigurationSource> sources;
@Before
public void setUp() throws Exception {
- provider = new PropertiesFileConfigurationProvider("test", TESTFILE);
+ ConfigurationSource source = new FileConfigurationSource(TESTFILE.toURI());
+ sources = Lists.newArrayList(source);
+ provider = new UriConfigurationProvider("test", sources, null, null, 0);
+ provider.start();
}
@After
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java
new file mode 100644
index 0000000..de78ea6
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestRecursiveLookup {
+ private static final File TESTFILE = new File(
+ TestRecursiveLookup.class.getClassLoader()
+ .getResource("flume-conf-with-recursiveLookup.properties").getFile());
+ private static final String BIND = "192.168.11.101";
+
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+ private UriConfigurationProvider provider;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("env", "DEV");
+ List<ConfigurationSource> sourceList =
+ Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()));
+ provider = new UriConfigurationProvider("a1", sourceList, null,
+ null, 0);
+ }
+
+ @Test
+ public void getProperty() throws Exception {
+
+ Assert.assertEquals(BIND, provider.getFlumeConfiguration()
+ .getConfigurationFor("a1")
+ .getSourceContext().get("r1").getParameters().get("bind"));
+ }
+}
diff --git a/flume-ng-node/src/test/resources/flume-conf-override.properties b/flume-ng-node/src/test/resources/flume-conf-override.properties
new file mode 100644
index 0000000..1d86d4b
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-override.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+PROD_BIND=192.168.12.110
+DEV_BIND=192.168.13.101
+
diff --git a/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties b/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties
new file mode 100644
index 0000000..20848e9
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = 0.0.0.0
+a1.sources.r1.port = ${env:NC_PORT}
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties b/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties
new file mode 100644
index 0000000..c679384
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+PROD_BIND=192.168.10.110
+DEV_BIND=192.168.11.101
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = ${${sys:env}_BIND}
+a1.sources.r1.port = 6667
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/pom.xml b/pom.xml
index 2dc3dba..9a9e492 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,13 +52,14 @@ limitations under the License.
<bundle-plugin.version>2.3.7</bundle-plugin.version>
<checkstyle.tool.version>8.12</checkstyle.tool.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
- <commons-cli.version>1.2</commons-cli.version>
+ <commons-cli.version>1.4</commons-cli.version>
<commons-codec.version>1.8</commons-codec.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-compress.version>1.4.1</commons-compress.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<commons-io.version>2.1</commons-io.version>
<commons-lang.version>2.5</commons-lang.version>
+ <commons-text.version>1.9</commons-text.version>
<curator.version>5.1.0</curator.version>
<derby.version>10.14.1.0</derby.version>
<dropwizard-metrics.version>4.1.18</dropwizard-metrics.version>
@@ -767,6 +768,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <version>${commons-text.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava-old.version}</version>
@@ -1765,6 +1772,30 @@ limitations under the License.
<version>${dropwizard-metrics.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.jcip</groupId>
+ <artifactId>jcip-annotations</artifactId>
+ <version>1.0</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ <version>${mvn-spotbugs-plugin.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.mock-server</groupId>
+ <artifactId>mockserver-netty</artifactId>
+ <version>3.10.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mock-server</groupId>
+ <artifactId>mockserver-client-java</artifactId>
+ <version>3.10.8</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
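
Note on the nested lookup exercised by flume-conf-with-recursiveLookup.properties: the value `${${sys:env}_BIND}` is expected to resolve inside-out, first `${sys:env}` to the `env` system property (`DEV` in TestRecursiveLookup), then `${DEV_BIND}` to the property defined in the same file. The sketch below illustrates that resolution order only. It is not the code from this commit; the class name `RecursiveLookupSketch` and its methods are hypothetical, and it assumes only the `sys:` and `env:` prefixes seen in the test resources.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of inside-out ${...} resolution; not Flume's implementation.
public class RecursiveLookupSketch {

    /** Resolves one key: "sys:" reads a system property, "env:" an environment variable, else the map. */
    static String lookup(String key, Map<String, String> props) {
        if (key.startsWith("sys:")) {
            return System.getProperty(key.substring(4));
        }
        if (key.startsWith("env:")) {
            return System.getenv(key.substring(4));
        }
        return props.get(key);
    }

    /** Repeatedly replaces the innermost ${...} until none remain. */
    static String resolve(String value, Map<String, String> props) {
        String result = value;
        for (int i = 0; i < 100; i++) { // guard against cyclic definitions
            int close = result.indexOf('}');
            int open = close < 0 ? -1 : result.lastIndexOf("${", close);
            if (open < 0) {
                return result; // nothing left to substitute
            }
            String key = result.substring(open + 2, close);
            String replacement = lookup(key, props);
            if (replacement == null) {
                throw new IllegalArgumentException("Unresolved lookup: " + key);
            }
            result = result.substring(0, open) + replacement + result.substring(close + 1);
        }
        throw new IllegalStateException("Too many substitutions in: " + value);
    }

    public static void main(String[] args) {
        // Values copied from flume-conf-with-recursiveLookup.properties.
        Map<String, String> props = new HashMap<>();
        props.put("PROD_BIND", "192.168.10.110");
        props.put("DEV_BIND", "192.168.11.101");
        System.setProperty("env", "DEV"); // same as TestRecursiveLookup.setUp()
        System.out.println(resolve("${${sys:env}_BIND}", props)); // prints 192.168.11.101
    }
}
```

Setting the `env` system property to `PROD` instead would select `PROD_BIND`, which is the point of the nested form: one configuration file can serve several environments.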