[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    git commit: FLUME-1757. Improve configuration of hbase serializers.
From:       hshreedharan () apache ! org
Date:       2013-05-21 3:14:39
Message-ID: 8367ceeec548488c96ff78e91955d62c () git ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/trunk a13e9e6a8 -> 669e5d327


FLUME-1757. Improve configuration of hbase serializers.

(Sravya Tirukkovalur via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/669e5d32
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/669e5d32
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/669e5d32

Branch: refs/heads/trunk
Commit: 669e5d327720021391245949d6a947b4e963b728
Parents: a13e9e6
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon May 20 20:12:38 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon May 20 20:12:38 2013 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    2 +-
 .../hbase/SimpleAsyncHbaseEventSerializer.java     |    4 +-
 .../sink/hbase/SimpleHbaseEventSerializer.java     |    4 +-
 .../flume/sink/hbase/TestAsyncHBaseSink.java       |   42 ++++++++++++++-
 .../org/apache/flume/sink/hbase/TestHBaseSink.java |   41 +++++++++++++-
 5 files changed, 84 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-doc/sphinx/FlumeUserGuide.rst
                
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2ee41be..1b4d216 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1697,7 +1697,7 @@ Property Name       Default                                                 Desc
 **table**           --                                                      The name of the table in Hbase to write to.
 **columnFamily**    --                                                      The column family in Hbase to write to.
 batchSize           100                                                     Number of events to be written per txn.
-serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
+serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer  Default increment column = "iCol", payload column = "pCol".
 serializer.*        --                                                      Properties to be passed to the serializer.
 kerberosPrincipal   --                                                      Kerberos user principal for accessing secure HBase
 kerberosKeytab      --                                                      Kerberos keytab for accessing secure HBase

http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
index dd19616..96095d1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
@@ -115,8 +115,8 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
 
   @Override
   public void configure(Context context) {
-    String pCol = context.getString("payloadColumn");
-    String iCol = context.getString("incrementColumn");
+    String pCol = context.getString("payloadColumn", "pCol");
+    String iCol = context.getString("incrementColumn", "iCol");
     rowPrefix = context.getString("rowPrefix", "default");
     String suffix = context.getString("suffix", "uuid");
     if(pCol != null && !pCol.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
index 52bc84d..758252b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
@@ -70,8 +70,8 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
         context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
     String suffix = context.getString("suffix", "uuid");
 
-    String payloadColumn = context.getString("payloadColumn");
-    String incColumn = context.getString("incrementColumn");
+    String payloadColumn = context.getString("payloadColumn","pCol");
+    String incColumn = context.getString("incrementColumn","iCol");
     if(payloadColumn != null && !payloadColumn.isEmpty()) {
       if(suffix.equals("timestamp")){
         keyType = KeyType.TS;

http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 03c3e4c..7ddfdae 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -69,8 +69,8 @@ public class TestAsyncHBaseSink {
 
   private static String tableName = "TestHbaseSink";
   private static String columnFamily = "TestColumnFamily";
-  private static String inColumn = "Increment";
-  private static String plCol = "pc";
+  private static String inColumn = "iCol";
+  private static String plCol = "pCol";
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
   private boolean deleteTable = true;
@@ -164,6 +164,44 @@ public class TestAsyncHBaseSink {
   }
 
   @Test
+  public void testOneEventWithDefaults() throws Exception {
+    Map<String,String> ctxMap = new HashMap<String,String>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctxMap.put("serializer",
+            "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer");
+    ctxMap.put("keep-alive", "0");
+    ctxMap.put("timeout", "10000");
+    Context tmpctx = new Context();
+    tmpctx.putAll(ctxMap);
+
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, tmpctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, tmpctx);
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(
+            Bytes.toBytes(valBase));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+    Assert.assertFalse(sink.isConfNull());
+    sink.process();
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 1);
+    byte[] out = results[0];
+    Assert.assertArrayEquals(e.getBody(), out);
+    out = results[1];
+    Assert.assertArrayEquals(Longs.toByteArray(1), out);
+  }
+
+  @Test
   public void testOneEvent() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index ad94fc9..ab4128e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -55,8 +55,8 @@ public class TestHBaseSink {
   private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
   private static String tableName = "TestHbaseSink";
   private static String columnFamily = "TestColumnFamily";
-  private static String inColumn = "Increment";
-  private static String plCol = "pc";
+  private static String inColumn = "iCol";
+  private static String plCol = "pCol";
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
 
@@ -80,6 +80,43 @@ public class TestHBaseSink {
   }
 
   @Test
+  public void testOneEventWithDefaults() throws Exception {
+    //Create a context without setting increment column and payload Column
+    Map<String,String> ctxMap = new HashMap<String,String>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctxMap.put("serializer",
+            "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
+    Context tmpctx = new Context();
+    tmpctx.putAll(ctxMap);
+
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, tmpctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(
+            Bytes.toBytes(valBase));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+
+    sink.process();
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 1);
+    byte[] out = results[0];
+    Assert.assertArrayEquals(e.getBody(), out);
+    out = results[1];
+    Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  @Test
   public void testOneEvent() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(testUtility.getConfiguration());


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic