[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: git commit: FLUME-1757. Improve configuration of hbase serializers.
From: hshreedharan () apache ! org
Date: 2013-05-21 3:14:39
Message-ID: 8367ceeec548488c96ff78e91955d62c () git ! apache ! org
[Download RAW message or body]
Updated Branches:
refs/heads/trunk a13e9e6a8 -> 669e5d327
FLUME-1757. Improve configuration of hbase serializers.
(Sravya Tirukkovalur via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/669e5d32
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/669e5d32
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/669e5d32
Branch: refs/heads/trunk
Commit: 669e5d327720021391245949d6a947b4e963b728
Parents: a13e9e6
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon May 20 20:12:38 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon May 20 20:12:38 2013 -0700
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +-
.../hbase/SimpleAsyncHbaseEventSerializer.java | 4 +-
.../sink/hbase/SimpleHbaseEventSerializer.java | 4 +-
.../flume/sink/hbase/TestAsyncHBaseSink.java | 42 ++++++++++++++-
.../org/apache/flume/sink/hbase/TestHBaseSink.java | 41 +++++++++++++-
5 files changed, 84 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2ee41be..1b4d216 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1697,7 +1697,7 @@ Property Name Default Desc
**table** -- The name of the table in Hbase to write to.
**columnFamily** -- The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
-serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
+serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol".
serializer.* -- Properties to be passed to the serializer.
kerberosPrincipal -- Kerberos user principal for accessing secure HBase
kerberosKeytab -- Kerberos keytab for accessing secure HBase
http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java \
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
index dd19616..96095d1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
@@ -115,8 +115,8 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
@Override
public void configure(Context context) {
- String pCol = context.getString("payloadColumn");
- String iCol = context.getString("incrementColumn");
+ String pCol = context.getString("payloadColumn", "pCol");
+ String iCol = context.getString("incrementColumn", "iCol");
rowPrefix = context.getString("rowPrefix", "default");
String suffix = context.getString("suffix", "uuid");
if(pCol != null && !pCol.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java \
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
index 52bc84d..758252b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
@@ -70,8 +70,8 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer {
context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
String suffix = context.getString("suffix", "uuid");
- String payloadColumn = context.getString("payloadColumn");
- String incColumn = context.getString("incrementColumn");
+ String payloadColumn = context.getString("payloadColumn","pCol");
+ String incColumn = context.getString("incrementColumn","iCol");
if(payloadColumn != null && !payloadColumn.isEmpty()) {
if(suffix.equals("timestamp")){
keyType = KeyType.TS;
http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java \
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 03c3e4c..7ddfdae 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -69,8 +69,8 @@ public class TestAsyncHBaseSink {
private static String tableName = "TestHbaseSink";
private static String columnFamily = "TestColumnFamily";
- private static String inColumn = "Increment";
- private static String plCol = "pc";
+ private static String inColumn = "iCol";
+ private static String plCol = "pCol";
private static Context ctx = new Context();
private static String valBase = "testing hbase sink: jham";
private boolean deleteTable = true;
@@ -164,6 +164,44 @@ public class TestAsyncHBaseSink {
}
@Test
+ public void testOneEventWithDefaults() throws Exception {
+ Map<String,String> ctxMap = new HashMap<String,String>();
+ ctxMap.put("table", tableName);
+ ctxMap.put("columnFamily", columnFamily);
+ ctxMap.put("serializer",
+ "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer");
+ ctxMap.put("keep-alive", "0");
+ ctxMap.put("timeout", "10000");
+ Context tmpctx = new Context();
+ tmpctx.putAll(ctxMap);
+
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
+ AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
+ Configurables.configure(sink, tmpctx);
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, tmpctx);
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e = EventBuilder.withBody(
+ Bytes.toBytes(valBase));
+ channel.put(e);
+ tx.commit();
+ tx.close();
+ Assert.assertFalse(sink.isConfNull());
+ sink.process();
+ sink.stop();
+ HTable table = new HTable(testUtility.getConfiguration(), tableName);
+ byte[][] results = getResults(table, 1);
+ byte[] out = results[0];
+ Assert.assertArrayEquals(e.getBody(), out);
+ out = results[1];
+ Assert.assertArrayEquals(Longs.toByteArray(1), out);
+ }
+
+ @Test
public void testOneEvent() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
deleteTable = true;
http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java \
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index ad94fc9..ab4128e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -55,8 +55,8 @@ public class TestHBaseSink {
private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
private static String tableName = "TestHbaseSink";
private static String columnFamily = "TestColumnFamily";
- private static String inColumn = "Increment";
- private static String plCol = "pc";
+ private static String inColumn = "iCol";
+ private static String plCol = "pCol";
private static Context ctx = new Context();
private static String valBase = "testing hbase sink: jham";
@@ -80,6 +80,43 @@ public class TestHBaseSink {
}
@Test
+ public void testOneEventWithDefaults() throws Exception {
+ //Create a context without setting increment column and payload Column
+ Map<String,String> ctxMap = new HashMap<String,String>();
+ ctxMap.put("table", tableName);
+ ctxMap.put("columnFamily", columnFamily);
+ ctxMap.put("serializer",
+ "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
+ Context tmpctx = new Context();
+ tmpctx.putAll(ctxMap);
+
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+ Configurables.configure(sink, tmpctx);
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e = EventBuilder.withBody(
+ Bytes.toBytes(valBase));
+ channel.put(e);
+ tx.commit();
+ tx.close();
+
+ sink.process();
+ sink.stop();
+ HTable table = new HTable(testUtility.getConfiguration(), tableName);
+ byte[][] results = getResults(table, 1);
+ byte[] out = results[0];
+ Assert.assertArrayEquals(e.getBody(), out);
+ out = results[1];
+ Assert.assertArrayEquals(Longs.toByteArray(1), out);
+ testUtility.deleteTable(tableName.getBytes());
+ }
+
+ @Test
public void testOneEvent() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic