[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: flume git commit: FLUME-2620. File Channel to support empty values in headers
From: denes () apache ! org
Date: 2017-07-25 12:04:20
Message-ID: 885745f180ff46f485c341e1430480f6 () git ! apache ! org
[Download RAW message or body]
Repository: flume
Updated Branches:
refs/heads/trunk c570a51b3 -> 1e69fc7c2
FLUME-2620. File Channel to support empty values in headers
The Flume user guide does not specify whether a value in an event header can be null or \
not. If an external system generates events whose header values can be null and a \
user configures Flume with a Memory Channel, the user will have no trouble.
Later on, when the user changes the Memory Channel to a File Channel, Flume will fail \
with an NPE. This is because the File Channel serializes events with Protocol Buffers and header \
values are defined as required in the proto file.
In this patch I have changed the value field to optional. However, Protocol Buffers \
has no notation for null, and setting a field to null raises an NPE again. I have added \
a null check before serialization to prevent this.
There is one caveat: when an optional field is not set, at deserialization it will be \
set to its default value, which in this case is the empty string.
Reviewers: Miklos Csanady
(Marcell Hegedus via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e69fc7c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e69fc7c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e69fc7c
Branch: refs/heads/trunk
Commit: 1e69fc7c29f104a2117a62de11cba9b2a2c740e1
Parents: c570a51
Author: Marcell Hegedus <marcell.hegedus@gmail.com>
Authored: Wed Jul 19 14:27:56 2017 +0200
Committer: Denes Arvay <denes@apache.org>
Committed: Wed Jul 19 14:27:56 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/flume/channel/file/Put.java | 7 +++-
.../flume/channel/file/proto/ProtosFactory.java | 40 ++++++++------------
.../src/main/proto/filechannel.proto | 2 +-
.../flume/channel/file/TestFileChannel.java | 24 ++++++++++++
.../apache/flume/channel/TestMemoryChannel.java | 22 +++++++++++
5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index 0a70a24..c5ea290 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -82,8 +82,11 @@ class Put extends TransactionEventRecord {
for (String key : headers.keySet()) {
String value = headers.get(key);
headerBuilder.clear();
- eventBuilder.addHeaders(headerBuilder.setKey(key)
- .setValue(value).build());
+ headerBuilder.setKey(key);
+ if (value != null) {
+ headerBuilder.setValue(value);
+ }
+ eventBuilder.addHeaders(headerBuilder.build());
}
}
eventBuilder.setBody(ByteString.copyFrom(event.getBody()));
http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
index 50492cc..202f33d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -6831,17 +6831,17 @@ public final class ProtosFactory {
com.google.protobuf.ByteString
getKeyBytes();
- // required string value = 2;
+ // optional string value = 2;
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
boolean hasValue();
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
java.lang.String getValue();
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
com.google.protobuf.ByteString
getValueBytes();
@@ -6990,17 +6990,17 @@ public final class ProtosFactory {
}
}
- // required string value = 2;
+ // optional string value = 2;
public static final int VALUE_FIELD_NUMBER = 2;
private java.lang.Object value_;
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public boolean hasValue() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public java.lang.String getValue() {
java.lang.Object ref = value_;
@@ -7017,7 +7017,7 @@ public final class ProtosFactory {
}
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public com.google.protobuf.ByteString
getValueBytes() {
@@ -7046,10 +7046,6 @@ public final class ProtosFactory {
memoizedIsInitialized = 0;
return false;
}
- if (!hasValue()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
@@ -7271,10 +7267,6 @@ public final class ProtosFactory {
return false;
}
- if (!hasValue()) {
-
- return false;
- }
return true;
}
@@ -7371,16 +7363,16 @@ public final class ProtosFactory {
return this;
}
- // required string value = 2;
+ // optional string value = 2;
private java.lang.Object value_ = "";
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public boolean hasValue() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public java.lang.String getValue() {
java.lang.Object ref = value_;
@@ -7394,7 +7386,7 @@ public final class ProtosFactory {
}
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public com.google.protobuf.ByteString
getValueBytes() {
@@ -7410,7 +7402,7 @@ public final class ProtosFactory {
}
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public Builder setValue(
java.lang.String value) {
@@ -7423,7 +7415,7 @@ public final class ProtosFactory {
return this;
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public Builder clearValue() {
bitField0_ = (bitField0_ & ~0x00000002);
@@ -7432,7 +7424,7 @@ public final class ProtosFactory {
return this;
}
/**
- * <code>required string value = 2;</code>
+ * <code>optional string value = 2;</code>
*/
public Builder setValueBytes(
com.google.protobuf.ByteString value) {
@@ -7546,7 +7538,7 @@ public final class ProtosFactory {
"ansactionEventFooter\">\n\nFlumeEvent\022\"\n\007he" +
"aders\030\001 \003(\0132\021.FlumeEventHeader\022\014\n\004body\030\002",
" \002(\014\".\n\020FlumeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n" \
+
- "\005value\030\002 \002(\tB4\n#org.apache.flume.channel" +
+ "\005value\030\002 \001(\tB4\n#org.apache.flume.channel" +
".file.protoB\rProtosFactory"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner \
assigner =
http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto \
b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto index \
25520e8..929b41d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -83,5 +83,5 @@ message FlumeEvent {
message FlumeEventHeader {
required string key = 1;
- required string value = 2;
+ optional string value = 2;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 8efe991..a3d27f7 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -18,6 +18,7 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -69,6 +70,7 @@ public class TestFileChannel extends TestFileChannelBase {
private static final Logger LOG = LoggerFactory
.getLogger(TestFileChannel.class);
+ public static final String TEST_KEY = "test_key";
@Before
public void setup() throws Exception {
@@ -234,6 +236,28 @@ public class TestFileChannel extends TestFileChannelBase {
}
@Test
+ public void testPutConvertsNullValueToEmptyStrInHeader() throws Exception {
+ channel.start();
+
+ Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8),
+ Collections.<String, String>singletonMap(TEST_KEY, null));
+
+ Transaction txPut = channel.getTransaction();
+ txPut.begin();
+ channel.put(event);
+ txPut.commit();
+ txPut.close();
+
+ Transaction txTake = channel.getTransaction();
+ txTake.begin();
+ Event eventTaken = channel.take();
+ Assert.assertArrayEquals(event.getBody(), eventTaken.getBody());
+ Assert.assertEquals("", eventTaken.getHeaders().get(TEST_KEY));
+ txTake.commit();
+ txTake.close();
+ }
+
+ @Test
public void testCommitAfterNoPutTake() throws Exception {
channel.start();
Assert.assertTrue(channel.isOpen());
http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java \
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index \
344bb58..f7e43eb 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -19,6 +19,7 @@
package org.apache.flume.channel;
+import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
@@ -32,6 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
@@ -72,6 +74,26 @@ public class TestMemoryChannel {
}
@Test
+ public void testPutAcceptsNullValueInHeader() {
+ Configurables.configure(channel, new Context());
+
+ Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8),
+ Collections.<String, String>singletonMap("test_key", null));
+
+ Transaction txPut = channel.getTransaction();
+ txPut.begin();
+ channel.put(event);
+ txPut.commit();
+ txPut.close();
+
+ Transaction txTake = channel.getTransaction();
+ txTake.begin();
+ Event eventTaken = channel.take();
+ Assert.assertEquals(event, eventTaken);
+ txTake.commit();
+ }
+
+ @Test
public void testChannelResize() {
Context context = new Context();
Map<String, String> parms = new HashMap<String, String>();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic