[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    [jira] [Commented] (AMQ-5077) Improve performance of ConcurrentStoreAndDispatch
From:       "Richard Wagg (JIRA)" <jira () apache ! org>
Date:       2014-03-31 16:53:16
Message-ID: JIRA.12697610.1393483473682.37216.1396284796069 () arcas
[Download RAW message or body]


    [ https://issues.apache.org/jira/browse/AMQ-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13955378#comment-13955378 \
] 

Richard Wagg commented on AMQ-5077:
-----------------------------------

Calling  
connectionFactory.setProducerWindowSize() 
With sizes varying from 10k to 10Mb has no effect on the throughput i can attain. All \
stack traces i take of the producer catch it in code like: {noformat}
"main" prio=10 tid=0x000000000bc3b000 nid=0x4109 runnable [0x0000000041ebe000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
  at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176)
                
        at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:304)
                
        at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
                
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
                
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
                
        at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
                
        - locked <0x000000050f60c5e8> (a java.lang.Object)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
                
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
                
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
                
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
                
        at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1853)
        - locked <0x000000050f60c668> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:289)
                
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:224)
                
        at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:269)


{noformat}


My understanding of flow control & the producer window size: 

Client side: 
- window size is set.
- Before each send, current size of all messages in flight is checked to see if \
                window is exceeded. 
- if producerWindow.waitForSpace() doesn't block, then the message is sent. 
- After the message is sent, the producer in flight size is incremented by the \
message size (and decremented when the ack is received). 

Broker side:
- Each queue has a memory limit set, as well as overall memory limit and disk store \
                limit. 
- For each message dispatched for a given queue, each of these limits is checked. 
- if any limit is set and sendFailIfNoSpace is set to true, the producer should get \
an exception sent back. 

In none of my tests have i caught any thread stuck inside the flow control handling \
logic. In all cases they're inside network code - producer side as above, broker side \
in something like:  {noformat}
"ActiveMQ NIO Worker 29" daemon prio=10 tid=0x000000001775d000 nid=0x6a0d runnable \
[0x000000004473d000]  java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000638b62430> (a \
                org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask$InnerFutureTask)
                
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
        at java.util.concurrent.FutureTask.get(FutureTask.java:187)
        at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:942)
        at org.apache.activemq.broker.region.Queue.send(Queue.java:727)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:395)
                
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:441)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
                
        at org.apache.activemq.broker.region.virtual.CompositeDestinationFilter.send(CompositeDestinationFilter.java:86)
                
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:395)
                
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:441)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
                
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
                
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
                
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:206)
                
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
                
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
                
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
                
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
                
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
                
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
                
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
                
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
                
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
                
        at org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
                
        at org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
                
        at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
                
        at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
                
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)

   Locked ownable synchronizers:
        - <0x00000006723b8748> (a java.util.concurrent.ThreadPoolExecutor$Worker)
{noformat}


I think we're exhausting TCP level message in flight limits long before producer flow \
control or window sizes become relevant. We can't take advantage a lot of the higher \
level settings like async sends, producer message windows or async writes to the \
diskstore, because on the broker side, all the overhead associated with persisting \
messages to a queue is done on the NIO worker thread, which quickly becomes the \
blocking factor.  Are there any options that i'm missing, to take more of the work \
done off the NIO threads? 



> Improve performance of ConcurrentStoreAndDispatch
> -------------------------------------------------
> 
> Key: AMQ-5077
> URL: https://issues.apache.org/jira/browse/AMQ-5077
> Project: ActiveMQ
> Issue Type: Wish
> Components: Message Store
> Affects Versions: 5.9.0
> Environment: 5.9.0.redhat-610343
> Reporter: Jason Shepherd
> Assignee: Gary Tully
> Attachments: Test combinations.xlsx, compDesPerf.tar.gz, topicRouting.zip
> 
> 
> We have publishers publishing to a topic which has 5 topic -> queue routings, and \
> gets a max message rate attainable of ~833 messages/sec, with each message around \
> 5k in size. To test this i set up a JMS config with topic queues:
> Topic
> TopicRouted.1
> ...
> TopicRouted.11
> Each topic has an increasing number of routings to queues, and a client is set up \
> to subscribe to all the queues. Rough message rates:
> routings messages/sec
> 0 2500
> 1 1428
> 2 2000
> 3 1428
> 4 1111
> 5 833
> This occurs whether the broker config has producerFlowControl="false" set to true \
> or false , and KahaDB disk synching is turned off. We also tried experimenting with \
> concurrentStoreAndDispatch, but that didn't seem to help. LevelDB didn't give any \
> notable performance improvement either. We also have asyncSend enabled on the \
> producer, and have a requirement to use persistent messages. We have also \
> experimented with sending messages in a transaction, but that hasn't really helped. \
> It seems like producer throughput rate across all queue destinations, all \
> connections and all publisher machines is limited by something on the broker, \
> through a mechanism which is not producer flow control. I think the prime suspect \
> is still contention on the index. We did some test with Yourkit profiler.
> Profiler was attached to broker at startup, allowed to run and then a topic \
> publisher was started, routing to 5 queues.  Profiler statistics were reset, the \
> publisher allowed to run for 60 seconds, and then profiling snapshot was taken. \
> During that time, ~9600 messages were logged as being sent for a rate of ~160/sec. \
> This ties in roughly with the invocation counts recorded in the snapshot (i think) \
> - ~43k calls.  From what i can work out, in the snapshot (filtering everything but \
> org.apache.activemq.store.kahadb),  For the 60 second sample period, 
> 24.8 seconds elapsed in \
> org.apache.activemq.store.kahadb.KahaDbTransactionStore$1.removeAsyncMessage(ConnectionContext, \
> MessageAck). 18.3 seconds elapsed in \
> org.apache.activemq.store.kahadb.KahaDbTransactionStore$1.asyncAddQueueMessage(ConnectionContext, \
> Message, boolean). From these, a further large portion of the time is spent inside \
> MessageDatabase: org.apache.activemq.store.kahadb.MessageDatabase.process(KahaRemoveMessageCommand, \
> Location) - 10 secs elapsed \
> org.apache.activemq.store.kahadb.MessageDatabase.process(KahaAddMessageCommand, \
> Location) - 8.5 secs elapsed. As both of these lock on indexLock.writeLock(), and \
> both take place on the NIO transport threads, i think this accounts for at least \
> some of the message throughput limits. As messages are added and removed from the \
> index one by one, regardless of sync type settings, this adds a fair amount of \
> overhead.  While we're not synchronising on writes to disk, we are performing work \
> on the NIO worker thread which can block on locks, and could account for the \
> behaviour we've seen client side.  To Reproduce:
> 1. Install a broker and use the attached configuration.
> 2. Use the 5.8.0 example ant script to consume from the queues, TopicQueueRouted.1 \
> - 5. eg: ant consumer -Durl=tcp://localhost:61616 -Dsubject=TopicQueueRouted.1 \
> -Duser=admin -Dpassword=admin -Dmax=-1 3. Use the modified version of 5.8.0 example \
> ant script (attached) to send messages to topics, TopicRouted.1 - 5, eg: ant \
> producer -Durl='tcp://localhost:61616?jms.useAsyncSend=true&wireFormat.tightEncoding \
> Enabled=false&keepAlive=true&wireFormat.maxInactivityDuration=60000&socketBufferSize=32768' \
> -Dsubject=TopicRouted.1 -Duser=admin -Dpassword=admin -Dmax=1 -Dtopic=true \
> -DsleepTime=0 -Dmax=10000 -DmessageSize=5000 This modified version of the script \
> prints the number of messages per second and prints it to the console.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic