[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    git commit: FLUME-2062. Make it possible for HBase sink to deposit event headers into corresponding column qualifiers
From:       hshreedharan () apache ! org
Date:       2013-05-31 19:39:00
Message-ID: 5f0e202e57344d91a9900c6168aea917 () git ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/flume-1.4 7f66495d9 -> f4239fca1


FLUME-2062. Make it possible for HBase sink to deposit event headers into corresponding column qualifiers

(Roman Shaposhnik via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f4239fca
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f4239fca
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f4239fca

Branch: refs/heads/flume-1.4
Commit: f4239fca168a63bfb58e777397faeff9ef2209f0
Parents: 7f66495
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri May 31 12:19:21 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri May 31 12:38:33 2013 -0700

----------------------------------------------------------------------
 .../sink/hbase/RegexHbaseEventSerializer.java      |   25 +++++++--
 .../sink/hbase/TestRegexHbaseEventSerializer.java  |   40 ++++++++++++++-
 2 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f4239fca/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 965d6b0..0df559d 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flume.sink.hbase;
 
 import java.util.Calendar;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -63,6 +64,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   public static final String COL_NAME_CONFIG = "colNames";
   public static final String COLUMN_NAME_DEFAULT = "payload";
 
+  /** Whether to deposit event headers into corresponding column qualifiers */
+  public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
+  public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
   
   /* This is a nonce used in HBase row-keys, such that the same row-key
    * never gets written more than once from within this JVM. */
@@ -72,7 +76,9 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   protected byte[] cf;
   private byte[] payload;
   private List<byte[]> colNames = Lists.newArrayList();
+  private Map<String, String> headers;
   private boolean regexIgnoreCase;
+  private boolean depositHeaders;
   private Pattern inputPattern;
   
   @Override
@@ -80,6 +86,8 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
     String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
     regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, 
         INGORE_CASE_DEFAULT);
+    depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG,
+        DEPOSIT_HEADERS_DEFAULT);
     inputPattern = Pattern.compile(regex, Pattern.DOTALL
         + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
     
@@ -96,6 +104,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
 
   @Override
   public void initialize(Event event, byte[] columnFamily) {
+    this.headers = event.getHeaders();
     this.payload = event.getBody();
     this.cf = columnFamily;
   }
@@ -142,19 +151,23 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
       return Lists.newArrayList();
     }
     
-  	try {
+    try {
       rowKey = getRowKey();
       Put put = new Put(rowKey);
       
       for (int i = 0; i < colNames.size(); i++) {
         put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
       }
+      if (depositHeaders) {
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+          put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8));
+        }
+      }
       actions.add(put);
-  	}
-  	catch (Exception e) {
-  	  throw new FlumeException("Could not get row key!", e);
-  	}
-  	return actions;
+    } catch (Exception e) {
+      throw new FlumeException("Could not get row key!", e);
+    }
+    return actions;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/f4239fca/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index f63eed0..6cec36f 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -158,4 +158,42 @@ public class TestRegexHbaseEventSerializer {
     assertEquals("100-" + randomString + "-2", rk3);
     
   }
-}
\ No newline at end of file
+
+   @Test
+  /** Test depositing of the header information. */
+  public void testDepositHeaders() throws Exception {
+    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
+    Context context = new Context();
+    context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG,
+        "true");
+    s.configure(context);
+
+    String body = "body";
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
+    s.initialize(e, "CF".getBytes());
+    List<Row> actions = s.getActions();
+    assertEquals(1, s.getActions().size());
+    assertTrue(actions.get(0) instanceof Put);
+
+    Put put = (Put) actions.get(0);
+    assertTrue(put.getFamilyMap().containsKey(s.cf));
+    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
+    assertTrue(kvPairs.size() == 3);
+
+    Map<String, String> resultMap = Maps.newHashMap();
+    for (KeyValue kv : kvPairs) {
+      resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
+    }
+
+    assertEquals(body, resultMap.get("payload"));
+    assertEquals("value1", resultMap.get("header1"));
+    assertEquals("value2", resultMap.get("header2"));
+
+    List<Increment> increments = s.getIncrements();
+    assertEquals(0, increments.size());
+  }
+}


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic