[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    [flume] 01/01: FLUME-3335 - Support configuration via HTTP(S)
From:       rgoers () apache ! org
Date:       2021-10-27 6:54:37
Message-ID: 20211027065436.E7DD781FFA () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch FLUME-3335
in repository https://gitbox.apache.org/repos/asf/flume.git

commit 2b4d546e41f1f8256633dfa3f62810aa2ebffdf3
Author: Ralph Goers <rgoers@apache.org>
AuthorDate: Tue Oct 26 23:54:12 2021 -0700

    FLUME-3335 - Support configuration via HTTP(S)
---
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |  99 ++++++++-
 flume-ng-node/pom.xml                              |  24 ++-
 .../flume/node/AbstractConfigurationProvider.java  |   3 +
 .../java/org/apache/flume/node/Application.java    | 237 ++++++++++++++++-----
 .../flume/node/ClasspathConfigurationSource.java   |  71 ++++++
 .../node/ClasspathConfigurationSourceFactory.java  |  41 ++++
 .../org/apache/flume/node/ConfigurationSource.java |  56 +++++
 .../flume/node/ConfigurationSourceFactory.java     |  50 +++++
 .../flume/node/EnvVarResolverProperties.java       |   5 +
 .../apache/flume/node/FileConfigurationSource.java | 110 ++++++++++
 .../flume/node/FileConfigurationSourceFactory.java |  42 ++++
 .../apache/flume/node/HttpConfigurationSource.java | 150 +++++++++++++
 .../flume/node/HttpConfigurationSourceFactory.java |  42 ++++
 .../java/org/apache/flume/node/MapResolver.java    |  70 ++++++
 .../flume/node/MaterializedConfiguration.java      |  12 +-
 ...PollingPropertiesFileConfigurationProvider.java | 143 +------------
 .../node/PropertiesFileConfigurationProvider.java  |  58 +----
 ...Provider.java => UriConfigurationProvider.java} | 237 ++++++++++++++++-----
 .../flume/node/net/AuthorizationProvider.java      |  27 +++
 .../flume/node/net/BasicAuthorizationProvider.java |  45 ++++
 .../apache/flume/node/net/LaxHostnameVerifier.java |  38 ++++
 .../flume/node/net/UrlConnectionFactory.java       | 104 +++++++++
 ...rg.apache.flume.node.ConfigurationSourceFactory |  17 ++
 .../node/TestClasspathConfigurationSource.java     |  73 +++++++
 .../java/org/apache/flume/node/TestEnvLookup.java  |  58 +++++
 .../flume/node/TestHttpConfigurationSource.java    | 141 ++++++++++++
 .../org/apache/flume/node/TestOverrideFile.java    |  62 ++++++
 ...PollingPropertiesFileConfigurationProvider.java |   8 +-
 .../TestPropertiesFileConfigurationProvider.java   |   8 +-
 .../org/apache/flume/node/TestRecursiveLookup.java |  58 +++++
 .../test/resources/flume-conf-override.properties  |  21 ++
 .../resources/flume-conf-with-envLookup.properties |  35 +++
 .../flume-conf-with-recursiveLookup.properties     |  37 ++++
 pom.xml                                            |  33 ++-
 34 files changed, 1898 insertions(+), 317 deletions(-)

diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst \
b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 3fdfc67..315e5c4 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -118,12 +118,11 @@ Setup
 Setting up an agent
 -------------------
 
-Flume agent configuration is stored in a local configuration file.  This is a
-text file that follows the Java properties file format.
-Configurations for one or more agents can be specified in the same
-configuration file. The configuration file includes properties of each source,
-sink and channel in an agent and how they are wired together to form data
-flows.
+Flume agent configuration is stored in one or more configuration files that
+follow the Java properties file format. Configurations for one or more agents
+can be specified in these configuration files. The configuration includes
+properties of each source, sink and channel in an agent and how they are wired
+together to form data flows.
 
 Configuring individual components
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -225,25 +224,105 @@ The original Flume terminal will output the event in a log \
message.  
 Congratulations - you've successfully configured and deployed a Flume agent! \
Subsequent sections cover agent configuration in much more detail.  
-Using environment variables in configuration files
+Configuration from URIs
+~~~~~~~~~~~~~~~~~~~~~~~
+As of version 1.10.0 Flume supports being configured using URIs instead of just from \
local files. Direct support +for HTTP(S), file, and classpath URIs is included. The \
HTTP support includes support for authentication using +basic authorization but other \
authorization mechanisms may be supported by specifying the fully qualified name +of \
the class that implements the AuthorizationProvider interface using the \
--auth-provider option. HTTP also +supports reloading of configuration files using \
polling if the target server properly responds to the If-Modified-Since +header.
+
+To specify credentials for HTTP authentication add::
+
+  --conf-user userid --conf-password password
+
+to the startup command.
+
+Multiple Configuration Files
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+As of version 1.10.0 Flume supports being configured from multiple configuration \
files instead of just one. +This more easily allows values to be overridden or added \
based on specific environments. Each file should +be configured using its own \
--conf-file or --conf-uri option. However, all files should either be provided +with \
--conf-file or with --conf-uri. If --conf-file and --conf-uri appear together as \
options all --conf-uri +configurations will be processed before any of the \
--conf-file configurations are merged. +
+For example, a configuration of::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --conf-uri \
http://localhost:80/flume.conf --conf-uri http://localhost:80/override.conf --name a1 \
-Dflume.root.logger=INFO,console +
+will cause flume.conf to be read first, override.conf to be merged with it and \
finally example.conf would be +merged last. If it is desirec to have example.conf be \
the base configuration it should be specified using the +--conf-uri option either \
as:: +
+  --conf-uri classpath://example.conf
+  or
+  --conf-uri file:///example.conf
+
+depending on how it should be accessed.
+
+Using environment variables, system properies, or other properties configuration \
files  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 Flume has the ability to substitute environment variables in the configuration. For \
example::  
   a1.sources = r1
   a1.sources.r1.type = netcat
   a1.sources.r1.bind = 0.0.0.0
-  a1.sources.r1.port = ${NC_PORT}
+  a1.sources.r1.port = ${env:NC_PORT}
   a1.sources.r1.channels = c1
 
 NB: it currently works for values only, not for keys. (Ie. only on the "right side" \
of the `=` mark of the config lines.)  
-This can be enabled via Java system properties on agent invocation by setting \
`propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties`. +As of \
version 1.10.0 Flume resolves configuration values using Apache Commons Text's \
StringSubstitutor +class using the default set of Lookups along with a lookup that \
uses the configuration files as a +source for replacement values.
 
 For example::
-  $ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
-Dflume.root.logger=INFO,console \
-DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties +  $ \
NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
-Dflume.root.logger=INFO,console  
 Note the above is just an example, environment variables can be configured in other \
ways, including being set in `conf/flume-env.sh`.  
+As noted, system properties are also supported, so the configuration::
+
+  a1.sources = r1
+  a1.sources.r1.type = netcat
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = ${sys:NC_PORT}
+  a1.sources.r1.channels = c1
+
+could be used and the startup command could be::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
-Dflume.root.logger=INFO,console -DNC_PORT=44444 +
+Furthermore, because multiple configuration files are allowed the first file could \
contain:: +
+  a1.sources = r1
+  a1.sources.r1.type = netcat
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = ${NC_PORT}
+  a1.sources.r1.channels = c1
+
+and the override file could contain::
+
+  NC_PORT = 44444
+
+In this case the startup command could be::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --conf-file \
override.conf --name a1 -Dflume.root.logger=INFO,console +
+Note that the method for specifying environment variables as was done in prior \
versions will stil work +but has been deprecated in favor of using ${env:varName}.
+
+Using a command options file
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Instead of specifying all the command options on the command line as of version \
1.10.0 command +options may be placed in either /etc/flume/flume.opts or flume.opts \
on the classpath. An example +might be::
+
+  conf-file = example.conf
+  conf-file = override.conf
+  name = a1
+
 Logging raw data
 ~~~~~~~~~~~~~~~~
 
diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml
index 65fcb50..1db864a 100644
--- a/flume-ng-node/pom.xml
+++ b/flume-ng-node/pom.xml
@@ -106,6 +106,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -134,15 +139,26 @@
       </exclusions>
     </dependency>
 
-  <dependency>
+    <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
-  </dependency>
+    </dependency>
 
-  <dependency>
+    <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-  </dependency>
+    </dependency>
+
+    <dependency>
+      <groupId>net.jcip</groupId>
+      <artifactId>jcip-annotations</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.github.spotbugs</groupId>
+      <artifactId>spotbugs-annotations</artifactId>
+      <optional>true</optional>
+    </dependency>
 
   </dependencies>
 
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
 index caf4522..2f0b643 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
                
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
 @@ -69,6 +69,9 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings("REC_CATCH_EXCEPTION")
 public abstract class AbstractConfigurationProvider implements ConfigurationProvider \
{  
   private static final Logger LOGGER = \
                LoggerFactory.getLogger(AbstractConfigurationProvider.class);
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index \
                406bb7d..1f4df59 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -19,19 +19,29 @@
 
 package org.apache.flume.node;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
-import org.apache.flume.Constants;
 import org.apache.flume.Context;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
@@ -41,19 +51,16 @@ import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.BasicAuthorizationProvider;
 import org.apache.flume.util.SSLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 
 public class Application {
 
@@ -63,6 +70,8 @@ public class Application {
   public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
   public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
 
+  private static final int DEFAULT_INTERVAL = 300;
+  private static final int DEFAULT_FILE_INTERVAL = 30;
   private final List<LifecycleAware> components;
   private final LifecycleSupervisor supervisor;
   private MaterializedConfiguration materializedConfiguration;
@@ -109,8 +118,8 @@ public class Application {
 
   public void stop() {
     lifecycleLock.lock();
-    stopAllComponents();
     try {
+      stopAllComponents();
       supervisor.stop();
       if (monitorServer != null) {
         monitorServer.stop();
@@ -231,7 +240,7 @@ public class Application {
           //Not a known type, use FQCN
           klass = (Class<? extends MonitorService>) Class.forName(monitorType);
         }
-        this.monitorServer = klass.newInstance();
+        this.monitorServer = klass.getConstructor().newInstance();
         Context context = new Context();
         for (String key : keys) {
           if (key.startsWith(CONF_MONITOR_PREFIX)) {
@@ -242,14 +251,14 @@ public class Application {
         monitorServer.configure(context);
         monitorServer.start();
       }
-    } catch (Exception e) {
+    } catch (ReflectiveOperationException e) {
       logger.warn("Error starting monitoring. "
           + "Monitoring might not be available.", e);
     }
-
   }
 
   public static void main(String[] args) {
+    Properties initProps = loadConfigOpts();
 
     try {
       SSLUtil.initGlobalSSLParameters();
@@ -261,7 +270,40 @@ public class Application {
       options.addOption(option);
 
       option = new Option("f", "conf-file", true,
-          "specify a config file (required if -z missing)");
+              "specify a config file (required if -c, -u, and -z are missing)");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("u", "conf-uri", true,
+              "specify a config uri (required if -c, -f and -z are missing)");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("a", "auth-provider", true,
+          "specify an authorization provider class");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("c", "conf-provider", true,
+              "specify a configuration provider class (required if -f, -u, and -z \
are missing)"); +      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("n", "conf-user", true, "user name to access configuration \
uri"); +      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("p", "conf-password", true, "password to access \
configuration uri"); +      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("i", "poll-interval", true,
+          "number of seconds between checks for a configuration change");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("b", "backup-directory", true,
+          "directory in which to store the backup configuration file");
       option.setRequired(false);
       options.addOption(option);
 
@@ -271,7 +313,7 @@ public class Application {
 
       // Options for Zookeeper
       option = new Option("z", "zkConnString", true,
-          "specify the ZooKeeper connection to use (required if -f missing)");
+              "specify the ZooKeeper connection to use (required if -c, -f, and -u \
are missing)");  option.setRequired(false);
       options.addOption(option);
 
@@ -283,8 +325,8 @@ public class Application {
       option = new Option("h", "help", false, "display help text");
       options.addOption(option);
 
-      CommandLineParser parser = new GnuParser();
-      CommandLine commandLine = parser.parse(options, args);
+      DefaultParser parser = new DefaultParser();
+      CommandLine commandLine = parser.parse(options, args, initProps);
 
       if (commandLine.hasOption('h')) {
         new HelpFormatter().printHelp("flume-ng agent", options, true);
@@ -299,8 +341,41 @@ public class Application {
         isZkConfigured = true;
       }
 
+      List<URI> confUri = null;
+      ConfigurationProvider provider = null;
+      int defaultInterval = DEFAULT_FILE_INTERVAL;
+      if (commandLine.hasOption('u') || commandLine.hasOption("conf-uri")) {
+        confUri = new ArrayList<>();
+        for (String uri : commandLine.getOptionValues("conf-uri")) {
+          if (uri.toLowerCase(Locale.ROOT).startsWith("http")) {
+            defaultInterval = DEFAULT_INTERVAL;
+          }
+          confUri.add(new URI(uri));
+        }
+      } else if (commandLine.hasOption("f") || commandLine.hasOption("conf-file")) {
+        confUri = new ArrayList<>();
+        for (String filePath : commandLine.getOptionValues("conf-file")) {
+          confUri.add(new File(filePath).toURI());
+        }
+      }
+
+      if (commandLine.hasOption("c") || commandLine.hasOption("conf-provider")) {
+        String className = commandLine.getOptionValue("conf-provider");
+        try {
+          Class<?> clazz = Application.class.getClassLoader().loadClass(className);
+          Constructor<?> constructor = clazz.getConstructor(String[].class);
+          provider = (ConfigurationProvider) constructor.newInstance((Object[]) \
args); +        } catch (ReflectiveOperationException  ex) {
+          logger.error("Error creating ConfigurationProvider {}", className, ex);
+        }
+      }
+
       Application application;
-      if (isZkConfigured) {
+      if (provider != null) {
+        List<LifecycleAware> components = Lists.newArrayList();
+        application = new Application(components);
+        application.handleConfigurationEvent(provider.getConfiguration());
+      } else if (isZkConfigured) {
         // get options
         String zkConnectionStr = commandLine.getOptionValue('z');
         String baseZkPath = commandLine.getOptionValue('p');
@@ -321,44 +396,65 @@ public class Application {
           application = new Application();
           application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
  }
-      } else {
-        File configurationFile = new File(commandLine.getOptionValue('f'));
-
-        /*
-         * The following is to ensure that by default the agent will fail on
-         * startup if the file does not exist.
-         */
-        if (!configurationFile.exists()) {
-          // If command line invocation, then need to fail fast
-          if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
-              null) {
-            String path = configurationFile.getPath();
-            try {
-              path = configurationFile.getCanonicalPath();
-            } catch (IOException ex) {
-              logger.error("Failed to read canonical path for file: " + path,
-                  ex);
+      } else if (confUri != null) {
+        String confUser = commandLine.getOptionValue("conf-user");
+        String confPassword = commandLine.getOptionValue("conf-password");
+        String pollInterval = commandLine.getOptionValue("poll-interval");
+        String backupDirectory = commandLine.getOptionValue("backup-directory");
+        int interval = StringUtils.isNotEmpty(pollInterval) ? \
Integer.parseInt(pollInterval) : 0; +        String verify = \
commandLine.getOptionValue("verify-host", "true"); +        boolean verifyHost = \
Boolean.parseBoolean(verify); +        AuthorizationProvider authorizationProvider = \
null; +        String authProviderClass = \
commandLine.getOptionValue("auth-provider"); +        if (authProviderClass != null) \
{ +          try {
+            Class<?> clazz = Class.forName(authProviderClass);
+            Object obj = clazz.getDeclaredConstructor(String[].class)
+                .newInstance((Object[]) args);
+            if (obj instanceof AuthorizationProvider) {
+              authorizationProvider = (AuthorizationProvider) obj;
+            } else {
+              logger.error(
+                  "The supplied authorization provider does not implement \
AuthorizationProvider"); +              return;
             }
-            throw new ParseException(
-                "The specified configuration file does not exist: " + path);
+          } catch (ReflectiveOperationException ex) {
+            logger.error("Unable to create authorization provider: {}", \
ex.getMessage()); +            return;
+          }
+        }
+        if (authorizationProvider == null && StringUtils.isNotEmpty(confUser)
+            && StringUtils.isNotEmpty(confPassword)) {
+          authorizationProvider = new BasicAuthorizationProvider(confUser, \
confPassword); +        }
+        EventBus eventBus = null;
+        if (reload) {
+          eventBus = new EventBus(agentName + "-event-bus");
+          if (interval == 0) {
+            interval = defaultInterval;
+          }
+        }
+        List<ConfigurationSource> configurationSources = new ArrayList<>();
+        for (URI uri : confUri) {
+          ConfigurationSource configurationSource =
+              ConfigurationSourceFactory.getConfigurationSource(uri, \
authorizationProvider, +                  verifyHost);
+          if (configurationSource != null) {
+            configurationSources.add(configurationSource);
           }
         }
         List<LifecycleAware> components = Lists.newArrayList();
+        UriConfigurationProvider configurationProvider = new \
UriConfigurationProvider(agentName, +            configurationSources, \
backupDirectory, eventBus, interval); +        components.add(configurationProvider);
 
-        if (reload) {
-          EventBus eventBus = new EventBus(agentName + "-event-bus");
-          PollingPropertiesFileConfigurationProvider configurationProvider =
-              new PollingPropertiesFileConfigurationProvider(
-                  agentName, configurationFile, eventBus, 30);
-          components.add(configurationProvider);
-          application = new Application(components);
+        application = new Application(components);
+        if (eventBus != null) {
           eventBus.register(application);
-        } else {
-          PropertiesFileConfigurationProvider configurationProvider =
-              new PropertiesFileConfigurationProvider(agentName, configurationFile);
-          application = new Application();
-          application.handleConfigurationEvent(configurationProvider.getConfiguration());
  }
+        application.handleConfigurationEvent(configurationProvider.getConfiguration());
 +      } else {
+        throw new ParseException("No configuiration was provided");
       }
       application.start();
 
@@ -370,8 +466,35 @@ public class Application {
         }
       });
 
-    } catch (Exception e) {
+    } catch (ParseException | URISyntaxException | RuntimeException e) {
       logger.error("A fatal error occurred while running. Exception follows.", e);
     }
   }
+  @SuppressWarnings("PMD")
+  private static Properties loadConfigOpts() {
+    Properties initProps = new Properties();
+    InputStream is = null;
+    try {
+      is = new FileInputStream("/etc/flume/flume.opts");
+    } catch (IOException  ex) {
+      // Ignore the exception.
+    }
+    if (is == null) {
+      is = Application.class.getClassLoader().getResourceAsStream("flume.opts");
+    }
+    if (is != null) {
+      try {
+        initProps.load(is);
+      } catch (Exception ex) {
+        logger.warn("Unable to load options file due to: {}", ex.getMessage());
+      } finally {
+        try {
+          is.close();
+        } catch (IOException ex) {
+          // Ignore this error.
+        }
+      }
+    }
+    return initProps;
+  }
 }
\ No newline at end of file
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java \
new file mode 100644 index 0000000..1e456b5
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java
 @@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.conf.ConfigurationException;
+
+public class ClasspathConfigurationSource implements ConfigurationSource {
+
+  private final String path;
+  private final URI uri;
+
+  public ClasspathConfigurationSource(URI uri) {
+    this.uri = uri;
+    if (StringUtils.isNotEmpty(uri.getPath())) {
+      // classpath:///filename && classpath:/filename
+      this.path = uri.getPath().substring(1);
+    } else if (StringUtils.isNotEmpty(uri.getAuthority())) {
+      // classpath://filename
+      this.path = uri.getAuthority();
+    } else if (StringUtils.isNotEmpty(uri.getSchemeSpecificPart())) {
+      // classpath:filename
+      this.path = uri.getSchemeSpecificPart();
+    } else {
+      throw new ConfigurationException("Invalid uri: " + uri);
+    }
+  }
+
+  @Override
+  public InputStream getInputStream() {
+    return this.getClass().getClassLoader().getResourceAsStream(path);
+  }
+
+  @Override
+  public String getUri() {
+    return this.uri.toString();
+  }
+
+  @Override
+  public String getExtension() {
+    int length = uri.getPath().indexOf(".");
+    if (length <= 1) {
+      return PROPERTIES;
+    }
+    return uri.getPath().substring(length + 1);
+  }
+
+  @Override
+  public String toString() {
+    return "{ classpath: " + path + "}";
+  }
+
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java
 new file mode 100644
index 0000000..2921f35
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates a ConfigurationSource from a file on the classpath..
+ */
+public class ClasspathConfigurationSourceFactory implements \
ConfigurationSourceFactory { +
+  private static final List<String> SCHEMES = Lists.newArrayList("classpath");
+
+  public List<String> getSchemes() {
+    return SCHEMES;
+  }
+
+  public ConfigurationSource createConfigurationSource(URI uri, 
+      AuthorizationProvider authorizationProvider, boolean verifyHost) {
+    return new ClasspathConfigurationSource(uri);
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java new file \
mode 100644 index 0000000..ccd849e
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.io.InputStream;
+
+/**
+ * Interface for retrieving configuration data.
+ */
+public interface ConfigurationSource {
+
+  static final String PROPERTIES = "properties";
+  static final String JSON = "json";
+  static final String YAML = "yaml";
+  static final String XML = "xml";
+
+  /**
+   * Returns the InputStream if it hasn't already been processed.
+   * @return The InputStream or null.
+   */
+  InputStream getInputStream();
+
+  /**
+   * Returns the URI string.
+   * @return The string URI.
+   */
+  String getUri();
+
+  /**
+   * Determine if the configuration data source has been modified since it was last \
checked. +   * @return true if the data was modified.
+   */
+  default boolean isModified() {
+    return false;
+  }
+
+  /**
+   * Return the "file" extension for the specified uri.
+   * @return The file extension.
+   */
+  String getExtension();
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java \
new file mode 100644 index 0000000..1d57949
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+/**
+ * Creates ConfigurationSources.
+ */
+public interface ConfigurationSourceFactory {
+
+  static ConfigurationSource getConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost) {
+
+    String protocol = uri.getScheme();
+    final ServiceLoader<ConfigurationSourceFactory> serviceLoader =
+        ServiceLoader.load(ConfigurationSourceFactory.class,
+        ConfigurationSourceFactory.class.getClassLoader());
+    for (final ConfigurationSourceFactory configurationSourceFactory : \
serviceLoader) { +      if \
(configurationSourceFactory.getSchemes().contains(protocol)) { +        return \
configurationSourceFactory.createConfigurationSource(uri, authorizationProvider, +    \
verifyHost); +      }
+    }
+    return null;
+  }
+
+  List<String> getSchemes();
+
+  ConfigurationSource createConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost);
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java \
                index e0b0d22..ce64de8 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
@@ -28,8 +28,13 @@ import java.util.regex.Pattern;
  * A class that extends the Java built-in Properties overriding
  * {@link java.util.Properties#getProperty(String)} to allow ${ENV_VAR_NAME}-style \
                environment
  * variable inclusions
+ * @deprecated Use ${env:key} instead.
  */
+@Deprecated
 public class EnvVarResolverProperties extends Properties {
+
+  private static final long serialVersionUID = -9134232469049352862L;
+
   /**
    * @param input The input string with ${ENV_VAR_NAME}-style environment variable \
                names
    * @return The output string with ${ENV_VAR_NAME} replaced with their environment \
                variable values
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java new \
file mode 100644 index 0000000..4db66a0
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.flume.CounterGroup;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileConfigurationSource implements ConfigurationSource {
+
+  private static final Logger LOGGER = \
LoggerFactory.getLogger(FileConfigurationSource.class); +
+  private final Path path;
+  private final URI uri;
+  private final CounterGroup counterGroup;
+  private byte[] data;
+  private long lastChange;
+
+  public FileConfigurationSource(URI uri) {
+    this.uri = uri;
+    this.path = Paths.get(uri);
+    counterGroup = new CounterGroup();
+    try {
+      this.lastChange = path.toFile().lastModified();
+      data = Files.readAllBytes(this.path);
+    } catch (IOException ioe) {
+      LOGGER.error("Unable to read {}: {}", path.toString(), ioe.getMessage());
+      throw new ConfigurationException("Unable to read file " + path.toString(), \
ioe); +    }
+  }
+
+  @Override
+  public InputStream getInputStream() {
+    return new ByteArrayInputStream(data);
+  }
+
+  @Override
+  public String getUri() {
+    return this.uri.toString();
+  }
+
+  @Override
+  public String getExtension() {
+    int length = uri.getPath().indexOf(".");
+    if (length <= 1) {
+      return PROPERTIES;
+    }
+    return uri.getPath().substring(length + 1);
+  }
+
+  @Override
+  public boolean isModified() {
+    LOGGER.debug("Checking file:{} for changes", path.toString());
+
+    counterGroup.incrementAndGet("file.checks");
+
+    long lastModified = path.toFile().lastModified();
+
+    if (lastModified > lastChange) {
+      LOGGER.info("Reloading configuration file:{}", path.toString());
+
+      counterGroup.incrementAndGet("file.loads");
+
+      lastChange = lastModified;
+
+      try {
+        data = Files.readAllBytes(path);
+        return true;
+      } catch (Exception e) {
+        LOGGER.error("Failed to load configuration data. Exception follows.", e);
+      } catch (NoClassDefFoundError e) {
+        LOGGER.error("Failed to start agent because dependencies were not found in \
classpath." +            + "Error follows.", e);
+      } catch (Throwable t) {
+        // caught because the caller does not handle or log Throwables
+        LOGGER.error("Unhandled error", t);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "{ file:" + path.toString() + "}";
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java
 new file mode 100644
index 0000000..6325fe8
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java
 @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates a FileConfigurationSource.
+ */
+public class FileConfigurationSourceFactory implements ConfigurationSourceFactory {
+
+  @SuppressWarnings(value = {"EI_EXPOSE_REP"})
+  private static final List<String> SCHEMES = Lists.newArrayList("file");
+
+  public List<String> getSchemes() {
+    return SCHEMES;
+  }
+
+  public ConfigurationSource createConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost) {
+    return new FileConfigurationSource(uri);
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java new \
file mode 100644 index 0000000..6beafe3
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.UrlConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HttpConfigurationSource implements ConfigurationSource {
+
+  private static final Logger LOGGER = \
LoggerFactory.getLogger(HttpConfigurationSource.class); +  private static final int \
NOT_MODIFIED = 304; +  private static final int OK = 200;
+  private static final int BUF_SIZE = 1024;
+
+  private final URI uri;
+  private final CounterGroup counterGroup;
+  private final AuthorizationProvider authorizationProvider;
+  private final boolean verifyHost;
+  private long lastModified = 0;
+  private byte[] data = null;
+
+  public HttpConfigurationSource(URI uri, AuthorizationProvider \
authorizationProvider, +      boolean verifyHost) {
+    this.authorizationProvider = authorizationProvider;
+    this.uri = uri;
+    this.verifyHost = verifyHost;
+    counterGroup = new CounterGroup();
+    readInputStream();
+  }
+
+  @Override
+  public InputStream getInputStream() {
+    return new ByteArrayInputStream(data);
+  }
+
+  @Override
+  public String getUri() {
+    return this.uri.toString();
+  }
+
+  @Override
+  public String getExtension() {
+    int length = uri.getPath().indexOf(".");
+    if (length <= 1) {
+      return PROPERTIES;
+    }
+    return uri.getPath().substring(length + 1);
+  }
+
+  @Override
+  public boolean isModified() {
+    LOGGER.debug("Checking {} for changes", uri);
+
+    counterGroup.incrementAndGet("uri.checks");
+    try {
+      LOGGER.info("Reloading configuration from:{}", uri);
+      if (readInputStream()) {
+        counterGroup.incrementAndGet("uri.loads");
+        return true;
+      }
+    } catch (ConfigurationException ex) {
+      LOGGER.error("Unable to access configuration due to {}: ", ex.getMessage());
+    }
+    return false;
+  }
+
+  private boolean readInputStream() {
+    try {
+      HttpURLConnection connection = \
UrlConnectionFactory.createConnection(uri.toURL(), +          authorizationProvider, \
lastModified, verifyHost); +      connection.connect();
+
+      int code = connection.getResponseCode();
+      switch (code) {
+        case NOT_MODIFIED: {
+          LOGGER.debug("Configuration Not Modified");
+          return false;
+        }
+        case OK: {
+          try (InputStream is = connection.getInputStream()) {
+            lastModified = connection.getLastModified();
+            LOGGER.debug("Content was modified for {}", uri.toString());
+            data = IOUtils.toByteArray(is);
+            return true;
+          } catch (final IOException e) {
+            try (InputStream es = connection.getErrorStream()) {
+              LOGGER.info("Error accessing configuration at {}: {}", uri, \
readStream(es)); +            } catch (final IOException ioe) {
+              LOGGER.error("Error accessing configuration at {}: {}", uri, \
e.getMessage()); +            }
+            throw new ConfigurationException("Unable to access " + uri.toString(), \
e); +          }
+        }
+        default: {
+          if (code < 0) {
+            LOGGER.info("Invalid response code returned");
+          } else {
+            LOGGER.info("Unexpected response code returned {}", code);
+          }
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Error accessing {}: {}", uri.toString(), e.getMessage());
+      throw new ConfigurationException("Unable to access " + uri.toString(), e);
+    }
+  }
+
+  private byte[] readStream(InputStream is) throws IOException {
+    ByteArrayOutputStream result = new ByteArrayOutputStream();
+    byte[] buffer = new byte[BUF_SIZE];
+    int length;
+    while ((length = is.read(buffer)) != -1) {
+      result.write(buffer, 0, length);
+    }
+    return result.toByteArray();
+  }
+
+  @Override
+  public String toString() {
+    return "{ uri:" + uri + "}";
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java
 new file mode 100644
index 0000000..10725f0
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java
 @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates an HttpConfigurationSource.
+ */
+public class HttpConfigurationSourceFactory implements ConfigurationSourceFactory {
+
+  @SuppressWarnings(value = {"EI_EXPOSE_REP"})
+  private static final List<String> SCHEMES = Lists.newArrayList("http", "https");
+
+  public List<String> getSchemes() {
+    return SCHEMES;
+  }
+
+  public ConfigurationSource createConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost) {
+    return new HttpConfigurationSource(uri, authorizationProvider, verifyHost);
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java new file mode \
100644 index 0000000..f7571b9
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.commons.text.lookup.StringLookup;
+import org.apache.commons.text.lookup.StringLookupFactory;
+
+/**
+ * Resolves replaceable tokens to create a Map.
+ * <p>
+ * Needs org.apache.commons:commons-lang3 on classpath
+ */
+final class MapResolver {
+
+  private static final String PROPS_IMPL_KEY = "propertiesImplementation";
+  private static final String ENV_VAR_PROPERTY = \
"org.apache.flume.node.EnvVarResolverProperties"; +
+  public static Map<String, String> resolveProperties(Properties properties) {
+    Map<String, String> map = new HashMap<>();
+    boolean useEnvVars = \
ENV_VAR_PROPERTY.equals(System.getProperty(PROPS_IMPL_KEY)); +    StringLookup \
defaultLookup = useEnvVars ? new DefaultLookup(map) : +        \
StringLookupFactory.INSTANCE.mapStringLookup(map); +    StringLookup lookup = \
StringLookupFactory.INSTANCE.interpolatorStringLookup(defaultLookup); +    \
StringSubstitutor substitutor = new StringSubstitutor(lookup); +    \
substitutor.setEnableSubstitutionInVariables(true); +    \
properties.stringPropertyNames().forEach((k) -> map.put(k, +        \
substitutor.replace(properties.getProperty(k)))); +    return map;
+  }
+
+  private static class DefaultLookup implements StringLookup {
+    private final Map<String, String> properties;
+
+    DefaultLookup(Map<String, String> properties) {
+      this.properties = properties;
+    }
+
+    /**
+     * Provide compatibility with EnvVarResolverProperties.
+     *
+     * @param key The key.
+     * @return The value associated with the key or null.
+     */
+    @Override
+    public String lookup(String key) {
+      return properties.containsKey(key) ?
+          properties.get(key) : System.getenv(key);
+    }
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java \
                index fa3ef55..80d5497 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
                
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java
 @@ -32,16 +32,16 @@ import java.util.Map;
  */
 public interface MaterializedConfiguration {
 
-  public void addSourceRunner(String name, SourceRunner sourceRunner);
+  void addSourceRunner(String name, SourceRunner sourceRunner);
 
-  public void addSinkRunner(String name, SinkRunner sinkRunner);
+  void addSinkRunner(String name, SinkRunner sinkRunner);
 
-  public void addChannel(String name, Channel channel);
+  void addChannel(String name, Channel channel);
 
-  public Map<String, SourceRunner> getSourceRunners();
+  Map<String, SourceRunner> getSourceRunners();
 
-  public Map<String, SinkRunner> getSinkRunners();
+  Map<String, SinkRunner> getSinkRunners();
 
-  public Map<String, Channel> getChannels();
+  Map<String, Channel> getChannels();
 
 }
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
 index 13cb38f..db040b5 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
                
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
 @@ -18,143 +18,20 @@
 package org.apache.flume.node;
 
 import java.io.File;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.flume.CounterGroup;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.eventbus.EventBus;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class PollingPropertiesFileConfigurationProvider
-    extends PropertiesFileConfigurationProvider
-    implements LifecycleAware {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
-
-  private final EventBus eventBus;
-  private final File file;
-  private final int interval;
-  private final CounterGroup counterGroup;
-  private LifecycleState lifecycleState;
-
-  private ScheduledExecutorService executorService;
-
-  public PollingPropertiesFileConfigurationProvider(String agentName,
-      File file, EventBus eventBus, int interval) {
-    super(agentName, file);
-    this.eventBus = eventBus;
-    this.file = file;
-    this.interval = interval;
-    counterGroup = new CounterGroup();
-    lifecycleState = LifecycleState.IDLE;
-  }
-
-  @Override
-  public void start() {
-    LOGGER.info("Configuration provider starting");
-
-    Preconditions.checkState(file != null,
-        "The parameter file must not be null");
-
-    executorService = Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
-                .build());
-
-    FileWatcherRunnable fileWatcherRunnable =
-        new FileWatcherRunnable(file, counterGroup);
-
-    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
-        TimeUnit.SECONDS);
-
-    lifecycleState = LifecycleState.START;
-
-    LOGGER.debug("Configuration provider started");
-  }
-
-  @Override
-  public void stop() {
-    LOGGER.info("Configuration provider stopping");
-
-    executorService.shutdown();
-    try {
-      if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
-        LOGGER.debug("File watcher has not terminated. Forcing shutdown of \
                executor.");
-        executorService.shutdownNow();
-        while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
-          LOGGER.debug("Waiting for file watcher to terminate");
-        }
-      }
-    } catch (InterruptedException e) {
-      LOGGER.debug("Interrupted while waiting for file watcher to terminate");
-      Thread.currentThread().interrupt();
-    }
-    lifecycleState = LifecycleState.STOP;
-    LOGGER.debug("Configuration provider stopped");
-  }
-
-  @Override
-  public synchronized  LifecycleState getLifecycleState() {
-    return lifecycleState;
-  }
-
-
-  @Override
-  public String toString() {
-    return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
-        + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
-  }
-
-  public class FileWatcherRunnable implements Runnable {
-
-    private final File file;
-    private final CounterGroup counterGroup;
-
-    private long lastChange;
 
-    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
-      super();
-      this.file = file;
-      this.counterGroup = counterGroup;
-      this.lastChange = 0L;
-    }
-
-    @Override
-    public void run() {
-      LOGGER.debug("Checking file:{} for changes", file);
-
-      counterGroup.incrementAndGet("file.checks");
-
-      long lastModified = file.lastModified();
-
-      if (lastModified > lastChange) {
-        LOGGER.info("Reloading configuration file:{}", file);
-
-        counterGroup.incrementAndGet("file.loads");
-
-        lastChange = lastModified;
+/**
+ * @deprecated Use UriConfigurationProvider instead.
+ */
+@Deprecated
+public class PollingPropertiesFileConfigurationProvider extends \
UriConfigurationProvider {  
-        try {
-          eventBus.post(getConfiguration());
-        } catch (Exception e) {
-          LOGGER.error("Failed to load configuration data. Exception follows.",
-              e);
-        } catch (NoClassDefFoundError e) {
-          LOGGER.error("Failed to start agent because dependencies were not " +
-              "found in classpath. Error follows.", e);
-        } catch (Throwable t) {
-          // caught because the caller does not handle or log Throwables
-          LOGGER.error("Unhandled error", t);
-        }
-      }
-    }
+  public PollingPropertiesFileConfigurationProvider(String agentName, File file, \
EventBus eventBus, +      int interval) {
+    super(agentName, Lists.newArrayList(new FileConfigurationSource(file.toURI())), \
null, +        eventBus, interval);
   }
 
 }
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
 index 75f45db..a2b0409 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
                
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
 @@ -17,16 +17,9 @@
  */
 package org.apache.flume.node;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Properties;
 
-import org.apache.flume.conf.FlumeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
 
 /**
  * <p>
@@ -165,52 +158,13 @@ import org.slf4j.LoggerFactory;
  * </p>
  *
  * @see java.util.Properties#load(java.io.Reader)
+ * @deprecated Use UriConfigurationProvider.
  */
-public class PropertiesFileConfigurationProvider extends
-    AbstractConfigurationProvider {
-
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(PropertiesFileConfigurationProvider.class);
-  private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = \
                "java.util.Properties";
-
-  private final File file;
+@Deprecated
+public class PropertiesFileConfigurationProvider extends UriConfigurationProvider {
 
   public PropertiesFileConfigurationProvider(String agentName, File file) {
-    super(agentName);
-    this.file = file;
-  }
-
-  @Override
-  public FlumeConfiguration getFlumeConfiguration() {
-    BufferedReader reader = null;
-    try {
-      reader = new BufferedReader(new FileReader(file));
-      String resolverClassName = System.getProperty("propertiesImplementation",
-          DEFAULT_PROPERTIES_IMPLEMENTATION);
-      Class<? extends Properties> propsclass = Class.forName(resolverClassName)
-          .asSubclass(Properties.class);
-      Properties properties = propsclass.newInstance();
-      properties.load(reader);
-      return new FlumeConfiguration(toMap(properties));
-    } catch (IOException ex) {
-      LOGGER.error("Unable to load file:" + file
-          + " (I/O failure) - Exception follows.", ex);
-    } catch (ClassNotFoundException e) {
-      LOGGER.error("Configuration resolver class not found", e);
-    } catch (InstantiationException e) {
-      LOGGER.error("Instantiation exception", e);
-    } catch (IllegalAccessException e) {
-      LOGGER.error("Illegal access exception", e);
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          LOGGER.warn(
-              "Unable to close file reader for file: " + file, ex);
-        }
-      }
-    }
-    return new FlumeConfiguration(new HashMap<String, String>());
+    super(agentName, Lists.newArrayList(new FileConfigurationSource(file.toURI())), \
null, null, 0); +    super.start();
   }
 }
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java \
similarity index 52% copy from \
flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
 copy to flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java
 index 75f45db..e4c74c6 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
                
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,27 +17,41 @@
  */
 package org.apache.flume.node;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.HashMap;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.flume.CounterGroup;
+import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * <p>
- * A configuration provider that uses properties file for specifying
- * configuration. The configuration files follow the Java properties file syntax
- * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
- * configuration value specified in the properties file is prefixed by an
+ * A configuration provider that uses properties for specifying
+ * configuration. The configurations follow the Java properties file syntax
+ * rules specified at {@link Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties is prefixed by an
  * <em>Agent Name</em> which helps isolate an individual agent&apos;s namespace.
  * </p>
  * <p>
- * Valid configuration files must observe the following rules for every agent
+ * Valid configurations must observe the following rules for every agent
  * namespace.
  * <ul>
  * <li>For every &lt;agent name&gt; there must be three lists specified that
@@ -101,7 +115,7 @@ import org.slf4j.LoggerFactory;
  * <li>Sinks not assigned to a group will be assigned to default single sink
  * groups.</li>
  * </ul>
- *
+ * <p>
  * Apart from the above required configuration values, each source, sink or
  * channel can have its own set of arbitrary configuration as required by the
  * implementation. Each of these configuration values are expressed by fully
@@ -164,53 +178,180 @@ import org.slf4j.LoggerFactory;
  *
  * </p>
  *
- * @see java.util.Properties#load(java.io.Reader)
+ * @see Properties#load(java.io.Reader)
  */
-public class PropertiesFileConfigurationProvider extends
-    AbstractConfigurationProvider {
+public class UriConfigurationProvider extends AbstractConfigurationProvider
+    implements LifecycleAware {
 
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(PropertiesFileConfigurationProvider.class);
-  private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = \
"java.util.Properties"; +  private static final Logger LOGGER = \
LoggerFactory.getLogger(UriConfigurationProvider.class);  
-  private final File file;
+  private final List<ConfigurationSource> configurationSources;
+  private final File backupDirectory;
+  private final EventBus eventBus;
+  private final int interval;
+  private final CounterGroup counterGroup;
+  private LifecycleState lifecycleState = LifecycleState.IDLE;
+  private ScheduledExecutorService executorService;
 
-  public PropertiesFileConfigurationProvider(String agentName, File file) {
+  public UriConfigurationProvider(String agentName, List<ConfigurationSource> \
sourceList, +      String backupDirectory, EventBus eventBus, int pollInterval) {
     super(agentName);
-    this.file = file;
+    this.configurationSources = sourceList;
+    this.backupDirectory = backupDirectory != null ? new File(backupDirectory) : \
null; +    this.eventBus = eventBus;
+    this.interval = pollInterval;
+    counterGroup = new CounterGroup();
+  }
+
+  @Override
+  public void start() {
+    if (eventBus != null && interval > 0) {
+      executorService = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
+              .build());
+
+      WatcherRunnable watcherRunnable = new WatcherRunnable(configurationSources, \
counterGroup, +          eventBus);
+
+      executorService.scheduleWithFixedDelay(watcherRunnable, 0, interval,
+          TimeUnit.SECONDS);
+    }
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop() {
+    if (executorService != null) {
+      executorService.shutdown();
+      try {
+        if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+          LOGGER.debug("File watcher has not terminated. Forcing shutdown of \
executor."); +          executorService.shutdownNow();
+          while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+            LOGGER.debug("Waiting for file watcher to terminate");
+          }
+        }
+      } catch (InterruptedException e) {
+        LOGGER.debug("Interrupted while waiting for file watcher to terminate");
+        Thread.currentThread().interrupt();
+      }
+    }
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  protected List<ConfigurationSource> getConfigurationSources() {
+    return configurationSources;
   }
 
   @Override
   public FlumeConfiguration getFlumeConfiguration() {
-    BufferedReader reader = null;
-    try {
-      reader = new BufferedReader(new FileReader(file));
-      String resolverClassName = System.getProperty("propertiesImplementation",
-          DEFAULT_PROPERTIES_IMPLEMENTATION);
-      Class<? extends Properties> propsclass = Class.forName(resolverClassName)
-          .asSubclass(Properties.class);
-      Properties properties = propsclass.newInstance();
-      properties.load(reader);
-      return new FlumeConfiguration(toMap(properties));
-    } catch (IOException ex) {
-      LOGGER.error("Unable to load file:" + file
-          + " (I/O failure) - Exception follows.", ex);
-    } catch (ClassNotFoundException e) {
-      LOGGER.error("Configuration resolver class not found", e);
-    } catch (InstantiationException e) {
-      LOGGER.error("Instantiation exception", e);
-    } catch (IllegalAccessException e) {
-      LOGGER.error("Illegal access exception", e);
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          LOGGER.warn(
-              "Unable to close file reader for file: " + file, ex);
+    Map<String, String> configMap = null;
+    Properties properties = new Properties();
+    for (ConfigurationSource configurationSource : configurationSources) {
+      try (InputStream is = configurationSource.getInputStream()) {
+        if (is != null) {
+          switch (configurationSource.getExtension()) {
+            case ConfigurationSource.JSON: case ConfigurationSource.YAML:
+            case ConfigurationSource.XML: {
+              LOGGER.warn("File extension type {} is unsupported",
+                  configurationSource.getExtension());
+              break;
+            }
+            default: {
+              properties.load(is);
+              break;
+            }
+          }
+        }
+      } catch (IOException ioe) {
+        LOGGER.warn("Unable to load properties from {}: {}", \
configurationSource.getUri(), +            ioe.getMessage());
+      }
+      if (properties.size() > 0) {
+        configMap = MapResolver.resolveProperties(properties);
+      }
+    }
+    if (configMap != null) {
+      Properties props = new Properties();
+      props.putAll(configMap);
+      if (backupDirectory != null) {
+        if (backupDirectory.mkdirs()) {
+          // This is only being logged to keep Spotbugs happy. We can't ignore the \
result of mkdirs. +          LOGGER.debug("Created directories for {}", \
backupDirectory.toString()); +        }
+        File backupFile = getBackupFile(backupDirectory, getAgentName());
+        try (OutputStream os = new FileOutputStream(backupFile)) {
+          props.store(os, "Backup created at " + LocalDateTime.now().toString());
+        } catch (IOException ioe) {
+          LOGGER.warn("Unable to create backup properties file: {}" + \
ioe.getMessage()); +        }
+      }
+    } else {
+      if (backupDirectory != null) {
+        File backup = getBackupFile(backupDirectory, getAgentName());
+        if (backup.exists()) {
+          Properties props = new Properties();
+          try (InputStream is = new FileInputStream(backup)) {
+            LOGGER.warn("Unable to access primary configuration. Trying backup");
+            props.load(is);
+            configMap = MapResolver.resolveProperties(props);
+          } catch (IOException ex) {
+            LOGGER.warn("Error reading backup file: {}", ex.getMessage());
+          }
+        }
+      }
+    }
+    if (configMap != null) {
+      return new FlumeConfiguration(configMap);
+    } else {
+      LOGGER.error("No configuration could be found");
+      return null;
+    }
+  }
+
+  private File getBackupFile(File backupDirectory, String agentName) {
+    if (backupDirectory != null) {
+      return new File(backupDirectory, "." + agentName + ".properties");
+    }
+    return null;
+  }
+
+  private class WatcherRunnable implements Runnable {
+
+    private List<ConfigurationSource> configurationSources;
+    private final CounterGroup counterGroup;
+    private final EventBus eventBus;
+
+    public WatcherRunnable(List<ConfigurationSource> sources, CounterGroup \
counterGroup, +        EventBus eventBus) {
+      this.configurationSources = sources;
+      this.counterGroup = counterGroup;
+      this.eventBus = eventBus;
+    }
+
+    @Override
+    public void run() {
+      LOGGER.debug("Checking for changes to sources");
+
+      counterGroup.incrementAndGet("uri.checks");
+      try {
+        boolean isModified = false;
+        for (ConfigurationSource source : configurationSources) {
+          if (source.isModified()) {
+            isModified = true;
+          }
+        }
+        if (isModified) {
+          eventBus.post(getConfiguration());
         }
+      } catch (ConfigurationException ex) {
+        LOGGER.warn("Unable to update configuration: {}", ex.getMessage());
       }
     }
-    return new FlumeConfiguration(new HashMap<String, String>());
   }
 }
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java \
new file mode 100644 index 0000000..24542d4
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java
 @@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import java.net.URLConnection;
+
+/**
+ * Interface to be implemented to add an Authorization header to an HTTP request.
+ */
+public interface AuthorizationProvider {
+
+  void addAuthorization(URLConnection urlConnection);
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java
 new file mode 100644
index 0000000..aa1fc9a
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Provides the Basic Authorization header to a request.
+ */
+public class BasicAuthorizationProvider implements AuthorizationProvider {
+
+  private static final Base64.Encoder encoder = Base64.getEncoder();
+
+  private String authString = null;
+
+  public BasicAuthorizationProvider(String userName, String password) {
+    if (userName != null && password != null) {
+      String toEncode = userName + ":" + password;
+      authString = "Basic " + \
encoder.encodeToString(toEncode.getBytes(StandardCharsets.UTF_8)); +    }
+  }
+
+  @Override
+  public void addAuthorization(URLConnection urlConnection) {
+    if (authString != null) {
+      urlConnection.setRequestProperty("Authorization", authString);
+    }
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java new \
file mode 100644 index 0000000..d1cbaaf
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLSession;
+
+/**
+ * An HostnameVerifier which accepts everything.
+ */
+public final class LaxHostnameVerifier implements HostnameVerifier {
+  /**
+   * Singleton instance.
+   */
+  public static final HostnameVerifier INSTANCE = new LaxHostnameVerifier();
+
+  private LaxHostnameVerifier() {
+  }
+
+  @Override
+  public boolean verify(final String s, final SSLSession sslSession) {
+    return true;
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java \
b/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java new \
file mode 100644 index 0000000..21e4f26
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node.net;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import javax.net.ssl.HttpsURLConnection;
+
+/**
+ * Constructs an HTTPURLConnection.
+ */
+public class UrlConnectionFactory {
+
+  private static int DEFAULT_TIMEOUT = 60000;
+  private static int connectTimeoutMillis = DEFAULT_TIMEOUT;
+  private static int readTimeoutMillis = DEFAULT_TIMEOUT;
+  private static final String XML = "application/xml";
+  private static final String YAML = "application/yaml";
+  private static final String JSON = "application/json";
+  private static final String PROPERTIES = "text/x-java-properties";
+  private static final String TEXT = "text/plain";
+  public static final String HTTP = "http";
+  public static final String HTTPS = "https";
+
+  public static HttpURLConnection createConnection(URL url,
+      AuthorizationProvider authorizationProvider, long lastModifiedMillis, boolean \
verifyHost) +      throws IOException {
+    final HttpURLConnection urlConnection = (HttpURLConnection) \
url.openConnection(); +    if (HTTPS.equals(url.getProtocol()) && !verifyHost) {
+      ((HttpsURLConnection) \
urlConnection).setHostnameVerifier(LaxHostnameVerifier.INSTANCE); +    }
+    if (authorizationProvider != null) {
+      authorizationProvider.addAuthorization(urlConnection);
+    }
+    urlConnection.setAllowUserInteraction(false);
+    urlConnection.setDoOutput(true);
+    urlConnection.setDoInput(true);
+    urlConnection.setRequestMethod("GET");
+    if (connectTimeoutMillis > 0) {
+      urlConnection.setConnectTimeout(connectTimeoutMillis);
+    }
+    if (readTimeoutMillis > 0) {
+      urlConnection.setReadTimeout(readTimeoutMillis);
+    }
+    urlConnection.setRequestProperty("Content-Type", getContentType(url));
+    if (lastModifiedMillis > 0) {
+      urlConnection.setIfModifiedSince(lastModifiedMillis);
+    }
+    return urlConnection;
+  }
+
+  public static URLConnection createConnection(URL url) throws IOException {
+    return createConnection(url, null, 0, true);
+  }
+
+  public static URLConnection createConnection(URL url, AuthorizationProvider \
authorizationProvider) +      throws IOException {
+    URLConnection urlConnection = null;
+    if (url.getProtocol().equals(HTTPS) || url.getProtocol().equals(HTTP)) {
+      urlConnection = createConnection(url, authorizationProvider, 0, true);
+    } else {
+      urlConnection = url.openConnection();
+    }
+    return urlConnection;
+  }
+
+  private static String getContentType(URL url) {
+    String[] fileParts = url.getFile().split("\\.");
+    String type = fileParts[fileParts.length - 1].trim();
+    switch (type) {
+      case "properties": {
+        return PROPERTIES;
+      }
+      case "json": {
+        return JSON;
+      }
+      case "yaml": case "yml": {
+        return YAML;
+      }
+      case "xml": {
+        return XML;
+      }
+      default: {
+        return TEXT;
+      }
+    }
+  }
+}
diff --git a/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory \
b/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory
 new file mode 100644
index 0000000..c49998c
--- /dev/null
+++ b/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory
 @@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache license, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the license for the specific language governing permissions and
+# limitations under the license.
+org.apache.flume.node.ClasspathConfigurationSourceFactory
+org.apache.flume.node.FileConfigurationSourceFactory
+org.apache.flume.node.HttpConfigurationSourceFactory
\ No newline at end of file
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java
 new file mode 100644
index 0000000..5fb208f
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that files can be loaded from the Classpath.
+ */
+public class TestClasspathConfigurationSource {
+
+  @Test
+  public void testClasspath() throws Exception {
+    URI confFile = new URI("classpath:///flume-conf.properties");
+    ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+    Assert.assertNotNull("No configuration returned", source);
+    Properties props = new Properties();
+    props.load(source.getInputStream());
+    String value = props.getProperty("host1.sources");
+    Assert.assertNotNull("Missing key", value);
+  }
+
+  @Test
+  public void testOddClasspath() throws Exception {
+    URI confFile = new URI("classpath:/flume-conf.properties");
+    ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+    Assert.assertNotNull("No configuration returned", source);
+    Properties props = new Properties();
+    props.load(source.getInputStream());
+    String value = props.getProperty("host1.sources");
+    Assert.assertNotNull("Missing key", value);
+  }
+
+  @Test
+  public void testImproperClasspath() throws Exception {
+    URI confFile = new URI("classpath://flume-conf.properties");
+    ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+    Assert.assertNotNull("No configuration returned", source);
+    Properties props = new Properties();
+    props.load(source.getInputStream());
+    String value = props.getProperty("host1.sources");
+    Assert.assertNotNull("Missing key", value);
+  }
+
+  @Test
+  public void testShorthandClasspath() throws Exception {
+    URI confFile = new URI("classpath:flume-conf.properties");
+    ConfigurationSource source = new ClasspathConfigurationSource(confFile);
+    Assert.assertNotNull("No configuration returned", source);
+    Properties props = new Properties();
+    props.load(source.getInputStream());
+    String value = props.getProperty("host1.sources");
+    Assert.assertNotNull("Missing key", value);
+  }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java new file mode \
100644 index 0000000..36f95b1
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestEnvLookup {
+  private static final File TESTFILE = new File(
+      TestEnvLookup.class.getClassLoader()
+          .getResource("flume-conf-with-envLookup.properties").getFile());
+  private static final String NC_PORT = "6667";
+
+  @Rule
+  public final EnvironmentVariables environmentVariables = new \
EnvironmentVariables(); +  private UriConfigurationProvider provider;
+
+  @Before
+  public void setUp() throws Exception {
+    environmentVariables.set("NC_PORT", NC_PORT);
+    List<ConfigurationSource> sourceList =
+        Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()));
+    provider = new UriConfigurationProvider("a1", sourceList, null,
+        null, 0);
+  }
+
+  @Test
+  public void getProperty() throws Exception {
+
+    Assert.assertEquals(NC_PORT, provider.getFlumeConfiguration()
+        .getConfigurationFor("a1")
+        .getSourceContext().get("r1").getParameters().get("port"));
+  }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java \
new file mode 100644 index 0000000..b128116
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.Base64;
+import java.util.Enumeration;
+import java.util.Properties;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.BasicAuthorizationProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that files can be loaded via http.
+ */
+public class TestHttpConfigurationSource {
+
+  private static final String BASIC = "Basic ";
+  private static final String expectedCreds = "flume:flume";
+  private static Server server;
+  private static Base64.Decoder decoder = Base64.getDecoder();
+
+  @BeforeClass
+  public static void startServer() throws Exception {
+    try {
+      server = new Server(1080);
+      ServletContextHandler context = new ServletContextHandler();
+      ServletHolder defaultServ = new ServletHolder("default", TestServlet.class);
+      defaultServ.setInitParameter("resourceBase", System.getProperty("user.dir"));
+      defaultServ.setInitParameter("dirAllowed", "true");
+      context.addServlet(defaultServ, "/");
+      server.setHandler(context);
+
+      // Start Server
+      server.start();
+    } catch (Throwable ex) {
+      ex.printStackTrace();
+      throw ex;
+    }
+  }
+
+  @AfterClass
+  public static void stopServer() throws Exception {
+    server.stop();
+  }
+
+
+  @Test(expected = ConfigurationException.class)
+  public void testBadCrdentials() throws Exception {
+    URI confFile = new URI("http://localhost/flume-conf.properties");
+    AuthorizationProvider authProvider = new BasicAuthorizationProvider("foo", \
"bar"); +    ConfigurationSource source = new HttpConfigurationSource(confFile, \
authProvider, true); +  }
+
+  @Test
+  public void testGet() throws Exception {
+    URI confFile = new URI("http://localhost:1080/flume-conf.properties");
+    AuthorizationProvider authProvider = new BasicAuthorizationProvider("flume", \
"flume"); +    ConfigurationSource source = new HttpConfigurationSource(confFile, \
authProvider, true); +    Assert.assertNotNull("No configuration returned", source);
+    InputStream is = source.getInputStream();
+    Assert.assertNotNull("No data returned", is);
+    Properties props = new Properties();
+    props.load(is);
+    String value = props.getProperty("host1.sources");
+    Assert.assertNotNull("Missing key", value);
+    Assert.assertFalse(source.isModified());
+    File file = new File("target/test-classes/flume-conf.properties");
+    if (file.setLastModified(System.currentTimeMillis())) {
+      Assert.assertTrue(source.isModified());
+    }
+  }
+
+  public static class TestServlet extends DefaultServlet {
+
+    private static final long serialVersionUID = -2885158530511450659L;
+
+    @Override
+    protected void doGet(HttpServletRequest request,
+        HttpServletResponse response) throws ServletException, IOException {
+      Enumeration<String> headers = \
request.getHeaders(HttpHeader.AUTHORIZATION.toString()); +      if (headers == null) \
{ +        response.sendError(401, "No Auth header");
+        return;
+      }
+      while (headers.hasMoreElements()) {
+        String authData = headers.nextElement();
+        Assert.assertTrue("Not a Basic auth header", authData.startsWith(BASIC));
+        String credentials = new \
String(decoder.decode(authData.substring(BASIC.length()))); +        \
Assert.assertEquals(expectedCreds, credentials); +      }
+      if (request.getServletPath().equals("/flume-conf.properties")) {
+        File file = new File("target/test-classes/flume-conf.properties");
+        long modifiedSince = \
request.getDateHeader(HttpHeader.IF_MODIFIED_SINCE.toString()); +        long \
lastModified = file.lastModified(); +        if (modifiedSince > 0 && lastModified <= \
modifiedSince) { +          response.setStatus(304);
+          return;
+        }
+        response.setDateHeader(HttpHeader.LAST_MODIFIED.toString(), lastModified);
+        response.setContentLengthLong(file.length());
+        Files.copy(file.toPath(), response.getOutputStream());
+        response.getOutputStream().flush();
+        response.setStatus(200);
+      } else {
+        response.sendError(400, "Unsupported request");
+      }
+    }
+  }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java new file \
mode 100644 index 0000000..0b5e5e0
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestOverrideFile {
+  private static final File TESTFILE = new File(
+      TestOverrideFile.class.getClassLoader()
+          .getResource("flume-conf-with-recursiveLookup.properties").getFile());
+  private static final File OVERRIDEFILE = new File(
+      TestOverrideFile.class.getClassLoader()
+          .getResource("flume-conf-override.properties").getFile());
+  private static final String BIND = "192.168.13.101";
+
+  @Rule
+  public final EnvironmentVariables environmentVariables = new \
EnvironmentVariables(); +  private UriConfigurationProvider provider;
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty("env", "DEV");
+    List<ConfigurationSource> sourceList =
+        Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()),
+            new FileConfigurationSource(OVERRIDEFILE.toURI()));
+    provider = new UriConfigurationProvider("a1", sourceList, null,
+        null, 0);
+  }
+
+  @Test
+  public void getProperty() throws Exception {
+
+    Assert.assertEquals(BIND, provider.getFlumeConfiguration()
+        .getConfigurationFor("a1")
+        .getSourceContext().get("r1").getParameters().get("bind"));
+  }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
 index 480f6a5..d20fa35 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
                
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
 @@ -40,7 +40,7 @@ public class TestPollingPropertiesFileConfigurationProvider  {
       TestPollingPropertiesFileConfigurationProvider.class.getClassLoader()
           .getResource("flume-conf.properties").getFile());
 
-  private PollingPropertiesFileConfigurationProvider provider;
+  private UriConfigurationProvider provider;
   private File baseDir;
   private File configFile;
   private EventBus eventBus;
@@ -54,9 +54,9 @@ public class TestPollingPropertiesFileConfigurationProvider  {
     Files.copy(TESTFILE, configFile);
 
     eventBus = new EventBus("test");
-    provider =
-        new PollingPropertiesFileConfigurationProvider("host1",
-            configFile, eventBus, 1);
+    ConfigurationSource source = new FileConfigurationSource(configFile.toURI());
+    provider = new UriConfigurationProvider("host1", Lists.newArrayList(source), \
null, +        eventBus, 1);
     provider.start();
     LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR);
   }
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
 index 4875c56..95c7777 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
                
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
 @@ -43,11 +43,15 @@ public class TestPropertiesFileConfigurationProvider  {
       TestPropertiesFileConfigurationProvider.class.getClassLoader()
           .getResource("flume-conf.properties").getFile());
 
-  private PropertiesFileConfigurationProvider provider;
+  private UriConfigurationProvider provider;
+  private List<ConfigurationSource> sources;
 
   @Before
   public void setUp() throws Exception {
-    provider = new PropertiesFileConfigurationProvider("test", TESTFILE);
+    ConfigurationSource source = new FileConfigurationSource(TESTFILE.toURI());
+    sources = Lists.newArrayList(source);
+    provider = new UriConfigurationProvider("test", sources, null, null, 0);
+    provider.start();
   }
 
   @After
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java \
b/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java new file \
mode 100644 index 0000000..de78ea6
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestRecursiveLookup {
+  private static final File TESTFILE = new File(
+      TestRecursiveLookup.class.getClassLoader()
+          .getResource("flume-conf-with-recursiveLookup.properties").getFile());
+  private static final String BIND = "192.168.11.101";
+
+  @Rule
+  public final EnvironmentVariables environmentVariables = new \
EnvironmentVariables(); +  private UriConfigurationProvider provider;
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty("env", "DEV");
+    List<ConfigurationSource> sourceList =
+        Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()));
+    provider = new UriConfigurationProvider("a1", sourceList, null,
+        null, 0);
+  }
+
+  @Test
+  public void getProperty() throws Exception {
+
+    Assert.assertEquals(BIND, provider.getFlumeConfiguration()
+        .getConfigurationFor("a1")
+        .getSourceContext().get("r1").getParameters().get("bind"));
+  }
+}
diff --git a/flume-ng-node/src/test/resources/flume-conf-override.properties \
b/flume-ng-node/src/test/resources/flume-conf-override.properties new file mode \
100644 index 0000000..1d86d4b
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-override.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+PROD_BIND=192.168.12.110
+DEV_BIND=192.168.13.101
+
diff --git a/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties \
b/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties new file mode \
100644 index 0000000..20848e9
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = 0.0.0.0
+a1.sources.r1.port = ${env:NC_PORT}
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties \
b/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties new \
file mode 100644 index 0000000..c679384
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+PROD_BIND=192.168.10.110
+DEV_BIND=192.168.11.101
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = ${${sys:env}_BIND}
+a1.sources.r1.port = 6667
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/pom.xml b/pom.xml
index 2dc3dba..9a9e492 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,13 +52,14 @@ limitations under the License.
     <bundle-plugin.version>2.3.7</bundle-plugin.version>
     <checkstyle.tool.version>8.12</checkstyle.tool.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
-    <commons-cli.version>1.2</commons-cli.version>
+    <commons-cli.version>1.4</commons-cli.version>
     <commons-codec.version>1.8</commons-codec.version>
     <commons-collections.version>3.2.2</commons-collections.version>
     <commons-compress.version>1.4.1</commons-compress.version>
     <commons-dbcp.version>1.4</commons-dbcp.version>
     <commons-io.version>2.1</commons-io.version>
     <commons-lang.version>2.5</commons-lang.version>
+    <commons-text.version>1.9</commons-text.version>
     <curator.version>5.1.0</curator.version>
     <derby.version>10.14.1.0</derby.version>
     <dropwizard-metrics.version>4.1.18</dropwizard-metrics.version>
@@ -767,6 +768,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-text</artifactId>
+        <version>${commons-text.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>${guava-old.version}</version>
@@ -1765,6 +1772,30 @@ limitations under the License.
         <version>${dropwizard-metrics.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>net.jcip</groupId>
+        <artifactId>jcip-annotations</artifactId>
+        <version>1.0</version>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-annotations</artifactId>
+        <version>${mvn-spotbugs-plugin.version}</version>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>org.mock-server</groupId>
+        <artifactId>mockserver-netty</artifactId>
+        <version>3.10.8</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.mock-server</groupId>
+        <artifactId>mockserver-client-java</artifactId>
+        <version>3.10.8</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic