[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    [activemq-artemis] branch master updated: ARTEMIS-2241 Support direct deliver for InVMAcceptors
From:       michaelpearce () apache ! org
Date:       2019-01-30 8:11:35
Message-ID: 154883589512.16382.14122906101041386597 () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new b76f006  ARTEMIS-2241 Support direct deliver for InVMAcceptors
     new 5fe8688  This closes #2525
b76f006 is described below

commit b76f0061f8ec15a8d46c2a8244a5f18eade11fb2
Author: Michael André Pearce <michael.andre.pearce@me.com>
AuthorDate: Fri Jan 25 00:35:55 2019 +0000

    ARTEMIS-2241 Support direct deliver for InVMAcceptors
    
    Push isDirectDeliver method from netty impl, to the Connection interface
    Add support to InVMConnection for isDirectDeliver flag and ability to set via \
config, defaulting to false, to keep current default behavior.  Extend \
                DirectDeliverTest to check InVM as well.
---
 .../core/remoting/impl/netty/NettyConnection.java  |  1 +
 .../artemis/spi/core/remoting/Connection.java      |  2 ++
 .../core/protocol/core/impl/ChannelImplTest.java   |  5 +++
 .../protocol/core/ServerSessionPacketHandler.java  |  7 +---
 .../core/remoting/impl/invm/InVMAcceptor.java      |  5 +++
 .../core/remoting/impl/invm/InVMConnection.java    | 12 +++++++
 .../remoting/impl/invm/TransportConstants.java     |  4 +++
 .../integration/remoting/DirectDeliverTest.java    | 41 ++++++++++++++++------
 8 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java \
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 index 1032a35..f3e5678 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
                
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 @@ -429,6 +429,7 @@ public class NettyConnection implements Connection {
       return "tcp://" + IPV6Util.encloseHost(address.toString());
    }
 
+   @Override
    public final boolean isDirectDeliver() {
       return directDeliver;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java \
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
 index 46e1534..0683a50 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
                
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
 @@ -140,6 +140,8 @@ public interface Connection {
     */
    TransportConfiguration getConnectorConfig();
 
+   boolean isDirectDeliver();
+
    ActiveMQPrincipal getDefaultActiveMQPrincipal();
 
    /**
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java \
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
 index 416c911..3f6bdca 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
                
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
 @@ -412,6 +412,11 @@ public class ChannelImplTest {
             }
 
             @Override
+            public boolean isDirectDeliver() {
+               return false;
+            }
+
+            @Override
             public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
                return null;
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 index faefa39..88c35bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
                
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 @@ -86,7 +86,6 @@ import \
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS  import \
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; \
import org.apache.activemq.artemis.core.remoting.CloseListener;  import \
                org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -192,11 +191,7 @@ public class ServerSessionPacketHandler implements \
ChannelHandler {  // use the same executor
       this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
 
-      if (conn instanceof NettyConnection) {
-         direct = ((NettyConnection) conn).isDirectDeliver();
-      } else {
-         direct = false;
-      }
+      this.direct = conn.isDirectDeliver();
    }
 
    private void clearLargeMessage() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
 index a2fdeb0..f8d1915 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
                
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
 @@ -75,6 +75,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
 
    private final boolean enableBufferPooling;
 
+   private final boolean directDeliver;
+
    public InVMAcceptor(final String name,
                        final ClusterConnection clusterConnection,
                        final Map<String, Object> configuration,
@@ -101,6 +103,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
       connectionsAllowed = \
ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, \
TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration);  
       enableBufferPooling = \
ConfigurationHelper.getBooleanProperty(TransportConstants.BUFFER_POOLING, \
TransportConstants.DEFAULT_BUFFER_POOLING, configuration); +
+      directDeliver = \
ConfigurationHelper.getBooleanProperty(TransportConstants.DIRECT_DELIVER, \
TransportConstants.DEFAULT_DIRECT_DELIVER, configuration);  }
 
    @Override
@@ -228,6 +232,7 @@ public final class InVMAcceptor extends AbstractAcceptor {
 
       InVMConnection inVMConnection = new InVMConnection(id, connectionID, \
remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal);  \
inVMConnection.setEnableBufferPooling(enableBufferPooling); +      \
inVMConnection.setDirectDeliver(directDeliver);  
       connectionListener.connectionCreated(this, inVMConnection, \
protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL));  }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
 index 46f7fea..ba4b997 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
                
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
 @@ -65,6 +65,8 @@ public class InVMConnection implements Connection {
 
    private boolean bufferPoolingEnabled = TransportConstants.DEFAULT_BUFFER_POOLING;
 
+   private boolean directDeliver = TransportConstants.DEFAULT_DIRECT_DELIVER;
+
    public InVMConnection(final int serverID,
                          final BufferHandler handler,
                          final BaseConnectionLifeCycleListener listener,
@@ -86,6 +88,7 @@ public class InVMConnection implements Connection {
                          final BaseConnectionLifeCycleListener listener,
                          final Executor executor,
                          final ActiveMQPrincipal defaultActiveMQPrincipal) {
+
       this.serverID = serverID;
 
       this.handler = handler;
@@ -276,6 +279,15 @@ public class InVMConnection implements Connection {
    }
 
    @Override
+   public boolean isDirectDeliver() {
+      return directDeliver;
+   }
+
+   public void setDirectDeliver(boolean directDeliver) {
+      this.directDeliver = directDeliver;
+   }
+
+   @Override
    public String toString() {
       return "InVMConnection [serverID=" + serverID + ", id=" + id + "]";
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
 index c02b0fd..18ba094 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
                
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
 @@ -30,6 +30,10 @@ public final class TransportConstants {
 
    public static final boolean DEFAULT_BUFFER_POOLING = true;
 
+   public static final boolean DEFAULT_DIRECT_DELIVER = false;
+
+   public static final String DIRECT_DELIVER = "directDeliver";
+
    private TransportConstants() {
       // Utility class
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java \
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java
 index ab00955..9fa0024 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java
                
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java
 @@ -29,8 +29,8 @@ import \
org.apache.activemq.artemis.api.core.client.ClientSessionFactory;  import \
org.apache.activemq.artemis.api.core.client.ServerLocator;  import \
org.apache.activemq.artemis.core.config.Configuration;  import \
org.apache.activemq.artemis.core.postoffice.Binding; +import \
org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;  import \
                org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
                
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -43,31 +43,52 @@ public class DirectDeliverTest extends ActiveMQTestBase {
 
    private ActiveMQServer server;
 
-   private ServerLocator locator;
+   private ServerLocator nettyLocator;
+   private ServerLocator inVMLocator;
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
 
-      Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.DIRECT_DELIVER, true);
+      Map<String, Object> nettyParams = new HashMap<>();
+      nettyParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DIRECT_DELIVER, \
true);  
-      TransportConfiguration tc = new \
TransportConfiguration(NettyAcceptorFactory.class.getName(), params); +      \
TransportConfiguration nettyTransportConfiguration = new \
TransportConfiguration(NettyAcceptorFactory.class.getName(), nettyParams); +
+      Map<String, Object> inVMParams = new HashMap<>();
+      inVMParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.DIRECT_DELIVER, \
true); +
+      TransportConfiguration inVMTransportConfiguration = new \
TransportConfiguration(InVMAcceptorFactory.class.getName(), inVMParams); +
+      Configuration config = createBasicConfig();
+      config.addAcceptorConfiguration(nettyTransportConfiguration);
+      config.addAcceptorConfiguration(inVMTransportConfiguration);
 
-      Configuration config = createBasicConfig().addAcceptorConfiguration(tc);
       server = createServer(false, config);
       server.start();
 
-      locator = createNettyNonHALocator();
-      addServerLocator(locator);
+      nettyLocator = createNettyNonHALocator();
+      addServerLocator(nettyLocator);
+
+      inVMLocator = createInVMLocator(0);
+      addServerLocator(inVMLocator);
    }
 
    @Test
-   public void testDirectDeliver() throws Exception {
+   public void testDirectDeliverNetty() throws Exception {
+      testDirectDeliver(nettyLocator);
+   }
+
+   @Test
+   public void testDirectDeliverInVM() throws Exception {
+      testDirectDeliver(inVMLocator);
+   }
+
+   private void testDirectDeliver(ServerLocator serverLocator) throws Exception {
       final String foo = "foo";
 
-      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSessionFactory sf = createSessionFactory(serverLocator);
 
       ClientSession session = sf.createSession();
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic