[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    flume git commit: FLUME-2797. Use SourceCounter for SyslogTcpSource
From:       mpercy () apache ! org
Date:       2016-09-08 17:06:05
Message-ID: f01fbb5e0f5b4e39ace8558fd4e7604f () git ! apache ! org
[Download RAW message or body]

Repository: flume
Updated Branches:
  refs/heads/trunk a0a50849d -> d9c9a7dd9


FLUME-2797. Use SourceCounter for SyslogTcpSource

This patch uses the newer SourceCounter class for the SyslogTcpSource.
It also marks the SyslogTcpSource as deprecated and improves how the
unit tests are written so they don't require DNS.

Reviewers: Attila Simon, Lior Zeno, Balázs Donát Bessenyei, Mike Percy

(Denes Arvay via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d9c9a7dd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d9c9a7dd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d9c9a7dd

Branch: refs/heads/trunk
Commit: d9c9a7dd9a6889ecf6b9dc88fb8e02ccc1cd5167
Parents: a0a5084
Author: Denes Arvay <denes@cloudera.com>
Authored: Thu Sep 8 10:00:55 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Sep 8 10:05:12 2016 -0700

----------------------------------------------------------------------
 .../apache/flume/source/SyslogTcpSource.java    | 36 ++++++---
 .../apache/flume/source/SyslogUDPSource.java    | 32 +++++---
 .../flume/source/TestSyslogTcpSource.java       | 21 +++--
 .../flume/source/TestSyslogUdpSource.java       | 85 ++++++++++++--------
 4 files changed, 113 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d9c9a7dd/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index 185c00c..c7e8248 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -28,11 +28,11 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
@@ -47,6 +47,10 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * @deprecated use {@link MultiportSyslogTCPSource} instead.
+ */
+@Deprecated
 public class SyslogTcpSource extends AbstractSource
                              implements EventDrivenSource, Configurable {
   private static final Logger logger = LoggerFactory.getLogger(SyslogTcpSource.class);
@@ -56,7 +60,7 @@ public class SyslogTcpSource extends AbstractSource
   private Channel nettyChannel;
   private Integer eventSize;
   private Map<String, String> formaterProp;
-  private CounterGroup counterGroup = new CounterGroup();
+  private SourceCounter sourceCounter;
   private Set<String> keepFields;
 
   public class syslogTcpHandler extends SimpleChannelHandler {
@@ -85,14 +89,14 @@ public class SyslogTcpSource extends AbstractSource
               "rest of the event is received.");
           continue;
         }
+        sourceCounter.incrementEventReceivedCount();
+
         try {
           getChannelProcessor().processEvent(e);
-          counterGroup.incrementAndGet("events.success");
+          sourceCounter.incrementEventAcceptedCount();
         } catch (ChannelException ex) {
-          counterGroup.incrementAndGet("events.dropped");
           logger.error("Error writting to channel, event dropped", ex);
         } catch (RuntimeException ex) {
-          counterGroup.incrementAndGet("events.dropped");
           logger.error("Error parsing event from syslog stream, event dropped", ex);
           return;
         }
@@ -126,13 +130,14 @@ public class SyslogTcpSource extends AbstractSource
       nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
     }
 
+    sourceCounter.start();
     super.start();
   }
 
   @Override
   public void stop() {
     logger.info("Syslog TCP Source stopping...");
-    logger.info("Metrics:{}", counterGroup);
+    logger.info("Metrics: {}", sourceCounter);
 
     if (nettyChannel != null) {
       nettyChannel.close();
@@ -145,6 +150,7 @@ public class SyslogTcpSource extends AbstractSource
       }
     }
 
+    sourceCounter.stop();
     super.stop();
   }
 
@@ -161,16 +167,24 @@ public class SyslogTcpSource extends AbstractSource
         context.getString(
             SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
             SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   @VisibleForTesting
-  public int getSourcePort() {
+  InetSocketAddress getBoundAddress() {
     SocketAddress localAddress = nettyChannel.getLocalAddress();
-    if (localAddress instanceof InetSocketAddress) {
-      InetSocketAddress addr = (InetSocketAddress) localAddress;
-      return addr.getPort();
+    if (!(localAddress instanceof InetSocketAddress)) {
+      throw new IllegalArgumentException("Not bound to an internet address");
     }
-    return 0;
+    return (InetSocketAddress) localAddress;
   }
 
+
+  @VisibleForTesting
+  SourceCounter getSourceCounter() {
+    return sourceCounter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/d9c9a7dd/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 175bebb..ae0b8ac 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -28,11 +28,11 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
@@ -60,7 +60,7 @@ public class SyslogUDPSource extends AbstractSource
 
   private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class);
 
-  private CounterGroup counterGroup = new CounterGroup();
+  private SourceCounter sourceCounter;
 
   // Default Min size
   public static final int DEFAULT_MIN_SIZE = 2048;
@@ -85,14 +85,14 @@ public class SyslogUDPSource extends AbstractSource
         if (e == null) {
           return;
         }
+        sourceCounter.incrementEventReceivedCount();
+
         getChannelProcessor().processEvent(e);
-        counterGroup.incrementAndGet("events.success");
+        sourceCounter.incrementEventAcceptedCount();
       } catch (ChannelException ex) {
-        counterGroup.incrementAndGet("events.dropped");
         logger.error("Error writting to channel", ex);
         return;
       } catch (RuntimeException ex) {
-        counterGroup.incrementAndGet("events.dropped");
         logger.error("Error parsing event from syslog stream, event dropped", ex);
         return;
       }
@@ -123,13 +123,14 @@ public class SyslogUDPSource extends AbstractSource
       nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
     }
 
+    sourceCounter.start();
     super.start();
   }
 
   @Override
   public void stop() {
     logger.info("Syslog UDP Source stopping...");
-    logger.info("Metrics:{}", counterGroup);
+    logger.info("Metrics: {}", sourceCounter);
     if (nettyChannel != null) {
       nettyChannel.close();
       try {
@@ -141,6 +142,7 @@ public class SyslogUDPSource extends AbstractSource
       }
     }
 
+    sourceCounter.stop();
     super.stop();
   }
 
@@ -156,15 +158,23 @@ public class SyslogUDPSource extends AbstractSource
         context.getString(
             SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
             SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   @VisibleForTesting
-  public int getSourcePort() {
+  InetSocketAddress getBoundAddress() {
     SocketAddress localAddress = nettyChannel.getLocalAddress();
-    if (localAddress instanceof InetSocketAddress) {
-      InetSocketAddress addr = (InetSocketAddress) localAddress;
-      return addr.getPort();
+    if (!(localAddress instanceof InetSocketAddress)) {
+      throw new IllegalArgumentException("Not bound to an internet address");
     }
-    return 0;
+    return (InetSocketAddress) localAddress;
+  }
+
+  @VisibleForTesting
+  SourceCounter getSourceCounter() {
+    return sourceCounter;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/d9c9a7dd/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index 10ef8d8..f07acc6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
@@ -62,7 +63,7 @@ public class TestSyslogTcpSource {
 
     Configurables.configure(channel, new Context());
 
-    List<Channel> channels = new ArrayList<Channel>();
+    List<Channel> channels = new ArrayList<>();
     channels.add(channel);
 
     ChannelSelector rcs = new ReplicatingChannelSelector();
@@ -82,15 +83,14 @@ public class TestSyslogTcpSource {
     init(keepFields);
     source.start();
     // Write some message to the syslog port
-    Socket syslogSocket;
+    InetSocketAddress addr = source.getBoundAddress();
     for (int i = 0; i < 10 ; i++) {
-      syslogSocket = new Socket(
-        InetAddress.getLocalHost(), source.getSourcePort());
-      syslogSocket.getOutputStream().write(bodyWithTandH.getBytes());
-      syslogSocket.close();
+      try (Socket syslogSocket = new Socket(addr.getAddress(), addr.getPort())) {
+        syslogSocket.getOutputStream().write(bodyWithTandH.getBytes());
+      }
     }
 
-    List<Event> channelEvents = new ArrayList<Event>();
+    List<Event> channelEvents = new ArrayList<>();
     Transaction txn = channel.getTransaction();
     txn.begin();
     for (int i = 0; i < 10; i++) {
@@ -152,5 +152,12 @@ public class TestSyslogTcpSource {
   public void testKeepTimestamp() throws IOException {
     runKeepFieldsTest("timestamp");
   }
+
+  @Test
+  public void testSourceCounter() throws IOException {
+    runKeepFieldsTest("all");
+    Assert.assertEquals(10, source.getSourceCounter().getEventAcceptedCount());
+    Assert.assertEquals(10, source.getSourceCounter().getEventReceivedCount());
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/d9c9a7dd/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index e5b7a06..6ba7512 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -37,6 +37,9 @@ import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -71,6 +74,7 @@ public class TestSyslogUdpSource {
 
     source.setChannelProcessor(new ChannelProcessor(rcs));
     Context context = new Context();
+    context.put("host", InetAddress.getLoopbackAddress().getHostAddress());
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
     context.put("keepFields", keepFields);
 
@@ -85,18 +89,12 @@ public class TestSyslogUdpSource {
     init(keepFields);
     source.start();
     // Write some message to the syslog port
-    DatagramSocket syslogSocket;
-    DatagramPacket datagramPacket;
-    datagramPacket = new DatagramPacket(bodyWithTandH.getBytes(),
-      bodyWithTandH.getBytes().length,
-      InetAddress.getLocalHost(), source.getSourcePort());
+    DatagramPacket datagramPacket = createDatagramPacket(bodyWithTandH.getBytes());
     for (int i = 0; i < 10 ; i++) {
-      syslogSocket = new DatagramSocket();
-      syslogSocket.send(datagramPacket);
-      syslogSocket.close();
+      sendDatagramPacket(datagramPacket);
     }
 
-    List<Event> channelEvents = new ArrayList<Event>();
+    List<Event> channelEvents = new ArrayList<>();
     Transaction txn = channel.getTransaction();
     txn.begin();
     for (int i = 0; i < 10; i++) {
@@ -105,13 +103,7 @@ public class TestSyslogUdpSource {
       channelEvents.add(e);
     }
 
-    try {
-      txn.commit();
-    } catch (Throwable t) {
-      txn.rollback();
-    } finally {
-      txn.close();
-    }
+    commitAndCloseTransaction(txn);
 
     source.stop();
     for (Event e : channelEvents) {
@@ -139,18 +131,13 @@ public class TestSyslogUdpSource {
 
     byte[] largePayload = getPayload(1000).getBytes();
 
-    DatagramSocket syslogSocket;
-    DatagramPacket datagramPacket;
-    datagramPacket = new DatagramPacket(largePayload,
-            1000,
-            InetAddress.getLocalHost(), source.getSourcePort());
+    DatagramPacket datagramPacket = createDatagramPacket(largePayload);
+
     for (int i = 0; i < 10 ; i++) {
-      syslogSocket = new DatagramSocket();
-      syslogSocket.send(datagramPacket);
-      syslogSocket.close();
+      sendDatagramPacket(datagramPacket);
     }
 
-    List<Event> channelEvents = new ArrayList<Event>();
+    List<Event> channelEvents = new ArrayList<>();
     Transaction txn = channel.getTransaction();
     txn.begin();
     for (int i = 0; i < 10; i++) {
@@ -159,13 +146,7 @@ public class TestSyslogUdpSource {
       channelEvents.add(e);
     }
 
-    try {
-      txn.commit();
-    } catch (Throwable t) {
-      txn.rollback();
-    } finally {
-      txn.close();
-    }
+    commitAndCloseTransaction(txn);
 
     source.stop();
     for (Event e : channelEvents) {
@@ -200,6 +181,46 @@ public class TestSyslogUdpSource {
     runKeepFieldsTest("timestamp");
   }
 
+  @Test
+  public void testSourceCounter() throws Exception {
+    init("true");
+
+    source.start();
+    DatagramPacket datagramPacket = createDatagramPacket("test".getBytes());
+    sendDatagramPacket(datagramPacket);
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+
+    channel.take();
+    commitAndCloseTransaction(txn);
+
+    Assert.assertEquals(1, source.getSourceCounter().getEventAcceptedCount());
+    Assert.assertEquals(1, source.getSourceCounter().getEventReceivedCount());
+  }
+
+  private DatagramPacket createDatagramPacket(byte[] payload) {
+    InetSocketAddress addr = source.getBoundAddress();
+    return new DatagramPacket(payload, payload.length, addr.getAddress(), addr.getPort());
+  }
+
+  private void sendDatagramPacket(DatagramPacket datagramPacket) throws IOException {
+    try (DatagramSocket syslogSocket = new DatagramSocket()) {
+      syslogSocket.send(datagramPacket);
+    }
+  }
+
+  private void commitAndCloseTransaction(Transaction txn) {
+    try {
+      txn.commit();
+    } catch (Throwable t) {
+      logger.error("Transaction commit failed, rolling back", t);
+      txn.rollback();
+    } finally {
+      txn.close();
+    }
+  }
+
   private String getPayload(int length) {
     StringBuilder payload = new StringBuilder(length);
     for (int n = 0; n < length; ++n) {


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic