
List:       flume-dev
Subject:    Re: Review Request 50438: FLUME-2961:Make TaildirSource work with multiline
From:       qiao wen <315524513 () qq ! com>
Date:       2017-02-28 15:13:17
Message-ID: 20170228151317.32557.29986 () reviews ! apache ! org




> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java,
> > line 196 <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line196>
> >  
> > Can the pattern be cached? (Eg. in configure)

OK, I will fix it.
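
As a rough illustration of what caching the pattern could look like, here is a minimal standalone sketch. It is not the code in the patch: the class name CachedPatternExample and the method startsNewEvent() are hypothetical, and only java.util.regex.Pattern is a real API. The idea is to compile the regex once when the configuration is applied instead of on every read.

```java
import java.util.regex.Pattern;

/** Hypothetical holder class for illustration; not the actual TailFile. */
final class CachedPatternExample {
  // Compiled once in configure() and reused for every line.
  private Pattern multilinePattern;

  /** Assumed to be called once, when the source is configured. */
  void configure(String regex) {
    this.multilinePattern = Pattern.compile(regex);
  }

  /** Reuses the cached Pattern; only a cheap Matcher is created per line. */
  boolean startsNewEvent(String line) {
    return multilinePattern.matcher(line).find();
  }

  public static void main(String[] args) {
    CachedPatternExample example = new CachedPatternExample();
    example.configure("^\\d{4}-\\d{2}-\\d{2}");
    System.out.println(example.startsNewEvent("2016-07-16 14:59:43,956 ERROR ..."));
    System.out.println(example.startsNewEvent("    at java.lang.Thread.run(Thread.java:745)"));
  }
}
```

Pattern instances are immutable and safe to share, so keeping one in a field avoids recompiling the regex on each readEvents() call.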


> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-doc/sphinx/FlumeUserGuide.rst, line 1191
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638337#file1638337line1191>
> > 
> > Can you please clarify this setting?

It's explained in the functions readMultilineEventPre() and readMultilineEventNext(),
and you can see the effect in TestTaildirSource.

Previous:   If the line does not match, it is not part of the previous event.
            When the buffer event is not null, create a new event from the
            buffer event's message, then put the current line into the
            cleared buffer event.
Next:       If the line does not match, it is not part of the next event.
            Merge the current line into the buffer event and create a new
            event from the merged message.

private Event readMultilineEventPre(LineResult line, boolean match, Pattern pattern)
          throws IOException {
    Event event = null;
    Matcher m = pattern.matcher(new String(line.line));
    boolean find = m.find();
    match = (find && match) || (!find && !match);
    byte[] lineBytes = toOriginBytes(line);
    if (match) {
      /** If matched, merge it to the buffer event. */
      mergeEvent(line);
    } else {
      /**
       * If not matched, this line is not part of previous event when the buffer
       * event is not null.
       * Then create a new event with buffer event's message and put the current
       * line into the cleared buffer event.
       */
      if (bufferEvent != null) {
        event = EventBuilder.withBody(bufferEvent.getBody());
      }
      bufferEvent = null;
      bufferEvent = EventBuilder.withBody(lineBytes);
      if (line.lineSepInclude) {
        bufferEvent.getHeaders().put("lineCount", "1");
      }
      long now = System.currentTimeMillis();
      bufferEvent.getHeaders().put(TimestampInterceptor.Constants.TIMESTAMP,
          Long.toString(now));
    }
    return event;
  }

  private Event readMultilineEventNext(LineResult line, boolean match, Pattern pattern)
      throws IOException {
    Event event = null;
    Matcher m = pattern.matcher(new String(line.line));
    boolean find = m.find();
    match = (find && match) || (!find && !match);
    if (match) {
      /** If matched, merge it to the buffer event. */
      mergeEvent(line);
    } else {
      /**
       * If not matched, this line is not part of next event. Then merge the
       * current line into the buffer event and create a new event with the
       * merged message.
       */
      mergeEvent(line);
      event = EventBuilder.withBody(bufferEvent.getBody());
      bufferEvent = null;
    }
    return event;
  }
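
To make the difference between the two modes concrete, here is a simplified standalone sketch that groups plain strings instead of Flume events. It is not the patch code: the class MultilineGroupingSketch and its group() method are hypothetical, joining lines with "\n" is an assumption about what mergeEvent() does, and flushing the leftover buffer at the end stands in for the timeout flush.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of "previous" vs "next" grouping on plain strings. */
final class MultilineGroupingSketch {

  static List<String> group(List<String> lines, Pattern pattern, boolean match, String belong) {
    if (!"previous".equals(belong) && !"next".equals(belong)) {
      throw new IllegalArgumentException("belong must be 'previous' or 'next'");
    }
    List<String> events = new ArrayList<>();
    StringBuilder buffer = null;
    for (String line : lines) {
      // Same truth table as (find && match) || (!find && !match).
      boolean matched = pattern.matcher(line).find() == match;
      if ("previous".equals(belong) && !matched) {
        // The line starts a new event: emit the buffered one, then buffer this line.
        if (buffer != null) {
          events.add(buffer.toString());
        }
        buffer = new StringBuilder(line);
        continue;
      }
      // In every other case the line is merged into the buffer.
      if (buffer == null) {
        buffer = new StringBuilder(line);
      } else {
        buffer.append('\n').append(line);
      }
      if ("next".equals(belong) && !matched) {
        // The line closes the current event: emit the merged buffer.
        events.add(buffer.toString());
        buffer = null;
      }
    }
    if (buffer != null) {
      events.add(buffer.toString()); // stand-in for the timeout flush
    }
    return events;
  }

  public static void main(String[] args) {
    List<String> trace = Arrays.asList("2016-07-16 ERROR a", "  at x", "  at y", "2016-07-17 INFO b");
    // Lines starting with whitespace belong to the previous event.
    System.out.println(group(trace, Pattern.compile("^\\s"), true, "previous"));
    List<String> continued = Arrays.asList("a \\", "b \\", "c", "d");
    // Lines ending with a backslash belong to the next event.
    System.out.println(group(continued, Pattern.compile("\\\\$"), true, "next"));
  }
}
```

With "previous", the stack frames are attached to the timestamped line before them; with "next", the lines ending in a backslash are attached to the line that follows them.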


> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java,
> > line 115 <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line115>
> >  
> > Isn't maxBytes and maxLines missing here?

No, the maxBytes and maxLines checks are applied after readLine() in readEvents().
More precisely, the function should be named needFlushTimeoutEvent(). I will fix it ASAP.

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
      boolean addByteOffset) throws IOException {
    List<Event> events = Lists.newLinkedList();
    if (this.multiline) {
      boolean match = this.multilinePatternMatched;
      Pattern pattern = Pattern.compile(this.multilinePattern);
      while (events.size() < numEvents) {
        LineResult line = readLine();
        if (line == null) {
          break;
        }
        Event event = null;
        switch (this.multilinePatternBelong) {
          case "next":
            event = readMultilineEventNext(line, match, pattern);
            break;
          case "previous":
            event = readMultilineEventPre(line, match, pattern);
            break;
          default:
            break;
        }
        if (event != null) {
          events.add(event);
        }
        if (bufferEvent != null) {
          if (bufferEvent.getBody().length >= multilineMaxBytes
              || Integer.parseInt(bufferEvent.getHeaders().get("lineCount"))
                  == multilineMaxLines) {
            flushBufferEvent(events);
          }
        }
      }
      if (isNeedFlushBufferEvent()) {
        flushBufferEvent(events);
      }
    } else {
    ...
    }
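
The size and line limits above can be sketched in isolation as follows. This is only an illustration under assumptions, not the patch code: BoundedBufferSketch is a hypothetical class, it counts lines in an int field rather than in a "lineCount" header, and it uses >= for the line limit where the snippet above uses ==.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer that flushes once a byte or line limit is reached. */
final class BoundedBufferSketch {
  private final int maxBytes;
  private final int maxLines;
  private final StringBuilder body = new StringBuilder();
  private int lineCount = 0;

  BoundedBufferSketch(int maxBytes, int maxLines) {
    this.maxBytes = maxBytes;
    this.maxLines = maxLines;
  }

  /** Appends a line; if either limit is reached, moves the buffered event to out. */
  void append(String line, List<String> out) {
    if (lineCount > 0) {
      body.append('\n');
    }
    body.append(line);
    lineCount++;
    int bytes = body.toString().getBytes(StandardCharsets.UTF_8).length;
    if (bytes >= maxBytes || lineCount >= maxLines) {
      flush(out);
    }
  }

  /** Emits whatever is buffered (if anything) and clears the buffer. */
  void flush(List<String> out) {
    if (lineCount == 0) {
      return;
    }
    out.add(body.toString());
    body.setLength(0);
    lineCount = 0;
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    BoundedBufferSketch buffer = new BoundedBufferSketch(1000, 2);
    buffer.append("a", events);
    buffer.append("b", events); // line limit reached, flushed here
    buffer.append("c", events);
    buffer.flush(events);       // stand-in for the timeout flush
    System.out.println(events);
  }
}
```

Checking the limits right after each append mirrors the order in readEvents(): the line is read and merged first, and only then is the buffer compared against the limits.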


- qiao


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50438/#review167074
-----------------------------------------------------------


On Feb. 17, 2017, 11:13 a.m., qiao wen wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50438/
> -----------------------------------------------------------
> 
> (Updated Feb. 17, 2017, 11:13 a.m.)
> 
> 
> Review request for Flume.
> 
> 
> Repository: flume-git
> 
> 
> Description
> -------
> 
> TaildirSource treats each line as a separate event by default. This is a problem
> for multiline log events such as stack traces and request/response dumps. The
> following is a Java stack trace log. We expect to configure a regex key that marks
> the start of a log event, so that all log lines are aggregated until the next line
> matching the key is found.
> 
> 2016-07-16 14:59:43,956 ERROR lifecycleSupervisor-1-7 LifecycleSupervisor.run - Unable to start EventDrivenSourceRunner: { source:cn.yottabyte.flume.source.http.HTTPSource{name:sourceHttp,state:IDLE} } - Exception follows.
> java.lang.IllegalStateException: Running HTTP Server found in source: sourceHttp before I started one. Will not attempt to start.
>     at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>     at cn.yottabyte.flume.source.http.HTTPSource.startHttpSourceServer(HTTPSource.java:170)
>     at cn.yottabyte.flume.source.http.HTTPSource.start(HTTPSource.java:166)
>     at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
>     at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> 
> 
> Diffs
> -----
> 
> flume-ng-doc/sphinx/FlumeUserGuide.rst afa6625
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java 8838320
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java 42474c4
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java a107a01
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java f2347f3
> flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java 097ee0b
> 
> Diff: https://reviews.apache.org/r/50438/diff/
> 
> 
> Testing
> -------
> 
> All tests in TestTaildirSource passed.
> 
> 
> Thanks,
> 
> qiao wen
> 
> 



