[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: git commit: FLUME-2056. Allow SpoolDir to pass just the filename that is the source of an event
From: mpercy () apache ! org
Date: 2013-12-17 0:48:20
Message-ID: 8cc0d7713c79464295cb32b0616ca279 () git ! apache ! org
[Download RAW message or body]
Updated Branches:
refs/heads/flume-1.5 bf917dd96 -> 77fd194bf
FLUME-2056. Allow SpoolDir to pass just the filename that is the source of an event
(Jeff Lord via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/77fd194b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/77fd194b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/77fd194b
Branch: refs/heads/flume-1.5
Commit: 77fd194bf403aeb187a973b8854d09a5beac780d
Parents: bf917dd
Author: Mike Percy <mpercy@cloudera.com>
Authored: Mon Dec 16 14:59:14 2013 -0800
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Mon Dec 16 14:59:51 2013 -0800
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 30 ++++++++++++++--
.../flume/source/SpoolDirectorySource.java | 8 +++++
...olDirectorySourceConfigurationConstants.java | 12 +++++--
.../flume/source/TestSpoolDirectorySource.java | 38 +++++++++++++++++++-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 6 ++--
5 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index bd684ed..a88ed6e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -92,7 +92,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private final Pattern ignorePattern;
private final File metaFile;
private final boolean annotateFileName;
+ private final boolean annotateBaseName;
private final String fileNameHeader;
+ private final String baseNameHeader;
private final String deletePolicy;
private final Charset inputCharset;
private final DecodeErrorPolicy decodeErrorPolicy;
@@ -108,6 +110,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private ReliableSpoolingFileEventReader(File spoolDirectory,
String completedSuffix, String ignorePattern, String trackerDirPath,
boolean annotateFileName, String fileNameHeader,
+ boolean annotateBaseName, String baseNameHeader,
String deserializerType, Context deserializerContext,
String deletePolicy, String inputCharset,
DecodeErrorPolicy decodeErrorPolicy) throws IOException {
@@ -164,6 +167,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
this.deserializerContext = deserializerContext;
this.annotateFileName = annotateFileName;
this.fileNameHeader = fileNameHeader;
+ this.annotateBaseName = annotateBaseName;
+ this.baseNameHeader = baseNameHeader;
this.ignorePattern = Pattern.compile(ignorePattern);
this.deletePolicy = deletePolicy;
this.inputCharset = Charset.forName(inputCharset);
@@ -253,6 +258,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
}
}
+ if (annotateBaseName) {
+ String basename = currentFile.get().getFile().getName();
+ for (Event event : events) {
+ event.getHeaders().put(baseNameHeader, basename);
+ }
+ }
+
committed = false;
lastFileRead = currentFile;
return events;
@@ -510,6 +522,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
private String fileNameHeader =
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
+ private Boolean annotateBaseName =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER;
+ private String baseNameHeader =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER_KEY;
private String deserializerType =
SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
private Context deserializerContext = new Context();
@@ -551,6 +567,16 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
return this;
}
+ public Builder annotateBaseName(Boolean annotateBaseName) {
+ this.annotateBaseName = annotateBaseName;
+ return this;
+ }
+
+ public Builder baseNameHeader(String baseNameHeader) {
+ this.baseNameHeader = baseNameHeader;
+ return this;
+ }
+
public Builder deserializerType(String deserializerType) {
this.deserializerType = deserializerType;
return this;
@@ -579,8 +605,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
public ReliableSpoolingFileEventReader build() throws IOException {
return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
- deserializerType, deserializerContext, deletePolicy, inputCharset,
- decodeErrorPolicy);
+ annotateBaseName, baseNameHeader, deserializerType,
+ deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 0160215..f42ed2d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -54,6 +54,8 @@ Configurable, EventDrivenSource {
private String spoolDirectory;
private boolean fileHeader;
private String fileHeaderKey;
+ private boolean basenameHeader;
+ private String basenameHeaderKey;
private int batchSize;
private String ignorePattern;
private String trackerDirPath;
@@ -87,6 +89,8 @@ Configurable, EventDrivenSource {
.trackerDirPath(trackerDirPath)
.annotateFileName(fileHeader)
.fileNameHeader(fileHeaderKey)
+ .annotateBaseName(basenameHeader)
+ .baseNameHeader(basenameHeaderKey)
.deserializerType(deserializerType)
.deserializerContext(deserializerContext)
.deletePolicy(deletePolicy)
@@ -142,6 +146,10 @@ Configurable, EventDrivenSource {
DEFAULT_FILE_HEADER);
fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
DEFAULT_FILENAME_HEADER_KEY);
+ basenameHeader = context.getBoolean(BASENAME_HEADER,
+ DEFAULT_BASENAME_HEADER);
+ basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
+ DEFAULT_BASENAME_HEADER_KEY);
batchSize = context.getInteger(BATCH_SIZE,
DEFAULT_BATCH_SIZE);
inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index a2befe8..83522c0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -27,14 +27,22 @@ public class SpoolDirectorySourceConfigurationConstants {
public static final String SPOOLED_FILE_SUFFIX = "fileSuffix";
public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED";
- /** Header in which to put filename. */
+ /** Header in which to put absolute path filename. */
public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
- /** Whether to include filename in a header. */
+ /** Whether to include absolute path filename in a header. */
public static final String FILENAME_HEADER = "fileHeader";
public static final boolean DEFAULT_FILE_HEADER = false;
+ /** Header in which to put the basename of file. */
+ public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
+ public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";
+
+ /** Whether to include the basename of a file in a header. */
+ public static final String BASENAME_HEADER = "basenameHeader";
+ public static final boolean DEFAULT_BASENAME_HEADER = false;
+
/** What size to batch with before sending to ChannelProcessor. */
public static final String BATCH_SIZE = "batchSize";
public static final int DEFAULT_BATCH_SIZE = 100;
http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 9a546a5..503ab4d 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -93,7 +93,9 @@ public class TestSpoolDirectorySource {
Configurables.configure(source, context);
source.start();
- Thread.sleep(500);
+ while (source.getSourceCounter().getEventAcceptedCount() < 8) {
+ Thread.sleep(10);
+ }
Transaction txn = channel.getTransaction();
txn.begin();
Event e = channel.take();
@@ -107,6 +109,40 @@ public class TestSpoolDirectorySource {
}
@Test
+ public void testPutBasenameHeader() throws IOException,
+ InterruptedException {
+ Context context = new Context();
+ File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+ Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+ "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+ f1, Charsets.UTF_8);
+
+ context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+ tmpDir.getAbsolutePath());
+ context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER,
+ "true");
+ context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER_KEY,
+ "basenameHeaderKeyTest");
+
+ Configurables.configure(source, context);
+ source.start();
+ while (source.getSourceCounter().getEventAcceptedCount() < 8) {
+ Thread.sleep(10);
+ }
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ Event e = channel.take();
+ Assert.assertNotNull("Event must not be null", e);
+ Assert.assertNotNull("Event headers must not be null", e.getHeaders());
+ Assert.assertNotNull(e.getHeaders().get("basenameHeaderKeyTest"));
+ Assert.assertEquals(f1.getName(),
+ e.getHeaders().get("basenameHeaderKeyTest"));
+ txn.commit();
+ txn.close();
+ }
+
+ @Test
public void testLifecycle() throws IOException, InterruptedException {
Context context = new Context();
File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7a41efb..08c7740 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -946,8 +946,10 @@ Property Name Default Description
**spoolDir** -- The directory from which to read files from.
fileSuffix .COMPLETED Suffix to append to completely ingested files
deletePolicy never When to delete completed files: ``never`` or ``immediate``
-fileHeader false Whether to add a header storing the filename
-fileHeaderKey file Header key to use when appending filename to header
+fileHeader false Whether to add a header storing the absolute path filename.
+fileHeaderKey file Header key to use when appending absolute path filename to event header.
+basenameHeader false Whether to add a header storing the basename of the file.
+basenameHeaderKey basename Header Key to use when appending basename of file to event header.
ignorePattern ^$ Regular expression specifying which files to ignore (skip)
trackerDir .flumespool Directory to store metadata related to processing of files.
If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic