[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: git commit: FLUME-2062. Make it possible for HBase sink to deposit event headers into corresponding
From: hshreedharan () apache ! org
Date: 2013-05-31 19:39:00
Message-ID: 5f0e202e57344d91a9900c6168aea917 () git ! apache ! org
[Download RAW message or body]
Updated Branches:
refs/heads/flume-1.4 7f66495d9 -> f4239fca1
FLUME-2062. Make it possible for HBase sink to deposit event headers into corresponding column qualifiers
(Roman Shaposhnik via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f4239fca
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f4239fca
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f4239fca
Branch: refs/heads/flume-1.4
Commit: f4239fca168a63bfb58e777397faeff9ef2209f0
Parents: 7f66495
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri May 31 12:19:21 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri May 31 12:38:33 2013 -0700
----------------------------------------------------------------------
.../sink/hbase/RegexHbaseEventSerializer.java | 25 +++++++--
.../sink/hbase/TestRegexHbaseEventSerializer.java | 40 ++++++++++++++-
2 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/f4239fca/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 965d6b0..0df559d 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flume.sink.hbase;
import java.util.Calendar;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -63,6 +64,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
public static final String COL_NAME_CONFIG = "colNames";
public static final String COLUMN_NAME_DEFAULT = "payload";
+ /** Whether to deposit event headers into corresponding column qualifiers */
+ public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
+ public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
/* This is a nonce used in HBase row-keys, such that the same row-key
* never gets written more than once from within this JVM. */
@@ -72,7 +76,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
protected byte[] cf;
private byte[] payload;
private List<byte[]> colNames = Lists.newArrayList();
+ private Map<String, String> headers;
private boolean regexIgnoreCase;
+ private boolean depositHeaders;
private Pattern inputPattern;
@Override
@@ -80,6 +86,8 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG,
INGORE_CASE_DEFAULT);
+ depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG,
+ DEPOSIT_HEADERS_DEFAULT);
inputPattern = Pattern.compile(regex, Pattern.DOTALL
+ (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
@@ -96,6 +104,7 @@ public class RegexHbaseEventSerializer implements \
HbaseEventSerializer {
@Override
public void initialize(Event event, byte[] columnFamily) {
+ this.headers = event.getHeaders();
this.payload = event.getBody();
this.cf = columnFamily;
}
@@ -142,19 +151,23 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
return Lists.newArrayList();
}
- try {
+ try {
rowKey = getRowKey();
Put put = new Put(rowKey);
for (int i = 0; i < colNames.size(); i++) {
put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
}
+ if (depositHeaders) {
+ for (Map.Entry<String, String> entry : headers.entrySet()) {
+ put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8));
+ }
+ }
actions.add(put);
- }
- catch (Exception e) {
- throw new FlumeException("Could not get row key!", e);
- }
- return actions;
+ } catch (Exception e) {
+ throw new FlumeException("Could not get row key!", e);
+ }
+ return actions;
}
@Override
http://git-wip-us.apache.org/repos/asf/flume/blob/f4239fca/flume-ng-sinks/flume-ng-hba \
se-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index f63eed0..6cec36f 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -158,4 +158,42 @@ public class TestRegexHbaseEventSerializer {
assertEquals("100-" + randomString + "-2", rk3);
}
-}
\ No newline at end of file
+
+ @Test
+ /** Test depositing of the header information. */
+ public void testDepositHeaders() throws Exception {
+ RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
+ Context context = new Context();
+ context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG,
+ "true");
+ s.configure(context);
+
+ String body = "body";
+ Map<String, String> headers = Maps.newHashMap();
+ headers.put("header1", "value1");
+ headers.put("header2", "value2");
+
+ Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
+ s.initialize(e, "CF".getBytes());
+ List<Row> actions = s.getActions();
+ assertEquals(1, s.getActions().size());
+ assertTrue(actions.get(0) instanceof Put);
+
+ Put put = (Put) actions.get(0);
+ assertTrue(put.getFamilyMap().containsKey(s.cf));
+ List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
+ assertTrue(kvPairs.size() == 3);
+
+ Map<String, String> resultMap = Maps.newHashMap();
+ for (KeyValue kv : kvPairs) {
+ resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
+ }
+
+ assertEquals(body, resultMap.get("payload"));
+ assertEquals("value1", resultMap.get("header1"));
+ assertEquals("value2", resultMap.get("header2"));
+
+ List<Increment> increments = s.getIncrements();
+ assertEquals(0, increments.size());
+ }
+}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic