[prev in list] [next in list] [prev in thread] [next in thread] 

List:       hadoop-commits
Subject:    [hadoop] 02/02: HADOOP-18406: Adds alignment context to call path for creating RPC proxy with multip
From:       omalley () apache ! org
Date:       2022-08-24 23:49:31
Message-ID: 20220824234929.D0CB2440715 () gitbox2-he-fi ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 0326b7e935c839c5e0aecdf496a2b08719250c34
Author: Simba Dzinamarira <sdzinamarira@linkedin.com>
AuthorDate: Wed Aug 17 09:33:33 2022 -0400

    HADOOP-18406: Adds alignment context to call path for creating RPC proxy with \
multiple connections per user.  
    Fixes #4748
    
    Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
---
 .../org/apache/hadoop/ipc/ProtobufRpcEngine.java   | 15 +++++++-------
 .../org/apache/hadoop/ipc/ProtobufRpcEngine2.java  | 15 +++++++-------
 .../src/main/java/org/apache/hadoop/ipc/RPC.java   | 23 +++++++++++++++++++++-
 .../main/java/org/apache/hadoop/ipc/RpcEngine.java |  4 +++-
 .../org/apache/hadoop/ipc/WritableRpcEngine.java   |  6 ++++--
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   |  3 ++-
 .../java/org/apache/hadoop/ipc/TestRpcBase.java    |  3 ++-
 7 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
 index dce6631bb1d..01fceeb954e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
                
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
 @@ -80,9 +80,9 @@ public class ProtobufRpcEngine implements RpcEngine {
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+      ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext) throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory, \
alignmentContext);  return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
   }
@@ -126,7 +126,7 @@ public class ProtobufRpcEngine implements RpcEngine {
     return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
         (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
             new Class[] { protocol }, new Invoker(protocol, connId, conf,
-                factory)), false);
+                factory, null)), false);
   }
 
   protected static class Invoker implements RpcInvocationHandler {
@@ -147,9 +147,8 @@ public class ProtobufRpcEngine implements RpcEngine {
         throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
+          conf, factory, alignmentContext);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
     }
     
     /**
@@ -158,14 +157,16 @@ public class ProtobufRpcEngine implements RpcEngine {
      * @param connId input connId.
      * @param conf input Configuration.
      * @param factory input factory.
+     * @param alignmentContext Alignment context
      */
     protected Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
+        Configuration conf, SocketFactory factory, AlignmentContext \
alignmentContext) {  this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
       this.protocolName = RPC.getProtocolName(protocol);
       this.clientProtocolVersion = RPC
           .getProtocolVersion(protocol);
+      this.alignmentContext = alignmentContext;
     }
 
     private RequestHeaderProto constructRpcRequestHeader(Method method) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
 index ea2dbba467e..3594320ce06 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
                
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
 @@ -103,9 +103,9 @@ public class ProtobufRpcEngine2 implements RpcEngine {
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+      ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext) throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory, \
alignmentContext);  return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
   }
@@ -133,7 +133,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
     return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
         (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
             new Class[]{protocol}, new Invoker(protocol, connId, conf,
-                factory)), false);
+                factory, null)), false);
   }
 
   protected static class Invoker implements RpcInvocationHandler {
@@ -154,9 +154,8 @@ public class ProtobufRpcEngine2 implements RpcEngine {
         throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
+          conf, factory, alignmentContext);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
     }
 
     /**
@@ -166,14 +165,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
      * @param connId input connId.
      * @param conf input Configuration.
      * @param factory input factory.
+     * @param alignmentContext Alignment context
      */
     protected Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
+        Configuration conf, SocketFactory factory, AlignmentContext \
alignmentContext) {  this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
       this.protocolName = RPC.getProtocolName(protocol);
       this.clientProtocolVersion = RPC
           .getProtocolVersion(protocol);
+      this.alignmentContext = alignmentContext;
     }
 
     private RequestHeaderProto constructRpcRequestHeader(Method method) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java \
                index 7f35b13aec9..fc562b525ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
                
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
 @@ -558,11 +558,32 @@ public class RPC {
   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
       long clientVersion, ConnectionId connId, Configuration conf,
       SocketFactory factory) throws IOException {
+    return getProtocolProxy(protocol, clientVersion, connId, conf,
+        factory, null);
+  }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server.
+   *
+   * @param <T> Generics Type T
+   * @param protocol protocol class
+   * @param clientVersion client's version
+   * @param connId client connection identifier
+   * @param conf configuration
+   * @param factory socket factory
+   * @param alignmentContext StateID alignment context
+   * @return the protocol proxy
+   * @throws IOException if the far end through a RemoteException
+   */
+  public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+      long clientVersion, ConnectionId connId, Configuration conf,
+      SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
     return getProtocolEngine(protocol, conf).getProxy(
-        protocol, clientVersion, connId, conf, factory);
+        protocol, clientVersion, connId, conf, factory, alignmentContext);
   }
   
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
 index 1f0ff2d99d3..f322f6eb98a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
                
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
 @@ -66,11 +66,13 @@ public interface RpcEngine {
    * @param connId input ConnectionId.
    * @param conf input Configuration.
    * @param factory input factory.
+   * @param alignmentContext Alignment context
    * @throws IOException raised on errors performing I/O.
    * @return ProtocolProxy.
    */
   <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext)
       throws IOException;
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java \
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
 index 3e4ee707d46..d92bcea5d2e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
                
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
 @@ -315,16 +315,18 @@ public class WritableRpcEngine implements RpcEngine {
    * @param connId input ConnectionId.
    * @param conf input Configuration.
    * @param factory input factory.
+   * @param alignmentContext Alignment context
    * @throws IOException raised on errors performing I/O.
    * @return ProtocolProxy.
    */
   @Override
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext)
       throws IOException {
     return getProxy(protocol, clientVersion, connId.getAddress(),
         connId.getTicket(), conf, factory, connId.getRpcTimeout(),
-        connId.getRetryPolicy(), null, null);
+        connId.getRetryPolicy(), null, alignmentContext);
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java \
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 index a184ea173e5..85b0a7b6c84 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
                
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 @@ -294,7 +294,8 @@ public class TestRPC extends TestRpcBase {
 
     @Override
     public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-        ConnectionId connId, Configuration conf, SocketFactory factory)
+        ConnectionId connId, Configuration conf, SocketFactory factory,
+        AlignmentContext alignmentContext)
         throws IOException {
       throw new UnsupportedOperationException("This proxy is not supported");
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java \
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
 index 7635b16dac0..5b5c8bbaa9b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
                
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
 @@ -189,7 +189,8 @@ public class TestRpcBase {
           0,
           connId,
           clientConf,
-          NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+          NetUtils.getDefaultSocketFactory(clientConf),
+          null).getProxy();
     } catch (IOException e) {
       throw new ServiceException(e);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic