[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: flume git commit: FLUME-2373. Support TBinaryProtocol in Thrift RPC.
From: hshreedharan () apache ! org
Date: 2014-12-18 20:14:44
Message-ID: 23f4e8c919144fdda0729316d826394e () git ! apache ! org
[Download RAW message or body]
Repository: flume
Updated Branches:
refs/heads/trunk 1003d1f41 -> 34e9bda31
FLUME-2373. Support TBinaryProtocol in Thrift RPC.
(Stefan Krawczyk via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/34e9bda3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/34e9bda3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/34e9bda3
Branch: refs/heads/trunk
Commit: 34e9bda312506a118fad87fcbdecc48bf3918c95
Parents: 1003d1f
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Dec 18 12:13:19 2014 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Dec 18 12:13:19 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/flume/sink/ThriftSink.java | 4 +--
.../org/apache/flume/source/ThriftSource.java | 29 ++++++++++++++++--
.../org/apache/flume/sink/TestThriftSink.java | 11 ++++---
.../org/apache/flume/api/ThriftRpcClient.java | 32 ++++++++++++++++++--
.../apache/flume/api/TestThriftRpcClient.java | 13 ++++----
.../apache/flume/api/ThriftTestingSource.java | 15 +++++++--
6 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java \
b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java index \
48a9775..baa60d0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
@@ -18,11 +18,11 @@
*/
package org.apache.flume.sink;
+import java.util.Properties;
+
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
-
-import java.util.Properties;
/**
* <p>
* A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as
http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java \
b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index \
c3881b4..551fe13 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFastFramedTransport;
@@ -70,6 +71,13 @@ public class ThriftSource extends AbstractSource implements Configurable,
* Config param for the port to listen on.
*/
public static final String CONFIG_PORT = "port";
+ /**
+ * Config param for the thrift protocol to use.
+ */
+ public static final String CONFIG_PROTOCOL = "protocol";
+ public static final String BINARY_PROTOCOL = "binary";
+ public static final String COMPACT_PROTOCOL = "compact";
+
private Integer port;
private String bindAddress;
private int maxThreads = 0;
@@ -77,6 +85,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
private TServer server;
private TServerTransport serverTransport;
private ExecutorService servingExecutor;
+ private String protocol;
@Override
public void configure(Context context) {
@@ -98,6 +107,17 @@ public class ThriftSource extends AbstractSource implements Configurable,
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
+
+ protocol = context.getString(CONFIG_PROTOCOL);
+ if (protocol == null) {
+ // default is to use the compact protocol.
+ protocol = COMPACT_PROTOCOL;
+ }
+ Preconditions.checkArgument(
+ (protocol.equalsIgnoreCase(BINARY_PROTOCOL) ||
+ protocol.equalsIgnoreCase(COMPACT_PROTOCOL)),
+ "binary or compact are the only valid Thrift protocol types to " +
+ "choose from.");
}
@Override
@@ -167,8 +187,13 @@ public class ThriftSource extends AbstractSource implements Configurable,
}
try {
-
- args.protocolFactory(new TCompactProtocol.Factory());
+ if (protocol.equals(BINARY_PROTOCOL)) {
+ logger.info("Using TBinaryProtocol");
+ args.protocolFactory(new TBinaryProtocol.Factory());
+ } else {
+ logger.info("Using TCompactProtocol");
+ args.protocolFactory(new TCompactProtocol.Factory());
+ }
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
args.processor(new ThriftSourceProtocol
http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java \
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java index \
5f70d1b..fccaede 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
@@ -19,15 +19,18 @@
package org.apache.flume.sink;
import com.google.common.base.Charsets;
+
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
+import org.apache.flume.api.ThriftRpcClient;
import org.apache.flume.api.ThriftTestingSource;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.ThriftSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -58,7 +61,7 @@ public class TestThriftSink {
context.put("port", String.valueOf(port));
context.put("batch-size", String.valueOf(2));
context.put("request-timeout", String.valueOf(2000L));
-
+ context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL);
sink.setChannel(channel);
Configurables.configure(sink, context);
@@ -77,7 +80,7 @@ public class TestThriftSink {
Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
- port);
+ port, ThriftRpcClient.COMPACT_PROTOCOL);
channel.start();
sink.start();
@@ -108,7 +111,7 @@ public class TestThriftSink {
public void testTimeout() throws Exception {
AtomicLong delay = new AtomicLong();
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ALTERNATE
- .name(), port);
+ .name(), port, ThriftRpcClient.COMPACT_PROTOCOL);
src.setDelay(delay);
delay.set(2500);
@@ -182,7 +185,7 @@ public class TestThriftSink {
}
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
- port);
+ port, ThriftRpcClient.COMPACT_PROTOCOL);
for (int i = 0; i < 5; i++) {
Sink.Status status = sink.process();
http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java \
b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index \
cf45ab9..6382a0e 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -24,6 +24,7 @@ import org.apache.flume.FlumeException;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
@@ -57,6 +58,13 @@ public class ThriftRpcClient extends AbstractRpcClient {
private static final Logger LOGGER =
LoggerFactory.getLogger(ThriftRpcClient.class);
+ /**
+ * Config param for the thrift protocol to use.
+ */
+ public static final String CONFIG_PROTOCOL = "protocol";
+ public static final String BINARY_PROTOCOL = "binary";
+ public static final String COMPACT_PROTOCOL = "compact";
+
private int batchSize;
private long requestTimeout;
private final Lock stateLock;
@@ -68,6 +76,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
private final AtomicLong threadCounter;
private int connectionPoolSize;
private final Random random = new Random();
+ private String protocol;
public ThriftRpcClient() {
stateLock = new ReentrantLock(true);
@@ -267,6 +276,18 @@ public class ThriftRpcClient extends AbstractRpcClient {
HostInfo host = HostInfo.getHostInfoList(properties).get(0);
hostname = host.getHostName();
port = host.getPortNumber();
+ protocol = properties.getProperty(CONFIG_PROTOCOL);
+ if (protocol == null) {
+ // default is to use the compact protocol.
+ protocol = COMPACT_PROTOCOL;
+ }
+ // check in case that garbage was put in.
+ if (!(protocol.equalsIgnoreCase(BINARY_PROTOCOL) ||
+ protocol.equalsIgnoreCase(COMPACT_PROTOCOL))) {
+ LOGGER.warn("'binary' or 'compact' are the only valid Thrift protocol types to "
+ + "choose from. Defaulting to 'compact'.");
+ protocol = COMPACT_PROTOCOL;
+ }
batchSize = Integer.parseInt(properties.getProperty(
RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
@@ -322,8 +343,15 @@ public class ThriftRpcClient extends AbstractRpcClient {
public ClientWrapper() throws Exception{
transport = new TFastFramedTransport(new TSocket(hostname, port));
transport.open();
- client = new ThriftSourceProtocol.Client(new TCompactProtocol
- (transport));
+ if (protocol.equals(BINARY_PROTOCOL)) {
+ LOGGER.info("Using TBinaryProtocol");
+ client = new ThriftSourceProtocol.Client(new TBinaryProtocol
+ (transport));
+ } else {
+ LOGGER.info("Using TCompactProtocol");
+ client = new ThriftSourceProtocol.Client(new TCompactProtocol
+ (transport));
+ }
// Not a great hash code, but since this class is immutable and there
// is at most one instance of the components of this class,
// this works fine [If the objects are equal, hash code is the same]
http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java \
b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java index \
88eb5e7..a8baaa8 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
@@ -56,6 +56,7 @@ public class TestThriftRpcClient {
props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10");
props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
"2000");
+ props.setProperty(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL);
}
@After
@@ -103,7 +104,7 @@ public class TestThriftRpcClient {
@Test
public void testOK() throws Exception {
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
- port);
+ port, ThriftRpcClient.COMPACT_PROTOCOL);
client = (ThriftRpcClient) RpcClientFactory.getInstance(props);
insertEvents(client, 10); //10 events
insertAsBatch(client, 10, 25); //16 events
@@ -121,7 +122,7 @@ public class TestThriftRpcClient {
@Test
public void testSlow() throws Exception {
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.SLOW.name(),
- port);
+ port, ThriftRpcClient.COMPACT_PROTOCOL);
client = (ThriftRpcClient) RpcClientFactory.getInstance(props);
insertEvents(client, 2); //2 events
insertAsBatch(client, 2, 25); //24 events (3 batches)
@@ -139,7 +140,7 @@ public class TestThriftRpcClient {
@Test(expected = EventDeliveryException.class)
public void testFail() throws Exception {
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.FAIL.name(),
- port);
+ port, ThriftRpcClient.COMPACT_PROTOCOL);
client = (ThriftRpcClient) RpcClientFactory.getInstance(props);
insertEvents(client, 2); //2 events
Assert.fail("Expected EventDeliveryException to be thrown.");
@@ -149,7 +150,7 @@ public class TestThriftRpcClient {
public void testError() throws Throwable {
try {
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR
- .name(), port);
+ .name(), port, ThriftRpcClient.COMPACT_PROTOCOL);
client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" +
".0", port);
insertEvents(client, 2); //2 events
@@ -163,7 +164,7 @@ public class TestThriftRpcClient {
public void testTimeout() throws Throwable {
try {
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT
- .name(), port);
+ .name(), port, ThriftRpcClient.COMPACT_PROTOCOL);
client = (ThriftRpcClient) RpcClientFactory.getThriftInstance(props);
insertEvents(client, 2); //2 events
} catch (EventDeliveryException ex) {
@@ -174,7 +175,7 @@ public class TestThriftRpcClient {
@Test
public void testMultipleThreads() throws Throwable {
src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
- port);
+ port, ThriftRpcClient.COMPACT_PROTOCOL);
client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" +
".0", port, 10);
int threadCount = 100;
http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java \
b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java index \
cde7269..63d2fc3 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
@@ -25,7 +25,11 @@ import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
@@ -180,7 +184,7 @@ public class ThriftTestingSource {
}
}
- public ThriftTestingSource(String handlerName, int port) throws Exception {
+ public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(new
InetSocketAddress("0.0.0.0", port));
ThriftSourceProtocol.Iface handler = null;
@@ -197,11 +201,16 @@ public class ThriftTestingSource {
} else if (handlerName.equals(HandlerType.ALTERNATE.name())) {
handler = new ThriftAlternateHandler();
}
-
+ TProtocolFactory transportProtocolFactory = null;
+ if (protocol != null && protocol == ThriftRpcClient.BINARY_PROTOCOL) {
+ transportProtocolFactory = new TBinaryProtocol.Factory();
+ } else {
+ transportProtocolFactory = new TCompactProtocol.Factory();
+ }
server = new THsHaServer(new THsHaServer.Args
(serverTransport).processor(
new ThriftSourceProtocol.Processor(handler)).protocolFactory(
- new TCompactProtocol.Factory()));
+ transportProtocolFactory));
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic