[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    git commit: FLUME-1800: Docs for spooling source durability changes
From:       brock () apache ! org
Date:       2013-01-25 21:09:52
Message-ID: 20130125210952.AC9FD8255E9 () tyr ! zones ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/flume-1.4 d7b474c37 -> 05e8ebc35


FLUME-1800: Docs for spooling source durability changes

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/05e8ebc3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/05e8ebc3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/05e8ebc3

Branch: refs/heads/flume-1.4
Commit: 05e8ebc352b0f8f96747bd1046ac22caf09e1890
Parents: d7b474c
Author: Brock Noland <brock@apache.org>
Authored: Fri Jan 25 15:09:12 2013 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Jan 25 15:09:24 2013 -0600

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java      |   20 +++-
 .../apache/flume/source/SpoolDirectorySource.java  |    5 +-
 ...SpoolDirectorySourceConfigurationConstants.java |   13 ++-
 .../avro/TestReliableSpoolingFileEventReader.java  |    2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   98 ++++++++++-----
 5 files changed, 98 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java \
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
 index b19d0ea..28df24c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
                
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
 @@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.*;
 import java.util.regex.Pattern;
 
@@ -84,6 +85,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  private final boolean annotateFileName;
   private final String fileNameHeader;
   private final String deletePolicy;
+  private final Charset inputCharset;
 
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. **/
@@ -97,7 +99,7 @@ public class ReliableSpoolingFileEventReader implements \
                ReliableEventReader {
       String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
       String deserializerType, Context deserializerContext,
-      String deletePolicy) throws IOException {
+      String deletePolicy, String inputCharset) throws IOException {
 
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
@@ -107,6 +109,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  Preconditions.checkNotNull(deserializerType);
     Preconditions.checkNotNull(deserializerContext);
     Preconditions.checkNotNull(deletePolicy);
+    Preconditions.checkNotNull(inputCharset);
 
     // validate delete policy
     if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
@@ -149,6 +152,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  this.fileNameHeader = fileNameHeader;
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
+    this.inputCharset = Charset.forName(inputCharset);
 
     File trackerDirectory = new File(trackerDirPath);
 
@@ -422,7 +426,8 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  tracker.getTarget(), nextPath);
 
         ResettableInputStream in =
-            new ResettableFileInputStream(nextFile, tracker);
+            new ResettableFileInputStream(nextFile, tracker,
+                ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset);
         EventDeserializer deserializer = EventDeserializerFactory.getInstance
             (deserializerType, deserializerContext, in);
 
@@ -482,7 +487,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  private String ignorePattern =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
     private String trackerDirPath =
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
     private Boolean annotateFileName =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
     private String fileNameHeader =
@@ -492,6 +497,8 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  private Context deserializerContext = new Context();
     private String deletePolicy =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
+    private String inputCharset =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
 
     public Builder spoolDirectory(File directory) {
       this.spoolDirectory = directory;
@@ -538,10 +545,15 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {  return this;
     }
 
+    public Builder inputCharset(String inputCharset) {
+      this.inputCharset = inputCharset;
+      return this;
+    }
+
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
-          deserializerType, deserializerContext, deletePolicy);
+          deserializerType, deserializerContext, deletePolicy, inputCharset);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java \
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index \
                552bd48..698b906 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -56,6 +56,7 @@ Configurable, EventDrivenSource {
   private String deserializerType;
   private Context deserializerContext;
   private String deletePolicy;
+  private String inputCharset;
 
   private CounterGroup counterGroup;
   ReliableSpoolingFileEventReader reader;
@@ -81,6 +82,7 @@ Configurable, EventDrivenSource {
           .deserializerType(deserializerType)
           .deserializerContext(deserializerContext)
           .deletePolicy(deletePolicy)
+          .inputCharset(inputCharset)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -115,9 +117,10 @@ Configurable, EventDrivenSource {
         DEFAULT_FILENAME_HEADER_KEY);
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
+    inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
 
     ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
-    trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR);
+    trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
 
     deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
     deserializerContext = new Context(context.getSubProperties(DESERIALIZER +

http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java \
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
 index afc7288..f3cc703 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
                
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
 @@ -35,14 +35,18 @@ public class SpoolDirectorySourceConfigurationConstants {
 
   /** What size to batch with before sending to ChannelProcessor. */
   public static final String BATCH_SIZE = "batchSize";
-  public static final int DEFAULT_BATCH_SIZE = 10;
+  public static final int DEFAULT_BATCH_SIZE = 100;
 
   /** Maximum number of lines to buffer between commits. */
+  @Deprecated
   public static final String BUFFER_MAX_LINES = "bufferMaxLines";
+  @Deprecated
   public static final int DEFAULT_BUFFER_MAX_LINES = 100;
 
   /** Maximum length of line (in characters) in buffer between commits. */
+  @Deprecated
   public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
+  @Deprecated
   public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;
 
   /** Pattern of files to ignore */
@@ -50,8 +54,8 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect
 
   /** Directory to store metadata about files being processed */
-  public static final String META_DIR = "metaDir";
-  public static final String DEFAULT_META_DIR = ".flumespool";
+  public static final String TRACKER_DIR = "trackerDir";
+  public static final String DEFAULT_TRACKER_DIR = ".flumespool";
 
   /** Deserializer to use to parse the file data into Flume Events */
   public static final String DESERIALIZER = "deserializer";
@@ -59,4 +63,7 @@ public class SpoolDirectorySourceConfigurationConstants {
 
   public static final String DELETE_POLICY = "deletePolicy";
   public static final String DEFAULT_DELETE_POLICY = "never";
+
+  public static final String INPUT_CHARSET = "inputCharset";
+  public static final String DEFAULT_INPUT_CHARSET = "UTF-8";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java \
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
 index a29606e..31ecf8e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
                
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
 @@ -119,7 +119,7 @@ public class TestReliableSpoolingFileEventReader {
   @Test
   public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
     String trackerDirPath =
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
     File trackerDir = new File(WORK_DIR, trackerDirPath);
 
     ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()

http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-doc/sphinx/FlumeUserGuide.rst
                
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst \
b/flume-ng-doc/sphinx/FlumeUserGuide.rst index b2c58de..452c634 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -773,54 +773,90 @@ Example for agent named a1:
 
 Spooling Directory Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~
-This source lets you ingest data by dropping files in a spooling directory on
-disk. **Unlike other asynchronous sources, this source
-avoids data loss even if Flume is restarted or fails.**
-Flume will watch the directory for new files and read then ingest them
-as they appear. After a given file has been fully read into the channel,
-it is renamed to indicate completion. This allows a cleaner process to remove
-completed files periodically. Note, however,
-that events may be duplicated if failures occur, consistent with the semantics
-offered by other Flume components. The channel optionally inserts the full path of
-the origin file into a header field of each event. This source buffers file data
-in memory during reads; be sure to set the `bufferMaxLineLength` option to a number
-greater than the longest line you expect to see in your input data.
-
-.. warning:: This channel expects that only immutable, uniquely named files
-             are dropped in the spooling directory. If duplicate names are
-             used, or files are modified while being read, the source will
-             fail with an error message. For some use cases this may require
-             adding unique identifiers (such as a timestamp) to log file names
-             when they are copied into the spooling directory.
+This source lets you ingest data by placing files to be ingested into a
+"spooling" directory on disk.
+This source will watch the specified directory for new files, and will parse
+events out of new files as they appear.
+The event parsing logic is pluggable.
+After a given file has been fully read
+into the channel, it is renamed to indicate completion (or optionally deleted).
+
+Unlike the Exec source, this source is reliable and will not miss data, even if
+Flume is restarted or killed. In exchange for this reliability, only immutable,
+uniquely-named files may be dropped into the spooling directory. Flume tries
+to detect violations of these conditions and will fail loudly if they occur:
+
+#. If a file is written to after being placed into the spooling directory,
+   Flume will print an error to its log file and stop processing.
+#. If a file name is reused at a later time, Flume will print an error to its
+   log file and stop processing.
+
+To avoid the above issues, it may be useful to add a unique identifier
+(such as a timestamp) to log file names when they are moved into the spooling
+directory.
+
+Despite the reliability guarantees of this source, there are still
+cases in which events may be duplicated if certain downstream failures occur.
+This is consistent with the guarantees offered by other Flume components.
 
 ====================  ==============  \
==========================================================  Property Name         \
Default         Description  ====================  ==============  \
                ==========================================================
 **channels**          --
-**type**              --              The component type name, needs to be \
                ``spooldir``
-**spoolDir**          --              The directory where log files will be spooled
+**type**              --              The component type name, needs to be \
``spooldir``. +**spoolDir**          --              The directory from which to read \
files.  fileSuffix            .COMPLETED      Suffix to append to completely \
ingested files +deletePolicy          never           When to delete completed files: \
``never`` or ``immediate``  fileHeader            false           Whether to add a \
header storing the filename  fileHeaderKey         file            Header key to use \
                when appending filename to header
-batchSize             10              Granularity at which to batch transfer to the \
                channel
-bufferMaxLines        100             Maximum number of lines the commit buffer can \
                hold
-bufferMaxLineLength   5000            Maximum length of a line in the commit buffer
+ignorePattern         ^$              Regular expression specifying which files to \
ignore (skip) +trackerDir            .flumespool     Directory to store metadata \
related to processing of files. +                                      If this path \
is not an absolute path, then it is interpreted as relative to the spoolDir. \
+batchSize             100             Granularity at which to batch transfer to the \
channel +inputCharset          UTF-8           Character set used by deserializers \
that treat the input file as text. +deserializer          ``LINE``        Specify the \
deserializer used to parse the file into events. +                                    \
Defaults to parsing each line as an event. The class specified must implement +       \
``EventDeserializer.Builder``. +deserializer.*                        Varies per \
event deserializer. +bufferMaxLines        --              (Obsolete) This option is \
now ignored. +bufferMaxLineLength   5000            (Deprecated) Maximum length of a \
line in the commit buffer. Use deserializer.maxLineLength instead.  selector.type     \
replicating     replicating or multiplexing  selector.*                            \
Depends on the selector.type value  interceptors          --              Space \
separated list of interceptors  interceptors.*
 ====================  ==============  \
==========================================================  
-Example for agent named a1:
+Example for an agent named agent-1:
 
 .. code-block:: properties
 
-  a1.sources = r1
-  a1.channels = c1
-  a1.sources.r1.type = spooldir
-  a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
-  a1.sources.r1.fileHeader = true
-  a1.sources.r1.channels = c1
+  agent-1.channels = ch-1
+  agent-1.sources = src-1
+
+  agent-1.sources.src-1.type = spooldir
+  agent-1.sources.src-1.channels = ch-1
+  agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
+  agent-1.sources.src-1.fileHeader = true
+
+Event Deserializers
+'''''''''''''''''''
+
+The following event deserializers ship with Flume.
+
+LINE
+^^^^
+
+This deserializer generates one event per line of text input.
+
+==============================  ==============  \
========================================================== +Property Name             \
Default         Description +==============================  ==============  \
========================================================== \
+deserializer.maxLineLength      2048            Maximum number of characters to \
include in a single event. +                                                If a line \
exceeds this length, it is truncated, and the +                                       \
remaining characters on the line will appear in a +                                   \
subsequent event. +deserializer.outputCharset      UTF-8           Charset to use for \
encoding events put into the channel. +==============================  ============== \
==========================================================  
 NetCat Source
 ~~~~~~~~~~~~~


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic