List: activemq-dev
Subject: [jira] [Updated] (AMQ-3644) Derby Persistence Adapter does not work with large queue
From: "Timothy Bish (JIRA)" <jira () apache ! org>
Date: 2013-01-30 23:53:12
Message-ID: JIRA.12536267.1324637622844.219011.1359589992738 () arcas
[ https://issues.apache.org/jira/browse/AMQ-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish updated AMQ-3644:
------------------------------
Fix Version/s: (was: NEEDS_REVIEWED)
AGING_TO_DIE
> Derby Persistence Adapter does not work with large queue
> --------------------------------------------------------
>
> Key: AMQ-3644
> URL: https://issues.apache.org/jira/browse/AMQ-3644
> Project: ActiveMQ
> Issue Type: Improvement
> Components: Message Store
> Affects Versions: 5.4.2, 5.4.3, 5.5.1
> Reporter: Steffen Kuche
> Priority: Minor
> Labels: broker, derby, messages, persistence
> Fix For: AGING_TO_DIE
>
> Attachments: patch.txt, test.zip
>
>
> Our application was blocked by ActiveMQ when a consumer reconnected to the broker
> after a long absence. It turned out that the method
> DefaultJDBCAdapter.doRecoverNextMessages took about *three minutes* to recover the
> next 20 messages for the store-based cursor. Here is the corresponding stack trace:
> {noFormat}
> "BrokerService[embedded] Task" daemon prio=10 tid=0x0000000042bcf000 nid=0x17fd runnable [0x00007f2b342a1000]
> java.lang.Thread.State: RUNNABLE
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
> at org.apache.derby.impl.io.DirFile.getOutputStream(Unknown Source)
> at org.apache.derby.impl.store.raw.data.StreamFileContainer.run(Unknown Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at org.apache.derby.impl.store.raw.data.StreamFileContainer.privGetOutputStream(Unknown Source)
> - locked <0x00000000f23e6640> (a org.apache.derby.impl.store.raw.data.StreamFileContainer)
> at org.apache.derby.impl.store.raw.data.StreamFileContainer.load(Unknown Source)
> at org.apache.derby.impl.store.raw.data.BaseDataFileFactory.addAndLoadStreamContainer(Unknown Source)
> at org.apache.derby.impl.store.raw.xact.Xact.addAndLoadStreamContainer(Unknown Source)
> at org.apache.derby.impl.store.access.sort.MergeSort.createMergeRun(Unknown Source)
> at org.apache.derby.impl.store.access.sort.MergeInserter.insert(Unknown Source)
> at org.apache.derby.impl.sql.execute.SortResultSet.loadSorter(Unknown Source)
> at org.apache.derby.impl.sql.execute.SortResultSet.openCore(Unknown Source)
> at org.apache.derby.impl.sql.execute.BasicNoPutResultSetImpl.open(Unknown Source)
> at org.apache.derby.impl.sql.GenericPreparedStatement.executeStmt(Unknown Source)
> at org.apache.derby.impl.sql.GenericPreparedStatement.execute(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
> - locked <0x00000000a4981ec8> (a org.apache.derby.impl.jdbc.EmbedConnection40)
> at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeStatement(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeQuery(Unknown Source)
> at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:96)
> at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:96)
> at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:929)
> at org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:229)
> at org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:88)
> at org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:97)
> at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:262)
> - locked <0x00000000ad06ca48> (a org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
> at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:110)
> at org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
> - locked <0x00000000ad06ca00> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
> at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1678)
> at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1898)
> at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1425)
> - locked <0x00000000ad06d010> (a org.apache.activemq.broker.region.Queue$3)
> at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> {noFormat}
> During these three minutes the producer was blocked, as the stack of the producer
> thread shows:
> {noFormat}
> "RMI TCP Connection(13)-10.0.0.42" daemon prio=10 tid=0x0000000041cca800 nid=0x1716 waiting on condition [0x00007f2b2b7f3000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000fae32cd0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
> at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
> at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
> at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
> at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1760)
> - locked <0x00000000fae31548> (a java.lang.Object)
> at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> - locked <0x00000000fae31068> (a org.apache.activemq.ActiveMQMessageProducer)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:55)
> at com.subshell.sophora.server.application.replication.Master.sendMessage(Master.java:117)
> at com.subshell.sophora.server.application.replication.Master.sendServerEvent(Master.java:79)
> at com.subshell.sophora.server.application.replication.ReplicationMaster.consumeEvent(ReplicationMaster.java:100)
> at com.subshell.sophora.server.application.manager.impl.ObservationManager.afterCompletion(ObservationManager.java:74)
> at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:168)
> at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:996)
> at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:971)
> at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:799)
> at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:723)
> at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:393)
> at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:120)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:80)
> at com.subshell.sophora.commons.profile.Profiler.profile(Profiler.java:130)
> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:621)
> at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:610)
> at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:65)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:42)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161)
> at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:50)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161)
> at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:90)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
> at $Proxy37.saveDocument(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
> at com.subshell.sophora.server.remoting.RemoteInvocationTraceInterceptor.invoke(RemoteInvocationTraceInterceptor.java:35)
> at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
> at $Proxy40.saveDocument(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.springframework.remoting.support.RemoteInvocation.invoke(RemoteInvocation.java:205)
> at org.springframework.remoting.support.DefaultRemoteInvocationExecutor.invoke(DefaultRemoteInvocationExecutor.java:38)
> at org.springframework.remoting.support.RemoteInvocationBasedExporter.invoke(RemoteInvocationBasedExporter.java:78)
> at org.springframework.remoting.rmi.RmiBasedExporter.invoke(RmiBasedExporter.java:73)
> at org.springframework.remoting.rmi.RmiInvocationWrapper.invoke(RmiInvocationWrapper.java:72)
> at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
> at sun.rmi.transport.Transport$1.run(Transport.java:159)
> at java.security.AccessController.doPrivileged(Native Method)
> at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
> at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
> at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
> at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> {noFormat}
> After some research it turned out that Derby writes multiple gigabytes of temporary
> files to its tmp directory during the select statement. Afterwards these temporary
> files are deleted, and they are rebuilt when the next messages are requested from the
> message store. The problem is the PreparedStatement that is used for the select statement.
> When I changed the PreparedStatement to a plain Statement, the execution time dropped
> below 10 ms. The attached patch shows how I fixed this issue.
> The attachments also include a test program (it can be imported as a Maven/Eclipse
> project) that reproduces this issue. The program creates a Derby database with
> the ActiveMQ message table and fills it with data (2000 messages; this can take
> several hours). I assumed 3 MB per message, as our messages contain some binary
> data and have an average size of 3 MB. After the database is built, the next messages
> are retrieved once with a Statement and once with a PreparedStatement.
> The Statement takes about 1 ms and the PreparedStatement about 258805 ms. A
> second PreparedStatement execution takes just as long. Here are the temporary files
> created by Derby during the select statement:
> {noFormat}
> total 5.4G
> 287M 2011-12-23 09:48 T1324630116288.tmp
> 287M 2011-12-23 09:49 T1324630116289.tmp
> 287M 2011-12-23 09:49 T1324630116290.tmp
> 287M 2011-12-23 09:49 T1324630116291.tmp
> 287M 2011-12-23 09:49 T1324630116292.tmp
> 287M 2011-12-23 09:49 T1324630116293.tmp
> 287M 2011-12-23 09:50 T1324630116294.tmp
> 287M 2011-12-23 09:50 T1324630116295.tmp
> 287M 2011-12-23 09:50 T1324630116296.tmp
> 287M 2011-12-23 09:50 T1324630116297.tmp
> 287M 2011-12-23 09:51 T1324630116298.tmp
> 287M 2011-12-23 09:51 T1324630116299.tmp
> 287M 2011-12-23 09:51 T1324630116300.tmp
> 287M 2011-12-23 09:51 T1324630116301.tmp
> 287M 2011-12-23 09:51 T1324630116302.tmp
> 287M 2011-12-23 09:52 T1324630116303.tmp
> 287M 2011-12-23 09:52 T1324630116304.tmp
> 287M 2011-12-23 09:52 T1324630116305.tmp
> 284M 2011-12-23 09:52 T1324630116306.tmp
> {noFormat}
> The execution of a cleanup (DefaultJDBCAdapter.doDeleteOldMessages) also took about
> 6 minutes. In this case, however, I patched ActiveMQ not to do a cleanup at all, as our
> messages don't expire. I'm almost sure that this issue can be observed in other
> methods of the DefaultJDBCAdapter, but for us it was sufficient to fix it in the
> recovery of messages.
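
For readers without access to patch.txt, the change described in the report (running the recovery query through a plain Statement with the values inlined, rather than through a PreparedStatement with bind parameters) might look roughly like the sketch below. It is not the attached patch. The class and method names are hypothetical, and the query text, the table name ACTIVEMQ_MSGS and the columns ID, MSG and CONTAINER are assumptions for illustration. Reading MSG with getBytes is also an assumption about how the column is stored.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ and not the attached patch.
final class RecoverWithPlainStatement {

    // Builds the query with literal values instead of '?' placeholders.
    // Table and column names are assumed for illustration only.
    static String buildRecoverSql(String container, long lastId) {
        // Double any single quote so the container name stays one SQL string literal.
        String quoted = "'" + container.replace("'", "''") + "'";
        return "SELECT ID, MSG FROM ACTIVEMQ_MSGS WHERE CONTAINER = " + quoted
                + " AND ID > " + lastId + " ORDER BY ID";
    }

    // Reads up to maxMessages message bodies that follow lastId, using a plain Statement.
    static List<byte[]> recoverNext(Connection connection, String container,
                                    long lastId, int maxMessages) throws SQLException {
        List<byte[]> messages = new ArrayList<>();
        try (Statement statement = connection.createStatement()) {
            statement.setMaxRows(maxMessages); // cap the batch, e.g. 20 as in the report
            try (ResultSet rs = statement.executeQuery(buildRecoverSql(container, lastId))) {
                while (rs.next()) {
                    messages.add(rs.getBytes(2)); // column 2 is MSG in the assumed query
                }
            }
        }
        return messages;
    }
}
```

Inlining values gives up the protection that bind parameters provide, so a change like this only makes sense where the inputs are controlled by the broker itself, and any string value still needs quoting as shown.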
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira