[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: git commit: FLUME-1800: Docs for spooling source durability changes
From: brock () apache ! org
Date: 2013-01-25 21:09:52
Message-ID: 20130125210952.AC9FD8255E9 () tyr ! zones ! apache ! org
[Download RAW message or body]
Updated Branches:
refs/heads/flume-1.4 d7b474c37 -> 05e8ebc35
FLUME-1800: Docs for spooling source durability changes
(Mike Percy via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/05e8ebc3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/05e8ebc3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/05e8ebc3
Branch: refs/heads/flume-1.4
Commit: 05e8ebc352b0f8f96747bd1046ac22caf09e1890
Parents: d7b474c
Author: Brock Noland <brock@apache.org>
Authored: Fri Jan 25 15:09:12 2013 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Jan 25 15:09:24 2013 -0600
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 20 +++-
.../apache/flume/source/SpoolDirectorySource.java | 5 +-
...SpoolDirectorySourceConfigurationConstants.java | 13 ++-
.../avro/TestReliableSpoolingFileEventReader.java | 2 +-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 98 ++++++++++-----
5 files changed, 98 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java \
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index b19d0ea..28df24c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.*;
import java.util.regex.Pattern;
@@ -84,6 +85,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { private final boolean annotateFileName;
private final String fileNameHeader;
private final String deletePolicy;
+ private final Charset inputCharset;
private Optional<FileInfo> currentFile = Optional.absent();
/** Always contains the last file from which lines have been read. **/
@@ -97,7 +99,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader {
String completedSuffix, String ignorePattern, String trackerDirPath,
boolean annotateFileName, String fileNameHeader,
String deserializerType, Context deserializerContext,
- String deletePolicy) throws IOException {
+ String deletePolicy, String inputCharset) throws IOException {
// Sanity checks
Preconditions.checkNotNull(spoolDirectory);
@@ -107,6 +109,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { Preconditions.checkNotNull(deserializerType);
Preconditions.checkNotNull(deserializerContext);
Preconditions.checkNotNull(deletePolicy);
+ Preconditions.checkNotNull(inputCharset);
// validate delete policy
if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
@@ -149,6 +152,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { this.fileNameHeader = fileNameHeader;
this.ignorePattern = Pattern.compile(ignorePattern);
this.deletePolicy = deletePolicy;
+ this.inputCharset = Charset.forName(inputCharset);
File trackerDirectory = new File(trackerDirPath);
@@ -422,7 +426,8 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { tracker.getTarget(), nextPath);
ResettableInputStream in =
- new ResettableFileInputStream(nextFile, tracker);
+ new ResettableFileInputStream(nextFile, tracker,
+ ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset);
EventDeserializer deserializer = EventDeserializerFactory.getInstance
(deserializerType, deserializerContext, in);
@@ -482,7 +487,7 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { private String ignorePattern =
SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
private String trackerDirPath =
- SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
private Boolean annotateFileName =
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
private String fileNameHeader =
@@ -492,6 +497,8 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { private Context deserializerContext = new Context();
private String deletePolicy =
SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
+ private String inputCharset =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
public Builder spoolDirectory(File directory) {
this.spoolDirectory = directory;
@@ -538,10 +545,15 @@ public class ReliableSpoolingFileEventReader implements \
ReliableEventReader { return this;
}
+ public Builder inputCharset(String inputCharset) {
+ this.inputCharset = inputCharset;
+ return this;
+ }
+
public ReliableSpoolingFileEventReader build() throws IOException {
return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
- deserializerType, deserializerContext, deletePolicy);
+ deserializerType, deserializerContext, deletePolicy, inputCharset);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java \
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index \
552bd48..698b906 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -56,6 +56,7 @@ Configurable, EventDrivenSource {
private String deserializerType;
private Context deserializerContext;
private String deletePolicy;
+ private String inputCharset;
private CounterGroup counterGroup;
ReliableSpoolingFileEventReader reader;
@@ -81,6 +82,7 @@ Configurable, EventDrivenSource {
.deserializerType(deserializerType)
.deserializerContext(deserializerContext)
.deletePolicy(deletePolicy)
+ .inputCharset(inputCharset)
.build();
} catch (IOException ioe) {
throw new FlumeException("Error instantiating spooling event parser",
@@ -115,9 +117,10 @@ Configurable, EventDrivenSource {
DEFAULT_FILENAME_HEADER_KEY);
batchSize = context.getInteger(BATCH_SIZE,
DEFAULT_BATCH_SIZE);
+ inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
- trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR);
+ trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java \
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index afc7288..f3cc703 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -35,14 +35,18 @@ public class SpoolDirectorySourceConfigurationConstants {
/** What size to batch with before sending to ChannelProcessor. */
public static final String BATCH_SIZE = "batchSize";
- public static final int DEFAULT_BATCH_SIZE = 10;
+ public static final int DEFAULT_BATCH_SIZE = 100;
/** Maximum number of lines to buffer between commits. */
+ @Deprecated
public static final String BUFFER_MAX_LINES = "bufferMaxLines";
+ @Deprecated
public static final int DEFAULT_BUFFER_MAX_LINES = 100;
/** Maximum length of line (in characters) in buffer between commits. */
+ @Deprecated
public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
+ @Deprecated
public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;
/** Pattern of files to ignore */
@@ -50,8 +54,8 @@ public class SpoolDirectorySourceConfigurationConstants {
public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect
/** Directory to store metadata about files being processed */
- public static final String META_DIR = "metaDir";
- public static final String DEFAULT_META_DIR = ".flumespool";
+ public static final String TRACKER_DIR = "trackerDir";
+ public static final String DEFAULT_TRACKER_DIR = ".flumespool";
/** Deserializer to use to parse the file data into Flume Events */
public static final String DESERIALIZER = "deserializer";
@@ -59,4 +63,7 @@ public class SpoolDirectorySourceConfigurationConstants {
public static final String DELETE_POLICY = "deletePolicy";
public static final String DEFAULT_DELETE_POLICY = "never";
+
+ public static final String INPUT_CHARSET = "inputCharset";
+ public static final String DEFAULT_INPUT_CHARSET = "UTF-8";
}
http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java \
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index a29606e..31ecf8e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -119,7 +119,7 @@ public class TestReliableSpoolingFileEventReader {
@Test
public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
String trackerDirPath =
- SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
File trackerDir = new File(WORK_DIR, trackerDirPath);
ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
http://git-wip-us.apache.org/repos/asf/flume/blob/05e8ebc3/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst \
b/flume-ng-doc/sphinx/FlumeUserGuide.rst index b2c58de..452c634 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -773,54 +773,90 @@ Example for agent named a1:
Spooling Directory Source
~~~~~~~~~~~~~~~~~~~~~~~~~
-This source lets you ingest data by dropping files in a spooling directory on
-disk. **Unlike other asynchronous sources, this source
-avoids data loss even if Flume is restarted or fails.**
-Flume will watch the directory for new files and read then ingest them
-as they appear. After a given file has been fully read into the channel,
-it is renamed to indicate completion. This allows a cleaner process to remove
-completed files periodically. Note, however,
-that events may be duplicated if failures occur, consistent with the semantics
-offered by other Flume components. The channel optionally inserts the full path of
-the origin file into a header field of each event. This source buffers file data
-in memory during reads; be sure to set the `bufferMaxLineLength` option to a number
-greater than the longest line you expect to see in your input data.
-
-.. warning:: This channel expects that only immutable, uniquely named files
- are dropped in the spooling directory. If duplicate names are
- used, or files are modified while being read, the source will
- fail with an error message. For some use cases this may require
- adding unique identifiers (such as a timestamp) to log file names
- when they are copied into the spooling directory.
+This source lets you ingest data by placing files to be ingested into a
+"spooling" directory on disk.
+This source will watch the specified directory for new files, and will parse
+events out of new files as they appear.
+The event parsing logic is pluggable.
+After a given file has been fully read
+into the channel, it is renamed to indicate completion (or optionally deleted).
+
+Unlike the Exec source, this source is reliable and will not miss data, even if
+Flume is restarted or killed. In exchange for this reliability, files dropped
+into the spooling directory must be immutable and uniquely named. Flume tries
+to detect violations of these requirements and will fail loudly when it finds one:
+
+#. If a file is written to after being placed into the spooling directory,
+ Flume will print an error to its log file and stop processing.
+#. If a file name is reused at a later time, Flume will print an error to its
+ log file and stop processing.
+
+To avoid the above issues, it may be useful to add a unique identifier
+(such as a timestamp) to log file names when they are moved into the spooling
+directory.
+
+Despite the reliability guarantees of this source, there are still
+cases in which events may be duplicated if certain downstream failures occur.
+This is consistent with the guarantees offered by other Flume components.
==================== ============== \
========================================================== Property Name \
Default Description ==================== ============== \
==========================================================
**channels** --
-**type** -- The component type name, needs to be \
``spooldir``
-**spoolDir** -- The directory where log files will be spooled
+**type** -- The component type name, needs to be \
``spooldir``. +**spoolDir** -- The directory from which to read \
files. fileSuffix .COMPLETED Suffix to append to completely \
ingested files +deletePolicy never When to delete completed files: \
``never`` or ``immediate`` fileHeader false Whether to add a \
header storing the filename fileHeaderKey file Header key to use \
when appending filename to header
-batchSize 10 Granularity at which to batch transfer to the \
channel
-bufferMaxLines 100 Maximum number of lines the commit buffer can \
hold
-bufferMaxLineLength 5000 Maximum length of a line in the commit buffer
+ignorePattern ^$ Regular expression specifying which files to \
ignore (skip) +trackerDir .flumespool Directory to store metadata \
related to processing of files. + If this path \
is not an absolute path, then it is interpreted as relative to the spoolDir. \
+batchSize 100 Granularity at which to batch transfer to the \
channel +inputCharset UTF-8 Character set used by deserializers \
that treat the input file as text. +deserializer ``LINE`` Specify the \
deserializer used to parse the file into events. + \
Defaults to parsing each line as an event. The class specified must implement + \
``EventDeserializer.Builder``. +deserializer.* Varies per \
event deserializer. +bufferMaxLines -- (Obsolete) This option is \
now ignored. +bufferMaxLineLength 5000 (Deprecated) Maximum length of a \
line in the commit buffer. Use deserializer.maxLineLength instead. selector.type \
replicating replicating or multiplexing selector.* \
Depends on the selector.type value interceptors -- Space \
separated list of interceptors interceptors.*
==================== ============== \
==========================================================
-Example for agent named a1:
+Example for an agent named agent-1:
.. code-block:: properties
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = spooldir
- a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
- a1.sources.r1.fileHeader = true
- a1.sources.r1.channels = c1
+ agent-1.channels = ch-1
+ agent-1.sources = src-1
+
+ agent-1.sources.src-1.type = spooldir
+ agent-1.sources.src-1.channels = ch-1
+ agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
+ agent-1.sources.src-1.fileHeader = true
+
+Event Deserializers
+'''''''''''''''''''
+
+The following event deserializers ship with Flume.
+
+LINE
+^^^^
+
+This deserializer generates one event per line of text input.
+
+============================== ============== \
========================================================== +Property Name \
Default Description +============================== ============== \
========================================================== \
+deserializer.maxLineLength 2048 Maximum number of characters to \
include in a single event. + If a line \
exceeds this length, it is truncated, and the + \
remaining characters on the line will appear in a + \
subsequent event. +deserializer.outputCharset UTF-8 Charset to use for \
encoding events put into the channel. +============================== ============== \
==========================================================
NetCat Source
~~~~~~~~~~~~~
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic