[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    flume git commit: FLUME-2620. File Channel to support empty values in headers
From:       denes () apache ! org
Date:       2017-07-25 12:04:20
Message-ID: 885745f180ff46f485c341e1430480f6 () git ! apache ! org
[Download RAW message or body]

Repository: flume
Updated Branches:
  refs/heads/trunk c570a51b3 -> 1e69fc7c2


FLUME-2620. File Channel to support empty values in headers

Flume user guide does not specify whether a value in event header could be null or \
not. Given an external system generating events which header values can be null and a \
user configures Flume with Memory Channel then he will have no trouble.
Later on when the user changes Memory Channel to File Channel then Flume will fail \
with NPE. It is because FC is serializing events with protocol buffer and header \
values are defined as required in the proto file.
In this patch I have changed the value field to optional. However protocol buffer \
does not have a notation for null and setting a field to null raises NPE again. Added \
a null check before serialization to prevent this.
There is on caveat: When an optional field is not set, at deserialization it will be \
set to a default value: in this case it will be empty string.

Reviewers: Miklos Csanady

(Marcell Hegedus via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e69fc7c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e69fc7c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e69fc7c

Branch: refs/heads/trunk
Commit: 1e69fc7c29f104a2117a62de11cba9b2a2c740e1
Parents: c570a51
Author: Marcell Hegedus <marcell.hegedus@gmail.com>
Authored: Wed Jul 19 14:27:56 2017 +0200
Committer: Denes Arvay <denes@apache.org>
Committed: Wed Jul 19 14:27:56 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Put.java |  7 +++-
 .../flume/channel/file/proto/ProtosFactory.java | 40 ++++++++------------
 .../src/main/proto/filechannel.proto            |  2 +-
 .../flume/channel/file/TestFileChannel.java     | 24 ++++++++++++
 .../apache/flume/channel/TestMemoryChannel.java | 22 +++++++++++
 5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
 index 0a70a24..c5ea290 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
 @@ -82,8 +82,11 @@ class Put extends TransactionEventRecord {
       for (String key : headers.keySet()) {
         String value = headers.get(key);
         headerBuilder.clear();
-        eventBuilder.addHeaders(headerBuilder.setKey(key)
-            .setValue(value).build());
+        headerBuilder.setKey(key);
+        if (value != null) {
+          headerBuilder.setValue(value);
+        }
+        eventBuilder.addHeaders(headerBuilder.build());
       }
     }
     eventBuilder.setBody(ByteString.copyFrom(event.getBody()));

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
 index 50492cc..202f33d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
 @@ -6831,17 +6831,17 @@ public final class ProtosFactory {
     com.google.protobuf.ByteString
         getKeyBytes();
 
-    // required string value = 2;
+    // optional string value = 2;
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     boolean hasValue();
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     java.lang.String getValue();
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     com.google.protobuf.ByteString
         getValueBytes();
@@ -6990,17 +6990,17 @@ public final class ProtosFactory {
       }
     }
 
-    // required string value = 2;
+    // optional string value = 2;
     public static final int VALUE_FIELD_NUMBER = 2;
     private java.lang.Object value_;
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     public boolean hasValue() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     public java.lang.String getValue() {
       java.lang.Object ref = value_;
@@ -7017,7 +7017,7 @@ public final class ProtosFactory {
       }
     }
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     public com.google.protobuf.ByteString
         getValueBytes() {
@@ -7046,10 +7046,6 @@ public final class ProtosFactory {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasValue()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -7271,10 +7267,6 @@ public final class ProtosFactory {
           
           return false;
         }
-        if (!hasValue()) {
-          
-          return false;
-        }
         return true;
       }
 
@@ -7371,16 +7363,16 @@ public final class ProtosFactory {
         return this;
       }
 
-      // required string value = 2;
+      // optional string value = 2;
       private java.lang.Object value_ = "";
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public boolean hasValue() {
         return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public java.lang.String getValue() {
         java.lang.Object ref = value_;
@@ -7394,7 +7386,7 @@ public final class ProtosFactory {
         }
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public com.google.protobuf.ByteString
           getValueBytes() {
@@ -7410,7 +7402,7 @@ public final class ProtosFactory {
         }
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public Builder setValue(
           java.lang.String value) {
@@ -7423,7 +7415,7 @@ public final class ProtosFactory {
         return this;
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public Builder clearValue() {
         bitField0_ = (bitField0_ & ~0x00000002);
@@ -7432,7 +7424,7 @@ public final class ProtosFactory {
         return this;
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public Builder setValueBytes(
           com.google.protobuf.ByteString value) {
@@ -7546,7 +7538,7 @@ public final class ProtosFactory {
       "ansactionEventFooter\">\n\nFlumeEvent\022\"\n\007he" +
       "aders\030\001 \003(\0132\021.FlumeEventHeader\022\014\n\004body\030\002",
       " \002(\014\".\n\020FlumeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n" \
                +
-      "\005value\030\002 \002(\tB4\n#org.apache.flume.channel" +
+      "\005value\030\002 \001(\tB4\n#org.apache.flume.channel" +
       ".file.protoB\rProtosFactory"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner \
assigner =

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto \
b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto index \
                25520e8..929b41d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -83,5 +83,5 @@ message FlumeEvent {
 
 message FlumeEventHeader {
   required string key = 1;
-  required string value = 2;
+  optional string value = 2;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
 index 8efe991..a3d27f7 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
                
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
 @@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -69,6 +70,7 @@ public class TestFileChannel extends TestFileChannelBase {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(TestFileChannel.class);
+  public static final String TEST_KEY = "test_key";
 
   @Before
   public void setup() throws Exception {
@@ -234,6 +236,28 @@ public class TestFileChannel extends TestFileChannelBase {
   }
 
   @Test
+  public void testPutConvertsNullValueToEmptyStrInHeader() throws Exception {
+    channel.start();
+
+    Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8),
+        Collections.<String, String>singletonMap(TEST_KEY, null));
+
+    Transaction txPut = channel.getTransaction();
+    txPut.begin();
+    channel.put(event);
+    txPut.commit();
+    txPut.close();
+
+    Transaction txTake = channel.getTransaction();
+    txTake.begin();
+    Event eventTaken = channel.take();
+    Assert.assertArrayEquals(event.getBody(), eventTaken.getBody());
+    Assert.assertEquals("", eventTaken.getHeaders().get(TEST_KEY));
+    txTake.commit();
+    txTake.close();
+  }
+
+  @Test
   public void testCommitAfterNoPutTake() throws Exception {
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java \
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index \
                344bb58..f7e43eb 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -19,6 +19,7 @@
 
 package org.apache.flume.channel;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -32,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -72,6 +74,26 @@ public class TestMemoryChannel {
   }
 
   @Test
+  public void testPutAcceptsNullValueInHeader() {
+    Configurables.configure(channel, new Context());
+
+    Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8),
+        Collections.<String, String>singletonMap("test_key", null));
+
+    Transaction txPut = channel.getTransaction();
+    txPut.begin();
+    channel.put(event);
+    txPut.commit();
+    txPut.close();
+
+    Transaction txTake = channel.getTransaction();
+    txTake.begin();
+    Event eventTaken = channel.take();
+    Assert.assertEquals(event, eventTaken);
+    txTake.commit();
+  }
+
+  @Test
   public void testChannelResize() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic