[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    [jira] [Commented] (AMQ-5260) Cross talk over duplex network connection can lead to blocking (altern
From:       "matteo rulli (JIRA)" <jira () apache ! org>
Date:       2014-09-26 14:55:34
Message-ID: JIRA.12725487.1404481815000.131398.1411743334713 () Atlassian ! JIRA
[Download RAW message or body]


    [ https://issues.apache.org/jira/browse/AMQ-5260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14149239#comment-14149239 \
] 

matteo rulli commented on AMQ-5260:
-----------------------------------

Thank you for your prompt reply! Now I understand the role of MutexTransport. By the \
way, I noticed that the deadlock involves two MutexTransport instances that wrap \
*VMTransport(s)*. Apparently if I apply the following hack {code:java}
--- MutexTransport.java	Thu Jun 05 14:48:36 2014
+++ MutexTransport.edited.java	Fri Sep 26 09:11:41 2014
@@ -19,17 +19,31 @@
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Thread safe Transport Filter that serializes calls to and from the Transport \
                Stack.
  */
 public class MutexTransport extends TransportFilter {
 
-    private final ReentrantLock writeLock = new ReentrantLock();
+	private static final Logger LOG = LoggerFactory.getLogger(MutexTransport.class);
+    private final static ReentrantLock vmWriteLock = new ReentrantLock();
+    private ReentrantLock writeLock;
     private boolean syncOnCommand;
 
     public MutexTransport(Transport next) {
         super(next);
         this.syncOnCommand = false;
+        
+        writeLock = null;
+        if(next != null && next.toString().startsWith("vm://")){
+        	writeLock = vmWriteLock;
+        	LOG.error("#@mrul# vm transport with mutex: " + next);
+        }else{ 
+        	writeLock = new ReentrantLock();
+        	LOG.error("#@mrul# non-vm transport with mutex: " + next);
+        }
     }
 
     public MutexTransport(Transport next, boolean syncOnCommand) {
{code}

the deadlock disappears. I noticed that the network bridge create many VMTransport of \
the form vm://brokerName#<number> using basically three different thread paths and \
all these local VMTransports trigger many intertwined command exchanges that somehow \
determine the deadlock:

h1. VMTransports creation PATH 1
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
 org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
 org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
 org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
 org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkConnector.createLocalTransport(NetworkConnector.java:154)
 org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:136)
 org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
 org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
 org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
 {noformat}

h1. VMTransports creation PATH 2
{noformat}
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
 org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
 org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
 org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
 org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
 org.apache.activemq.network.DemandForwardingBridgeSupport.start(DemandForwardingBridgeSupport.java:184)
 org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:152)
 org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
 org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
 org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
 {noformat}

h1. VMTransports creation PATH 3
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
 org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
 org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
 org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
 org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
 org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
 org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
 org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
 org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
 org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
 org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}

h1. VMTransports creation PATH 3, take 2 (cyclic???)
{noformat}
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1338)
 java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)            \
<------------------------------ !!! \
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:88) \
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294) \
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
 org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
 org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
 org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
 org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
 org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
 org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
 org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)             \
<------------------------------ !!! \
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138) \
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294) \
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69) \
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
 org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
 org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}

So, as a tentative fix I tried to globally lock VMTransport usage under the \
assumption that VM transport should be negligible in number with respect to other \
transports (tcp, nio, etc.) and thus the static lock in VM trans. should not \
constitute a big performace bottleneck; I added a new constructor in \
_*MutexTransport*_ in this way: {code}
public class MutexTransport extends TransportFilter {
	// ...
	private final static ReentrantLock staticWriteLock = new ReentrantLock();
	private ReentrantLock writeLock = new ReentrantLock();
	// ...
	public MutexTransport(Transport next, boolean syncOnCommand, boolean useStaticLock) \
{  this(next, syncOnCommand);
		if(useStaticLock)
			writeLock = staticWriteLock;
	}
{code}
and I modified the _*org.apache.activemq.transport.vm.VMTransportServer.configure(Transport)*_ \
implementation in this way: {code}
public static Transport configure(Transport transport) {
    transport = new MutexTransport(transport, false, true); // use static locks for \
VMTransport  transport = new ResponseCorrelator(transport);
    return transport;
}
{code}

Apparently this solve the problem (anyway I'm going to perform additional tests to \
double check this). Is it in your opinion a reasonable approach or is it just a \
_quick and dirty_ solution?

> Cross talk over duplex network connection can lead to blocking (alternative take)
> ---------------------------------------------------------------------------------
> 
> Key: AMQ-5260
> URL: https://issues.apache.org/jira/browse/AMQ-5260
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.9.0, 5.10.0
> Reporter: matteo rulli
> Attachments: AMQ5260AdvancedTest.java, AMQ_5260.patch, AMQ_5260_2.patch, \
> AMQ_5260_3.patch, deadlock.jpg, debug.jpg 
> 
> Pretty the same description with respect to AMQ-4328. 
> ----
> !deadlock.jpg!
> h2. Stacktraces:
> Stacktrace no.1:
> {noformat}
> Name: ActiveMQ NIO Worker 12
> State: BLOCKED on java.net.URI@1bae2b28 owned by: ActiveMQ Transport: \
> tcp:///10.0.1.219:61616@57789 Total blocked: 2  Total waited: 67
> Stack trace: 
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
>  org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
>  org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
>  org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>  org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>  org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
>  org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
> org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
> org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
>  org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.2
> {noformat}
> Name: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@3cdbfa3e \
> owned by: ActiveMQ BrokerService[master2] Task-4 Total blocked: 19  Total waited: 3
> Stack trace: 
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown \
> Source) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown \
> Source) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown \
> Source) java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
>  org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
>  org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:818)
>  org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:151)
>  org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>  org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
> - locked java.util.concurrent.atomic.AtomicBoolean@689389da
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
>  org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
>  org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:856)
>  org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1128)
>  org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
>                 
> - locked java.net.URI@1bae2b28
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
>  org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
>  org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>  org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.failover.FailoverTransport$3.onCommand(FailoverTransport.java:196)
>  org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>  org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
>  org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.3
> {noformat}
> Name: ActiveMQ BrokerService[master2] Task-4
> State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@717bb9c \
> owned by: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789 Total blocked: 19  \
> Total waited: 117 Stack trace: 
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown \
> Source) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown \
> Source) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown \
> Source) java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:921)
>  org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:173)
>  org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
>  org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
> - locked java.util.concurrent.atomic.AtomicBoolean@453bb109
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
>  org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
>  org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:904)
>  org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:129)
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic