
List:       activemq-dev
Subject:    [jira] [Updated] (AMQ-3644) Derby Persistence Adapter does not work with large queue
From:       "Timothy Bish (JIRA)" <jira () apache ! org>
Date:       2013-01-30 23:53:12
Message-ID: JIRA.12536267.1324637622844.219011.1359589992738 () arcas


     [ https://issues.apache.org/jira/browse/AMQ-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish updated AMQ-3644:
------------------------------

    Fix Version/s:     (was: NEEDS_REVIEWED)
                   AGING_TO_DIE
    
> Derby Persistence Adapter does not work with large queue
> --------------------------------------------------------
> 
> Key: AMQ-3644
> URL: https://issues.apache.org/jira/browse/AMQ-3644
> Project: ActiveMQ
> Issue Type: Improvement
> Components: Message Store
> Affects Versions: 5.4.2, 5.4.3, 5.5.1
> Reporter: Steffen Kuche
> Priority: Minor
> Labels: broker, derby, messages, persistence
> Fix For: AGING_TO_DIE
> 
> Attachments: patch.txt, test.zip
> 
> 
> Our application was blocked by ActiveMQ when a consumer reconnected to the
> broker after a long absence. It turned out that the method
> DefaultJDBCAdapter.doRecoverNextMessages took about *three minutes* to recover
> the next 20 messages for the store-based cursor. Here is the corresponding
> stack trace:
> {noFormat}
> "BrokerService[embedded] Task" daemon prio=10 tid=0x0000000042bcf000 nid=0x17fd runnable [0x00007f2b342a1000]
> java.lang.Thread.State: RUNNABLE
> 	at java.io.FileOutputStream.open(Native Method)
> 	at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
> 	at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
> 	at org.apache.derby.impl.io.DirFile.getOutputStream(Unknown Source)
> 	at org.apache.derby.impl.store.raw.data.StreamFileContainer.run(Unknown Source)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at org.apache.derby.impl.store.raw.data.StreamFileContainer.privGetOutputStream(Unknown Source)
> 	- locked <0x00000000f23e6640> (a org.apache.derby.impl.store.raw.data.StreamFileContainer)
> 	at org.apache.derby.impl.store.raw.data.StreamFileContainer.load(Unknown Source)
> 	at org.apache.derby.impl.store.raw.data.BaseDataFileFactory.addAndLoadStreamContainer(Unknown Source)
> 	at org.apache.derby.impl.store.raw.xact.Xact.addAndLoadStreamContainer(Unknown Source)
> 	at org.apache.derby.impl.store.access.sort.MergeSort.createMergeRun(Unknown Source)
> 	at org.apache.derby.impl.store.access.sort.MergeInserter.insert(Unknown Source)
> 	at org.apache.derby.impl.sql.execute.SortResultSet.loadSorter(Unknown Source)
> 	at org.apache.derby.impl.sql.execute.SortResultSet.openCore(Unknown Source)
> 	at org.apache.derby.impl.sql.execute.BasicNoPutResultSetImpl.open(Unknown Source)
> 	at org.apache.derby.impl.sql.GenericPreparedStatement.executeStmt(Unknown Source)
> 	at org.apache.derby.impl.sql.GenericPreparedStatement.execute(Unknown Source)
> 	at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
> 	- locked <0x00000000a4981ec8> (a org.apache.derby.impl.jdbc.EmbedConnection40)
> 	at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeStatement(Unknown Source)
> 	at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeQuery(Unknown Source)
> 	at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:96)
> 	at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:96)
> 	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:929)
> 	at org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:229)
> 	at org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:88)
> 	at org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:97)
> 	at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:262)
> 	- locked <0x00000000ad06ca48> (a org.apache.activemq.broker.region.cursors.QueueStorePrefetch)
> 	at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:110)
> 	at org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157)
> 	- locked <0x00000000ad06ca00> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
> 	at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1678)
> 	at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1898)
> 	at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1425)
> 	- locked <0x00000000ad06d010> (a org.apache.activemq.broker.region.Queue$3)
> 	at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> 	at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>   at java.lang.Thread.run(Thread.java:662)
> {noFormat}
> During these three minutes the producer was blocked, as the stack of the
> producer thread shows:
> {noFormat}
> "RMI TCP Connection(13)-10.0.0.42" daemon prio=10 tid=0x0000000041cca800 nid=0x1716 waiting on condition [0x00007f2b2b7f3000]
> java.lang.Thread.State: WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00000000fae32cd0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>   at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
> 	at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
> 	at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
>   at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
>   at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1760)
> 	- locked <0x00000000fae31548> (a java.lang.Object)
> 	at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
>   at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> 	- locked <0x00000000fae31068> (a org.apache.activemq.ActiveMQMessageProducer)
> 	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:55)
> 	at com.subshell.sophora.server.application.replication.Master.sendMessage(Master.java:117)
>   at com.subshell.sophora.server.application.replication.Master.sendServerEvent(Master.java:79)
>   at com.subshell.sophora.server.application.replication.ReplicationMaster.consumeEvent(ReplicationMaster.java:100)
>   at com.subshell.sophora.server.application.manager.impl.ObservationManager.afterCompletion(ObservationManager.java:74)
>   at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:168)
>   at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:996)
>   at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:971)
>   at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:799)
>   at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:723)
>   at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:393)
>   at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:120)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>   at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:80)
>   at com.subshell.sophora.commons.profile.Profiler.profile(Profiler.java:130)
> 	at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:621)
>   at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:610)
>   at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:65)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>   at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:42)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161)
>   at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:50)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:161)
>   at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:90)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>   at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
>   at $Proxy37.saveDocument(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
>   at com.subshell.sophora.server.remoting.RemoteInvocationTraceInterceptor.invoke(RemoteInvocationTraceInterceptor.java:35)
>   at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>   at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
>   at $Proxy40.saveDocument(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.springframework.remoting.support.RemoteInvocation.invoke(RemoteInvocation.java:205)
>   at org.springframework.remoting.support.DefaultRemoteInvocationExecutor.invoke(DefaultRemoteInvocationExecutor.java:38)
>   at org.springframework.remoting.support.RemoteInvocationBasedExporter.invoke(RemoteInvocationBasedExporter.java:78)
>   at org.springframework.remoting.rmi.RmiBasedExporter.invoke(RmiBasedExporter.java:73)
>   at org.springframework.remoting.rmi.RmiInvocationWrapper.invoke(RmiInvocationWrapper.java:72)
>   at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
> 	at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
> 	at sun.rmi.transport.Transport$1.run(Transport.java:159)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
> 	at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
> 	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
>   at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>   at java.lang.Thread.run(Thread.java:662)
> {noFormat}
> After some research it turned out that Derby writes several gigabytes of
> temporary files to its tmp directory during the select statement. Afterwards
> these temporary files are deleted, and they are rebuilt when the next messages
> are requested from the message store. The problem is the PreparedStatement
> that is used for the select statement. When I changed the PreparedStatement to
> a plain Statement, the execution time dropped below 10 ms. The attached patch
> shows how I fixed this issue.
> The attachment also contains a test program (importable as a Maven-Eclipse
> project) that reproduces this issue. The program creates a Derby database with
> the ActiveMQ message table and fills it with data (2000 messages; this can take
> several hours). I assumed 3 MB per message, as our messages contain some binary
> data and have an average size of 3 MB. Once the database is built, the next
> messages are retrieved once with a Statement and once with a PreparedStatement.
> The Statement takes about 1 ms and the PreparedStatement about 258805 ms. A
> second PreparedStatement execution takes just as long. Here are the temporary
> files created by Derby during the select statement:
> {noFormat}
> total 5.4G
> 287M 2011-12-23 09:48 T1324630116288.tmp
> 287M 2011-12-23 09:49 T1324630116289.tmp
> 287M 2011-12-23 09:49 T1324630116290.tmp
> 287M 2011-12-23 09:49 T1324630116291.tmp
> 287M 2011-12-23 09:49 T1324630116292.tmp
> 287M 2011-12-23 09:49 T1324630116293.tmp
> 287M 2011-12-23 09:50 T1324630116294.tmp
> 287M 2011-12-23 09:50 T1324630116295.tmp
> 287M 2011-12-23 09:50 T1324630116296.tmp
> 287M 2011-12-23 09:50 T1324630116297.tmp
> 287M 2011-12-23 09:51 T1324630116298.tmp
> 287M 2011-12-23 09:51 T1324630116299.tmp
> 287M 2011-12-23 09:51 T1324630116300.tmp
> 287M 2011-12-23 09:51 T1324630116301.tmp
> 287M 2011-12-23 09:51 T1324630116302.tmp
> 287M 2011-12-23 09:52 T1324630116303.tmp
> 287M 2011-12-23 09:52 T1324630116304.tmp
> 287M 2011-12-23 09:52 T1324630116305.tmp
> 284M 2011-12-23 09:52 T1324630116306.tmp
> {noFormat}
> The execution of a cleanup (DefaultJDBCAdapter.doDeleteOldMessages) also took
> about 6 minutes. In this case, however, I patched ActiveMQ not to do a cleanup
> at all, as our messages don't expire. I'm almost sure that this issue can be
> observed in other methods of the DefaultJDBCAdapter, but for us it was
> sufficient to fix it in the recovery of messages.
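
The workaround described above (issuing the select through a plain Statement
instead of a PreparedStatement) might look roughly like the sketch below. It is
not the content of patch.txt, which is not reproduced in this message. The
table and column names (ACTIVEMQ_MSGS, ID, MSG, CONTAINER), the class name
LiteralQuerySketch and all method names are assumptions chosen for
illustration. Because literal values are concatenated into the SQL text, string
values have to be escaped by hand, which is the main cost of this approach.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not taken from patch.txt or from the ActiveMQ sources.
class LiteralQuerySketch {

    // Quote a string as an SQL literal, doubling embedded single quotes.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Build the select with literal values instead of '?' placeholders.
    // Table and column names are assumed for illustration.
    static String buildLiteralQuery(String container, long lastSeenId) {
        return "SELECT ID, MSG FROM ACTIVEMQ_MSGS WHERE CONTAINER=" + quote(container)
                + " AND ID > " + lastSeenId + " ORDER BY ID";
    }

    // Run the literal query through a plain Statement and collect up to
    // maxReturned message ids. Needs a live JDBC connection to be useful.
    static List<Long> recoverNextIds(Connection connection, String container,
                                     long lastSeenId, int maxReturned) throws SQLException {
        List<Long> ids = new ArrayList<>();
        try (Statement statement = connection.createStatement()) {
            statement.setMaxRows(maxReturned);
            try (ResultSet rs = statement.executeQuery(buildLiteralQuery(container, lastSeenId))) {
                while (ids.size() < maxReturned && rs.next()) {
                    ids.add(rs.getLong(1));
                }
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Only prints the generated SQL; no database is contacted here.
        System.out.println(buildLiteralQuery("queue://TEST", 100L));
    }
}
```

Whether this avoids the temporary sort files on a given Derby version would
have to be measured against a large table, for example with the test program
attached to the issue.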

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

