[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-dev
Subject: [jira] [Commented] (AMQ-5077) Improve performance of ConcurrentStoreAndDispatch
From: "Richard Wagg (JIRA)" <jira () apache ! org>
Date: 2014-03-31 16:53:16
Message-ID: JIRA.12697610.1393483473682.37216.1396284796069 () arcas
[Download RAW message or body]
[ https://issues.apache.org/jira/browse/AMQ-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13955378#comment-13955378 \
]
Richard Wagg commented on AMQ-5077:
-----------------------------------
Calling
connectionFactory.setProducerWindowSize()
With sizes varying from 10k to 10Mb has no effect on the throughput i can attain. All \
stack traces i take of the producer catch it in code like: {noformat}
"main" prio=10 tid=0x000000000bc3b000 nid=0x4109 runnable [0x0000000041ebe000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176)
at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:304)
at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
- locked <0x000000050f60c5e8> (a java.lang.Object)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1853)
- locked <0x000000050f60c668> (a java.lang.Object)
at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:289)
at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:224)
at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:269)
{noformat}
My understanding of flow control & the producer window size:
Client side:
- window size is set.
- Before each send, current size of all messages in flight is checked to see if \
window is exceeded.
- if producerWindow.waitForSpace() doesn't block, then the message is sent.
- After the message is sent, the producer in flight size is incremented by the \
message size (and decremented when the ack is received).
Broker side:
- Each queue has a memory limit set, as well as overall memory limit and disk store \
limit.
- For each message dispatched for a given queue, each of these limits is checked.
- if any limit is set and sendFailIfNoSpace is set to true, the producer should get \
an exception sent back.
In none of my tests have i caught any thread stuck inside the flow control handling \
logic. In all cases they're inside network code - producer side as above, broker side \
in something like: {noformat}
"ActiveMQ NIO Worker 29" daemon prio=10 tid=0x000000001775d000 nid=0x6a0d runnable \
[0x000000004473d000] java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000638b62430> (a \
org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask$InnerFutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
at java.util.concurrent.FutureTask.get(FutureTask.java:187)
at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:942)
at org.apache.activemq.broker.region.Queue.send(Queue.java:727)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:395)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:441)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
at org.apache.activemq.broker.region.virtual.CompositeDestinationFilter.send(CompositeDestinationFilter.java:86)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:395)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:441)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:206)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
at org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Locked ownable synchronizers:
- <0x00000006723b8748> (a java.util.concurrent.ThreadPoolExecutor$Worker)
{noformat}
I think we're exhausting TCP level message in flight limits long before producer flow \
control or window sizes become relevant. We can't take advantage a lot of the higher \
level settings like async sends, producer message windows or async writes to the \
diskstore, because on the broker side, all the overhead associated with persisting \
messages to a queue is done on the NIO worker thread, which quickly becomes the \
blocking factor. Are there any options that i'm missing, to take more of the work \
done off the NIO threads?
> Improve performance of ConcurrentStoreAndDispatch
> -------------------------------------------------
>
> Key: AMQ-5077
> URL: https://issues.apache.org/jira/browse/AMQ-5077
> Project: ActiveMQ
> Issue Type: Wish
> Components: Message Store
> Affects Versions: 5.9.0
> Environment: 5.9.0.redhat-610343
> Reporter: Jason Shepherd
> Assignee: Gary Tully
> Attachments: Test combinations.xlsx, compDesPerf.tar.gz, topicRouting.zip
>
>
> We have publishers publishing to a topic which has 5 topic -> queue routings, and \
> gets a max message rate attainable of ~833 messages/sec, with each message around \
> 5k in size. To test this i set up a JMS config with topic queues:
> Topic
> TopicRouted.1
> ...
> TopicRouted.11
> Each topic has an increasing number of routings to queues, and a client is set up \
> to subscribe to all the queues. Rough message rates:
> routings messages/sec
> 0 2500
> 1 1428
> 2 2000
> 3 1428
> 4 1111
> 5 833
> This occurs whether the broker config has producerFlowControl="false" set to true \
> or false , and KahaDB disk synching is turned off. We also tried experimenting with \
> concurrentStoreAndDispatch, but that didn't seem to help. LevelDB didn't give any \
> notable performance improvement either. We also have asyncSend enabled on the \
> producer, and have a requirement to use persistent messages. We have also \
> experimented with sending messages in a transaction, but that hasn't really helped. \
> It seems like producer throughput rate across all queue destinations, all \
> connections and all publisher machines is limited by something on the broker, \
> through a mechanism which is not producer flow control. I think the prime suspect \
> is still contention on the index. We did some test with Yourkit profiler.
> Profiler was attached to broker at startup, allowed to run and then a topic \
> publisher was started, routing to 5 queues. Profiler statistics were reset, the \
> publisher allowed to run for 60 seconds, and then profiling snapshot was taken. \
> During that time, ~9600 messages were logged as being sent for a rate of ~160/sec. \
> This ties in roughly with the invocation counts recorded in the snapshot (i think) \
> - ~43k calls. From what i can work out, in the snapshot (filtering everything but \
> org.apache.activemq.store.kahadb), For the 60 second sample period,
> 24.8 seconds elapsed in \
> org.apache.activemq.store.kahadb.KahaDbTransactionStore$1.removeAsyncMessage(ConnectionContext, \
> MessageAck). 18.3 seconds elapsed in \
> org.apache.activemq.store.kahadb.KahaDbTransactionStore$1.asyncAddQueueMessage(ConnectionContext, \
> Message, boolean). From these, a further large portion of the time is spent inside \
> MessageDatabase: org.apache.activemq.store.kahadb.MessageDatabase.process(KahaRemoveMessageCommand, \
> Location) - 10 secs elapsed \
> org.apache.activemq.store.kahadb.MessageDatabase.process(KahaAddMessageCommand, \
> Location) - 8.5 secs elapsed. As both of these lock on indexLock.writeLock(), and \
> both take place on the NIO transport threads, i think this accounts for at least \
> some of the message throughput limits. As messages are added and removed from the \
> index one by one, regardless of sync type settings, this adds a fair amount of \
> overhead. While we're not synchronising on writes to disk, we are performing work \
> on the NIO worker thread which can block on locks, and could account for the \
> behaviour we've seen client side. To Reproduce:
> 1. Install a broker and use the attached configuration.
> 2. Use the 5.8.0 example ant script to consume from the queues, TopicQueueRouted.1 \
> - 5. eg: ant consumer -Durl=tcp://localhost:61616 -Dsubject=TopicQueueRouted.1 \
> -Duser=admin -Dpassword=admin -Dmax=-1 3. Use the modified version of 5.8.0 example \
> ant script (attached) to send messages to topics, TopicRouted.1 - 5, eg: ant \
> producer -Durl='tcp://localhost:61616?jms.useAsyncSend=true&wireFormat.tightEncoding \
> Enabled=false&keepAlive=true&wireFormat.maxInactivityDuration=60000&socketBufferSize=32768' \
> -Dsubject=TopicRouted.1 -Duser=admin -Dpassword=admin -Dmax=1 -Dtopic=true \
> -DsleepTime=0 -Dmax=10000 -DmessageSize=5000 This modified version of the script \
> prints the number of messages per second and prints it to the console.
--
This message was sent by Atlassian JIRA
(v6.2#6252)
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic