[prev in list] [next in list] [prev in thread] [next in thread]
List: hadoop-commits
Subject: [hadoop] 02/02: HADOOP-18406: Adds alignment context to call path for creating RPC proxy with multiple connections per user.
From: omalley () apache ! org
Date: 2022-08-24 23:49:31
Message-ID: 20220824234929.D0CB2440715 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 0326b7e935c839c5e0aecdf496a2b08719250c34
Author: Simba Dzinamarira <sdzinamarira@linkedin.com>
AuthorDate: Wed Aug 17 09:33:33 2022 -0400
HADOOP-18406: Adds alignment context to call path for creating RPC proxy with multiple connections per user.
Fixes #4748
Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
---
.../org/apache/hadoop/ipc/ProtobufRpcEngine.java | 15 +++++++-------
.../org/apache/hadoop/ipc/ProtobufRpcEngine2.java | 15 +++++++-------
.../src/main/java/org/apache/hadoop/ipc/RPC.java | 23 +++++++++++++++++++++-
.../main/java/org/apache/hadoop/ipc/RpcEngine.java | 4 +++-
.../org/apache/hadoop/ipc/WritableRpcEngine.java | 6 ++++--
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 3 ++-
.../java/org/apache/hadoop/ipc/TestRpcBase.java | 3 ++-
7 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index dce6631bb1d..01fceeb954e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -80,9 +80,9 @@ public class ProtobufRpcEngine implements RpcEngine {
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException {
- final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+ ConnectionId connId, Configuration conf, SocketFactory factory,
+ AlignmentContext alignmentContext) throws IOException {
+ final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}
@@ -126,7 +126,7 @@ public class ProtobufRpcEngine implements RpcEngine {
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
- factory)), false);
+ factory, null)), false);
}
protected static class Invoker implements RpcInvocationHandler {
@@ -147,9 +147,8 @@ public class ProtobufRpcEngine implements RpcEngine {
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
- conf, factory);
+ conf, factory, alignmentContext);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
- this.alignmentContext = alignmentContext;
}
/**
@@ -158,14 +157,16 @@ public class ProtobufRpcEngine implements RpcEngine {
* @param connId input connId.
* @param conf input Configuration.
* @param factory input factory.
+ * @param alignmentContext Alignment context
*/
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
- Configuration conf, SocketFactory factory) {
+ Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
+ this.alignmentContext = alignmentContext;
}
private RequestHeaderProto constructRpcRequestHeader(Method method) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
index ea2dbba467e..3594320ce06 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
@@ -103,9 +103,9 @@ public class ProtobufRpcEngine2 implements RpcEngine {
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException {
- final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+ ConnectionId connId, Configuration conf, SocketFactory factory,
+ AlignmentContext alignmentContext) throws IOException {
+ final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}
@@ -133,7 +133,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[]{protocol}, new Invoker(protocol, connId, conf,
- factory)), false);
+ factory, null)), false);
}
protected static class Invoker implements RpcInvocationHandler {
@@ -154,9 +154,8 @@ public class ProtobufRpcEngine2 implements RpcEngine {
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
- conf, factory);
+ conf, factory, alignmentContext);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
- this.alignmentContext = alignmentContext;
}
/**
@@ -166,14 +165,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
* @param connId input connId.
* @param conf input Configuration.
* @param factory input factory.
+ * @param alignmentContext Alignment context
*/
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
- Configuration conf, SocketFactory factory) {
+ Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
+ this.alignmentContext = alignmentContext;
}
private RequestHeaderProto constructRpcRequestHeader(Method method) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 7f35b13aec9..fc562b525ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -558,11 +558,32 @@ public class RPC {
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion, ConnectionId connId, Configuration conf,
SocketFactory factory) throws IOException {
+ return getProtocolProxy(protocol, clientVersion, connId, conf,
+ factory, null);
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server.
+ *
+ * @param <T> Generics Type T
+ * @param protocol protocol class
+ * @param clientVersion client's version
+ * @param connId client connection identifier
+ * @param conf configuration
+ * @param factory socket factory
+ * @param alignmentContext StateID alignment context
+ * @return the protocol proxy
+ * @throws IOException if the far end threw a RemoteException
+ */
+ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+ long clientVersion, ConnectionId connId, Configuration conf,
+ SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(
- protocol, clientVersion, connId, conf, factory);
+ protocol, clientVersion, connId, conf, factory, alignmentContext);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index 1f0ff2d99d3..f322f6eb98a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -66,11 +66,13 @@ public interface RpcEngine {
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
+ * @param alignmentContext Alignment context
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
<T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+ Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+ AlignmentContext alignmentContext)
throws IOException;
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 3e4ee707d46..d92bcea5d2e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -315,16 +315,18 @@ public class WritableRpcEngine implements RpcEngine {
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
+ * @param alignmentContext Alignment context
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+ Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+ AlignmentContext alignmentContext)
throws IOException {
return getProxy(protocol, clientVersion, connId.getAddress(),
connId.getTicket(), conf, factory, connId.getRpcTimeout(),
- connId.getRetryPolicy(), null, null);
+ connId.getRetryPolicy(), null, alignmentContext);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java \
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index a184ea173e5..85b0a7b6c84 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -294,7 +294,8 @@ public class TestRPC extends TestRpcBase {
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- ConnectionId connId, Configuration conf, SocketFactory factory)
+ ConnectionId connId, Configuration conf, SocketFactory factory,
+ AlignmentContext alignmentContext)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java \
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 7635b16dac0..5b5c8bbaa9b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -189,7 +189,8 @@ public class TestRpcBase {
0,
connId,
clientConf,
- NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+ NetUtils.getDefaultSocketFactory(clientConf),
+ null).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic