[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: [activemq-artemis] branch master updated: ARTEMIS-2241 Support direct deliver for InVMAcceptors
From: michaelpearce () apache ! org
Date: 2019-01-30 8:11:35
Message-ID: 154883589512.16382.14122906101041386597 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new b76f006 ARTEMIS-2241 Support direct deliver for InVMAcceptors
new 5fe8688 This closes #2525
b76f006 is described below
commit b76f0061f8ec15a8d46c2a8244a5f18eade11fb2
Author: Michael André Pearce <michael.andre.pearce@me.com>
AuthorDate: Fri Jan 25 00:35:55 2019 +0000
ARTEMIS-2241 Support direct deliver for InVMAcceptors
Pull the isDirectDeliver method up from the Netty implementation into the Connection interface.
Add an isDirectDeliver flag to InVMConnection that can be set via the acceptor
configuration; it defaults to false, which keeps the current default behavior.
Extend DirectDeliverTest to cover the InVM transport as well.
---
.../core/remoting/impl/netty/NettyConnection.java | 1 +
.../artemis/spi/core/remoting/Connection.java | 2 ++
.../core/protocol/core/impl/ChannelImplTest.java | 5 +++
.../protocol/core/ServerSessionPacketHandler.java | 7 +---
.../core/remoting/impl/invm/InVMAcceptor.java | 5 +++
.../core/remoting/impl/invm/InVMConnection.java | 12 +++++++
.../remoting/impl/invm/TransportConstants.java | 4 +++
.../integration/remoting/DirectDeliverTest.java | 41 ++++++++++++++++------
8 files changed, 61 insertions(+), 16 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java \
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 1032a35..f3e5678 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -429,6 +429,7 @@ public class NettyConnection implements Connection {
return "tcp://" + IPV6Util.encloseHost(address.toString());
}
+ @Override
public final boolean isDirectDeliver() {
return directDeliver;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java \
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 46e1534..0683a50 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -140,6 +140,8 @@ public interface Connection {
*/
TransportConfiguration getConnectorConfig();
+ boolean isDirectDeliver();
+
ActiveMQPrincipal getDefaultActiveMQPrincipal();
/**
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java \
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 416c911..3f6bdca 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -412,6 +412,11 @@ public class ChannelImplTest {
}
@Override
+ public boolean isDirectDeliver() {
+ return false;
+ }
+
+ @Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index faefa39..88c35bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -86,7 +86,6 @@ import \
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import \
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; \
import org.apache.activemq.artemis.core.remoting.CloseListener; import \
org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -192,11 +191,7 @@ public class ServerSessionPacketHandler implements \
ChannelHandler { // use the same executor
this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
- if (conn instanceof NettyConnection) {
- direct = ((NettyConnection) conn).isDirectDeliver();
- } else {
- direct = false;
- }
+ this.direct = conn.isDirectDeliver();
}
private void clearLargeMessage() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
index a2fdeb0..f8d1915 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
@@ -75,6 +75,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
private final boolean enableBufferPooling;
+ private final boolean directDeliver;
+
public InVMAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@@ -101,6 +103,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
connectionsAllowed = \
ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, \
TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration);
enableBufferPooling = \
ConfigurationHelper.getBooleanProperty(TransportConstants.BUFFER_POOLING, \
TransportConstants.DEFAULT_BUFFER_POOLING, configuration); +
+ directDeliver = \
ConfigurationHelper.getBooleanProperty(TransportConstants.DIRECT_DELIVER, \
TransportConstants.DEFAULT_DIRECT_DELIVER, configuration); }
@Override
@@ -228,6 +232,7 @@ public final class InVMAcceptor extends AbstractAcceptor {
InVMConnection inVMConnection = new InVMConnection(id, connectionID, \
remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal); \
inVMConnection.setEnableBufferPooling(enableBufferPooling); + \
inVMConnection.setDirectDeliver(directDeliver);
connectionListener.connectionCreated(this, inVMConnection, \
protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL)); }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 46f7fea..ba4b997 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -65,6 +65,8 @@ public class InVMConnection implements Connection {
private boolean bufferPoolingEnabled = TransportConstants.DEFAULT_BUFFER_POOLING;
+ private boolean directDeliver = TransportConstants.DEFAULT_DIRECT_DELIVER;
+
public InVMConnection(final int serverID,
final BufferHandler handler,
final BaseConnectionLifeCycleListener listener,
@@ -86,6 +88,7 @@ public class InVMConnection implements Connection {
final BaseConnectionLifeCycleListener listener,
final Executor executor,
final ActiveMQPrincipal defaultActiveMQPrincipal) {
+
this.serverID = serverID;
this.handler = handler;
@@ -276,6 +279,15 @@ public class InVMConnection implements Connection {
}
@Override
+ public boolean isDirectDeliver() {
+ return directDeliver;
+ }
+
+ public void setDirectDeliver(boolean directDeliver) {
+ this.directDeliver = directDeliver;
+ }
+
+ @Override
public String toString() {
return "InVMConnection [serverID=" + serverID + ", id=" + id + "]";
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
index c02b0fd..18ba094 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/TransportConstants.java
@@ -30,6 +30,10 @@ public final class TransportConstants {
public static final boolean DEFAULT_BUFFER_POOLING = true;
+ public static final boolean DEFAULT_DIRECT_DELIVER = false;
+
+ public static final String DIRECT_DELIVER = "directDeliver";
+
private TransportConstants() {
// Utility class
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java \
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java
index ab00955..9fa0024 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/DirectDeliverTest.java
@@ -29,8 +29,8 @@ import \
org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import \
org.apache.activemq.artemis.api.core.client.ServerLocator; import \
org.apache.activemq.artemis.core.config.Configuration; import \
org.apache.activemq.artemis.core.postoffice.Binding; +import \
org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory; import \
org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -43,31 +43,52 @@ public class DirectDeliverTest extends ActiveMQTestBase {
private ActiveMQServer server;
- private ServerLocator locator;
+ private ServerLocator nettyLocator;
+ private ServerLocator inVMLocator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- Map<String, Object> params = new HashMap<>();
- params.put(TransportConstants.DIRECT_DELIVER, true);
+ Map<String, Object> nettyParams = new HashMap<>();
+ nettyParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DIRECT_DELIVER, \
true);
- TransportConfiguration tc = new \
TransportConfiguration(NettyAcceptorFactory.class.getName(), params); + \
TransportConfiguration nettyTransportConfiguration = new \
TransportConfiguration(NettyAcceptorFactory.class.getName(), nettyParams); +
+ Map<String, Object> inVMParams = new HashMap<>();
+ inVMParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.DIRECT_DELIVER, \
true); +
+ TransportConfiguration inVMTransportConfiguration = new \
TransportConfiguration(InVMAcceptorFactory.class.getName(), inVMParams); +
+ Configuration config = createBasicConfig();
+ config.addAcceptorConfiguration(nettyTransportConfiguration);
+ config.addAcceptorConfiguration(inVMTransportConfiguration);
- Configuration config = createBasicConfig().addAcceptorConfiguration(tc);
server = createServer(false, config);
server.start();
- locator = createNettyNonHALocator();
- addServerLocator(locator);
+ nettyLocator = createNettyNonHALocator();
+ addServerLocator(nettyLocator);
+
+ inVMLocator = createInVMLocator(0);
+ addServerLocator(inVMLocator);
}
@Test
- public void testDirectDeliver() throws Exception {
+ public void testDirectDeliverNetty() throws Exception {
+ testDirectDeliver(nettyLocator);
+ }
+
+ @Test
+ public void testDirectDeliverInVM() throws Exception {
+ testDirectDeliver(inVMLocator);
+ }
+
+ private void testDirectDeliver(ServerLocator serverLocator) throws Exception {
final String foo = "foo";
- ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSessionFactory sf = createSessionFactory(serverLocator);
ClientSession session = sf.createSession();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic