List: flume-dev
Subject: Re: Review Request 50438: FLUME-2961:Make TaildirSource work with multiline
From: qiao wen <315524513 () qq ! com>
Date: 2017-02-28 15:13:17
Message-ID: 20170228151317.32557.29986 () reviews ! apache ! org
> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java, \
> > line 196 <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line196>
> >
> > Can the pattern be cached? (E.g., in configure)
OK, I will fix it.
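A minimal sketch of what caching the pattern could look like, assuming the regex is compiled once at configuration time and reused for every line. The class and method names below are hypothetical and are not taken from the patch.

```java
import java.util.regex.Pattern;

/** Hypothetical holder class; names are illustrative, not the patch's. */
class CachedPatternExample {
    // Compiled once and reused, instead of calling Pattern.compile() per read.
    private Pattern multilinePattern;

    /** Intended to be called once, at configuration time. */
    void configure(String regex) {
        this.multilinePattern = Pattern.compile(regex);
    }

    /** Called for every line; no recompilation happens here. */
    boolean find(String line) {
        return multilinePattern.matcher(line).find();
    }

    public static void main(String[] args) {
        CachedPatternExample ex = new CachedPatternExample();
        ex.configure("^\\d{4}-\\d{2}-\\d{2}");
        System.out.println(ex.find("2016-07-16 14:59:43,956 ERROR ..."));
        System.out.println(ex.find("\tat java.lang.Thread.run(Thread.java:745)"));
    }
}
```

Pattern instances are immutable and safe to share, so holding one in a field avoids recompiling the regex on every readEvents() call.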
> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-doc/sphinx/FlumeUserGuide.rst, line 1191
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638337#file1638337line1191>
> >
> > Can you please clarify this setting?
It's explained in the functions readMultilineEventPre() and readMultilineEventNext(),
and you can see the effect in TestTaildirSource.

Previous: If a line does not match, it is not part of the previous event (when the
buffer event is not null). A new event is created from the buffer event's message,
the buffer is cleared, and the current line is put into it.

Next: If a line does not match, it is not part of the next event. The current line
is merged into the buffer event, and a new event is created from the merged message.

private Event readMultilineEventPre(LineResult line, boolean match, Pattern pattern)
    throws IOException {
  Event event = null;
  Matcher m = pattern.matcher(new String(line.line));
  boolean find = m.find();
  match = (find && match) || (!find && !match);
  byte[] lineBytes = toOriginBytes(line);
  if (match) {
    /** If matched, merge it to the buffer event. */
    mergeEvent(line);
  } else {
    /**
     * If not matched, this line is not part of previous event when the buffer event is not null.
     * Then create a new event with buffer event's message and put the current line into the
     * cleared buffer event.
     */
    if (bufferEvent != null) {
      event = EventBuilder.withBody(bufferEvent.getBody());
    }
    bufferEvent = null;
    bufferEvent = EventBuilder.withBody(lineBytes);
    if (line.lineSepInclude) {
      bufferEvent.getHeaders().put("lineCount", "1");
    }
    long now = System.currentTimeMillis();
    bufferEvent.getHeaders().put(TimestampInterceptor.Constants.TIMESTAMP, Long.toString(now));
  }
  return event;
}

private Event readMultilineEventNext(LineResult line, boolean match, Pattern pattern)
    throws IOException {
  Event event = null;
  Matcher m = pattern.matcher(new String(line.line));
  boolean find = m.find();
  match = (find && match) || (!find && !match);
  if (match) {
    /** If matched, merge it to the buffer event. */
    mergeEvent(line);
  } else {
    /**
     * If not matched, this line is not part of next event. Then merge the current line into the
     * buffer event and create a new event with the merged message.
     */
    mergeEvent(line);
    event = EventBuilder.withBody(bufferEvent.getBody());
    bufferEvent = null;
  }
  return event;
}
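
To make the difference between the two modes easier to see, here is a self-contained sketch that applies the same match rule to plain strings. It is a simplified model, not the patch itself: MultilineGroupingSketch, group() and append() are hypothetical names, it assumes that merging into an empty buffer simply starts a new buffer, and it flushes the leftover buffer at the end of input, whereas the patch flushes on a timeout or size limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Simplified, hypothetical model of the "previous"/"next" grouping rules. */
class MultilineGroupingSketch {

    /**
     * @param patternMatched plays the role of multilinePatternMatched
     * @param belong         "previous" or "next", like multilinePatternBelong
     */
    static List<String> group(List<String> lines, Pattern pattern,
                              boolean patternMatched, String belong) {
        List<String> events = new ArrayList<>();
        StringBuilder buffer = null;
        for (String line : lines) {
            boolean find = pattern.matcher(line).find();
            // Same truth table as (find && match) || (!find && !match).
            boolean match = (find == patternMatched);
            if (match) {
                buffer = append(buffer, line);          // merge into the buffer
            } else if ("previous".equals(belong)) {
                if (buffer != null) {
                    events.add(buffer.toString());      // emit the previous event
                }
                buffer = new StringBuilder(line);       // current line starts a new buffer
            } else {
                buffer = append(buffer, line);          // current line closes the event
                events.add(buffer.toString());
                buffer = null;
            }
        }
        if (buffer != null) {
            events.add(buffer.toString());              // assumption: flush at end of input
        }
        return events;
    }

    private static StringBuilder append(StringBuilder buffer, String line) {
        if (buffer == null) {
            return new StringBuilder(line);
        }
        return buffer.append('\n').append(line);
    }

    public static void main(String[] args) {
        List<String> trace = Arrays.asList(
            "2016-07-16 14:59:43,956 ERROR boom",
            "java.lang.IllegalStateException: x",
            "\tat a.b.C.d(C.java:1)",
            "2016-07-16 14:59:44,001 INFO ok");
        // Timestamp lines start a new event; everything else joins the previous one.
        Pattern ts = Pattern.compile("^\\d{4}-\\d{2}-\\d{2}");
        System.out.println(group(trace, ts, false, "previous").size());

        // Lines ending in a backslash continue onto the next line.
        List<String> continued = Arrays.asList("a \\", "b \\", "c", "d");
        Pattern trailing = Pattern.compile("\\\\$");
        System.out.println(group(continued, trailing, true, "next").size());
    }
}
```

In this model the stack trace collapses into two events (the ERROR line with its trace, then the INFO line), and the backslash-continued input also yields two events ("a", "b", "c" joined, then "d").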
> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java, \
> > line 115 <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line115>
> >
> > Isn't maxBytes and maxLines missing here?
No, the maxBytes and maxLines logic comes after readLine() in readEvents(). More
precisely, the function should be named needFlushTimeoutEvent(). I will fix it ASAP.

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
    boolean addByteOffset) throws IOException {
  List<Event> events = Lists.newLinkedList();
  if (this.multiline) {
    boolean match = this.multilinePatternMatched;
    Pattern pattern = Pattern.compile(this.multilinePattern);
    while (events.size() < numEvents) {
      LineResult line = readLine();
      if (line == null) {
        break;
      }
      Event event = null;
      switch (this.multilinePatternBelong) {
        case "next":
          event = readMultilineEventNext(line, match, pattern);
          break;
        case "previous":
          event = readMultilineEventPre(line, match, pattern);
          break;
        default:
          break;
      }
      if (event != null) {
        events.add(event);
      }
      if (bufferEvent != null) {
        if (bufferEvent.getBody().length >= multilineMaxBytes
            || Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == multilineMaxLines) {
          flushBufferEvent(events);
        }
      }
    }
    if (isNeedFlushBufferEvent()) {
      flushBufferEvent(events);
    }
  } else {
    ...
  }
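
The size-limit check in the loop above can be illustrated in isolation. The following is only a sketch under stated assumptions: BoundedMultilineBuffer is a hypothetical class, it counts UTF-8 bytes of the joined body, and it uses >= for the line limit, whereas the code above compares lineCount with ==.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical buffer that flushes once a byte or line limit is reached. */
class BoundedMultilineBuffer {
    private final int maxBytes;
    private final int maxLines;
    private final StringBuilder body = new StringBuilder();
    private int lineCount = 0;
    private int byteCount = 0;

    BoundedMultilineBuffer(int maxBytes, int maxLines) {
        this.maxBytes = maxBytes;
        this.maxLines = maxLines;
    }

    /** Appends a line; returns the flushed body if a limit was reached, else null. */
    String add(String line) {
        if (lineCount > 0) {
            body.append('\n');
            byteCount += 1; // the joining newline counts toward the limit
        }
        body.append(line);
        byteCount += line.getBytes(StandardCharsets.UTF_8).length;
        lineCount++;
        if (byteCount >= maxBytes || lineCount >= maxLines) {
            return flush();
        }
        return null;
    }

    /** Returns the buffered body and resets the buffer; null if nothing is buffered. */
    String flush() {
        if (lineCount == 0) {
            return null;
        }
        String event = body.toString();
        body.setLength(0);
        lineCount = 0;
        byteCount = 0;
        return event;
    }

    public static void main(String[] args) {
        BoundedMultilineBuffer buf = new BoundedMultilineBuffer(1024, 3);
        System.out.println(buf.add("line 1"));
        System.out.println(buf.add("line 2"));
        System.out.println(buf.add("line 3"));
    }
}
```

Keeping the counters next to the buffer avoids parsing a header value on every line; whether that fits the patch's Event-based buffer is a design question for the author.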
- qiao
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50438/#review167074
-----------------------------------------------------------
On Feb. 17, 2017, 11:13 a.m., qiao wen wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50438/
> -----------------------------------------------------------
>
> (Updated Feb. 17, 2017, 11:13 a.m.)
>
>
> Review request for Flume.
>
>
> Repository: flume-git
>
>
> Description
> -------
>
> TaildirSource defaults to LINE, which is an issue for multiline log events such as
> stack traces and request/responses. The following is a Java traceback log. We expect
> a log-line-start regex key to aggregate all log lines until the next regex key is found.
>
> 2016-07-16 14:59:43,956 ERROR lifecycleSupervisor-1-7 LifecycleSupervisor.run - Unable to start EventDrivenSourceRunner: { source:cn.yottabyte.flume.source.http.HTTPSource{name:sourceHttp,state:IDLE} } - Exception follows.
> java.lang.IllegalStateException: Running HTTP Server found in source: sourceHttp before I started one. Will not attempt to start.
> at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
> at cn.yottabyte.flume.source.http.HTTPSource.startHttpSourceServer(HTTPSource.java:170)
> at cn.yottabyte.flume.source.http.HTTPSource.start(HTTPSource.java:166)
> at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
> at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Diffs
> -----
>
> flume-ng-doc/sphinx/FlumeUserGuide.rst afa6625
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java 8838320
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java 42474c4
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java a107a01
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java f2347f3
> flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java 097ee0b
> Diff: https://reviews.apache.org/r/50438/diff/
>
>
> Testing
> -------
>
> All tests in TestTaildirSource passed.
>
>
> Thanks,
>
> qiao wen
>
>