[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    git commit: FLUME-2056. Allow SpoolDir to pass just the filename that is the source of an event
From:       mpercy () apache ! org
Date:       2013-12-17 0:48:20
Message-ID: 8cc0d7713c79464295cb32b0616ca279 () git ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/flume-1.5 bf917dd96 -> 77fd194bf


FLUME-2056. Allow SpoolDir to pass just the filename that is the source of an event

(Jeff Lord via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/77fd194b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/77fd194b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/77fd194b

Branch: refs/heads/flume-1.5
Commit: 77fd194bf403aeb187a973b8854d09a5beac780d
Parents: bf917dd
Author: Mike Percy <mpercy@cloudera.com>
Authored: Mon Dec 16 14:59:14 2013 -0800
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Mon Dec 16 14:59:51 2013 -0800

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 30 ++++++++++++++--
 .../flume/source/SpoolDirectorySource.java      |  8 +++++
 ...olDirectorySourceConfigurationConstants.java | 12 +++++--
 .../flume/source/TestSpoolDirectorySource.java  | 38 +++++++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  6 ++--
 5 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index bd684ed..a88ed6e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -92,7 +92,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private final Pattern ignorePattern;
   private final File metaFile;
   private final boolean annotateFileName;
+  private final boolean annotateBaseName;
   private final String fileNameHeader;
+  private final String baseNameHeader;
   private final String deletePolicy;
   private final Charset inputCharset;
   private final DecodeErrorPolicy decodeErrorPolicy;
@@ -108,6 +110,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private ReliableSpoolingFileEventReader(File spoolDirectory,
       String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
+      boolean annotateBaseName, String baseNameHeader,
       String deserializerType, Context deserializerContext,
       String deletePolicy, String inputCharset,
       DecodeErrorPolicy decodeErrorPolicy) throws IOException {
@@ -164,6 +167,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.deserializerContext = deserializerContext;
     this.annotateFileName = annotateFileName;
     this.fileNameHeader = fileNameHeader;
+    this.annotateBaseName = annotateBaseName;
+    this.baseNameHeader = baseNameHeader;
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
     this.inputCharset = Charset.forName(inputCharset);
@@ -253,6 +258,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       }
     }
 
+    if (annotateBaseName) {
+      String basename = currentFile.get().getFile().getName();
+      for (Event event : events) {
+        event.getHeaders().put(baseNameHeader, basename);
+      }
+    }
+
     committed = false;
     lastFileRead = currentFile;
     return events;
@@ -510,6 +522,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
     private String fileNameHeader =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
+    private Boolean annotateBaseName =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER;
+    private String baseNameHeader =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER_KEY;
     private String deserializerType =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
     private Context deserializerContext = new Context();
@@ -551,6 +567,16 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder annotateBaseName(Boolean annotateBaseName) {
+      this.annotateBaseName = annotateBaseName;
+      return this;
+    }
+
+    public Builder baseNameHeader(String baseNameHeader) {
+      this.baseNameHeader = baseNameHeader;
+      return this;
+    }
+
     public Builder deserializerType(String deserializerType) {
       this.deserializerType = deserializerType;
       return this;
@@ -579,8 +605,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
-          deserializerType, deserializerContext, deletePolicy, inputCharset,
-          decodeErrorPolicy);
+          annotateBaseName, baseNameHeader, deserializerType,
+          deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 0160215..f42ed2d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -54,6 +54,8 @@ Configurable, EventDrivenSource {
   private String spoolDirectory;
   private boolean fileHeader;
   private String fileHeaderKey;
+  private boolean basenameHeader;
+  private String basenameHeaderKey;
   private int batchSize;
   private String ignorePattern;
   private String trackerDirPath;
@@ -87,6 +89,8 @@ Configurable, EventDrivenSource {
           .trackerDirPath(trackerDirPath)
           .annotateFileName(fileHeader)
           .fileNameHeader(fileHeaderKey)
+          .annotateBaseName(basenameHeader)
+          .baseNameHeader(basenameHeaderKey)
           .deserializerType(deserializerType)
           .deserializerContext(deserializerContext)
           .deletePolicy(deletePolicy)
@@ -142,6 +146,10 @@ Configurable, EventDrivenSource {
         DEFAULT_FILE_HEADER);
     fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
         DEFAULT_FILENAME_HEADER_KEY);
+    basenameHeader = context.getBoolean(BASENAME_HEADER,
+        DEFAULT_BASENAME_HEADER);
+    basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
+        DEFAULT_BASENAME_HEADER_KEY);
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
     inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);

http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index a2befe8..83522c0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -27,14 +27,22 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String SPOOLED_FILE_SUFFIX = "fileSuffix";
   public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED";
 
-  /** Header in which to put filename. */
+  /** Header in which to put absolute path filename. */
   public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
   public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
 
-  /** Whether to include filename in a header. */
+  /** Whether to include absolute path filename in a header. */
   public static final String FILENAME_HEADER = "fileHeader";
   public static final boolean DEFAULT_FILE_HEADER = false;
 
+  /** Header in which to put the basename of file. */
+  public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
+  public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";
+
+  /** Whether to include the basename of a file in a header. */
+  public static final String BASENAME_HEADER = "basenameHeader";
+  public static final boolean DEFAULT_BASENAME_HEADER = false;
+
   /** What size to batch with before sending to ChannelProcessor. */
   public static final String BATCH_SIZE = "batchSize";
   public static final int DEFAULT_BATCH_SIZE = 100;

http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 9a546a5..503ab4d 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -93,7 +93,9 @@ public class TestSpoolDirectorySource {
 
     Configurables.configure(source, context);
     source.start();
-    Thread.sleep(500);
+    while (source.getSourceCounter().getEventAcceptedCount() < 8) {
+      Thread.sleep(10);
+    }
     Transaction txn = channel.getTransaction();
     txn.begin();
     Event e = channel.take();
@@ -107,6 +109,40 @@ public class TestSpoolDirectorySource {
   }
 
   @Test
+  public void testPutBasenameHeader() throws IOException,
+    InterruptedException {
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+      "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+      f1, Charsets.UTF_8);
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER,
+        "true");
+    context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER_KEY,
+        "basenameHeaderKeyTest");
+
+    Configurables.configure(source, context);
+    source.start();
+    while (source.getSourceCounter().getEventAcceptedCount() < 8) {
+      Thread.sleep(10);
+    }
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+    Assert.assertNotNull("Event must not be null", e);
+    Assert.assertNotNull("Event headers must not be null", e.getHeaders());
+    Assert.assertNotNull(e.getHeaders().get("basenameHeaderKeyTest"));
+    Assert.assertEquals(f1.getName(),
+      e.getHeaders().get("basenameHeaderKeyTest"));
+    txn.commit();
+    txn.close();
+  }
+
+  @Test
   public void testLifecycle() throws IOException, InterruptedException {
     Context context = new Context();
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");

http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-doc/sphinx/FlumeUserGuide.rst
                
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7a41efb..08c7740 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -946,8 +946,10 @@ Property Name         Default         Description
 **spoolDir**          --              The directory from which to read files from.
 fileSuffix            .COMPLETED      Suffix to append to completely ingested files
 deletePolicy          never           When to delete completed files: ``never`` or ``immediate``
-fileHeader            false           Whether to add a header storing the filename
-fileHeaderKey         file            Header key to use when appending filename to header
+fileHeader            false           Whether to add a header storing the absolute path filename.
+fileHeaderKey         file            Header key to use when appending absolute path filename to event header.
+basenameHeader        false           Whether to add a header storing the basename of the file.
+basenameHeaderKey     basename        Header Key to use when appending  basename of file to event header.
 ignorePattern         ^$              Regular expression specifying which files to ignore (skip)
 trackerDir            .flumespool     Directory to store metadata related to processing of files.
                                       If this path is not an absolute path, then it is interpreted as relative to the spoolDir.


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic