[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-dev
Subject: Re: ActiveMQ Redelivery not working with new transaction in consumer
From: Timothy Bish <tabish121 () gmail ! com>
Date: 2020-03-28 19:13:22
Message-ID: 3e6f68fb-5df9-bdc6-b0f0-ae44fa2ba149 () gmail ! com
[Download RAW message or body]
Please direct support questions to the ActiveMQ Users mailing list as
this list if for discussion of development of the broker itself.
On 3/28/20 5:17 AM, vedion wrote:
> Hi,
>
> I have a ActiveMQ where I have setup Redelivery on the client side. With a
> simple consumer it works as expected with the below configurations:
>
> ```
> import org.apache.activemq.ActiveMQXAConnectionFactory;
> import org.apache.activemq.RedeliveryPolicy;
> import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
> import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
> import org.springframework.jms.config.JmsListenerContainerFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.listener.DefaultMessageListenerContainer;
>
> ...
> @Bean
> public ConnectionFactory atomikosConnectionFactoryBean() {
> String mqUrl = System.getenv("MQ_URL");
> AtomikosConnectionFactoryBean atomikos = new
> AtomikosConnectionFactoryBean();
> atomikos.setLocalTransactionMode(false);
> atomikos.setMaxPoolSize(10);
> atomikos.setUniqueResourceName("QUEUE_BROKER");
>
> RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
> redeliveryPolicy.setMaximumRedeliveries(4);
> redeliveryPolicy.setBackOffMultiplier(10);
> redeliveryPolicy.setRedeliveryDelay(1000L);
> redeliveryPolicy.setInitialRedeliveryDelay(1000L);
> redeliveryPolicy.setUseExponentialBackOff(true);
> redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
> ActiveMQXAConnectionFactory xaConnectionFactoryBean = new
> ActiveMQXAConnectionFactory(System.getenv("MQ_USERNAME"),
> System.getenv("MQ_PASSWORD"), mqUrl);
> xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
> xaConnectionFactoryBean.setNonBlockingRedelivery(true);
> atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
> return atomikos;
> }
>
> @Bean
> public JmsListenerContainerFactory<?>
> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
> DefaultJmsListenerContainerFactoryConfigurer configurer) {
> DefaultJmsListenerContainerFactory factory = new
> DefaultJmsListenerContainerFactory();
> configurer.configure(factory, connectionFactory);
> factory.setErrorHandler(new EHealthEventErrorHandler());
> factory.setMessageConverter(jacksonJmsMessageConverter());
>
> factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
> factory.setDestinationResolver(new EHealthDestinationResolver());
> factory.setSessionTransacted(true);
> return factory;
> }
>
> @Bean(autowire = Autowire.BY_TYPE)
> public JmsTemplate jmsTemplate() {
> JmsTemplate jmsTemplate = new
> JmsTemplate(atomikosConnectionFactoryBean());
> jmsTemplate.setDestinationResolver(new
> EHealthDestinationResolver());
> jmsTemplate.setSessionTransacted(true);
> return jmsTemplate;
> }
> ...
> ```
>
> ```
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.transaction.annotation.Transactional;
>
> @Transactional
> @JmsListener(destination = "XXX")
> public void onMessageReceived(XXXEvent event) {
> throw new Exception();
> }
> ```
>
>
> So the above works as expected and the message is redelivered with the
> ExponentialBackOff strategy.
>
> BUT it goes sideways when the message consumer (onMessageReceived) calls a
> method on a class that sends a message to another queue in a new
> transaction.
> Then the message is not redelivered if the exception is thrown after the new
> transaction have been committed, ex:
>
> ```
> import org.springframework.transaction.annotation.Transactional;
>
> public class FooClass {
> @Transactional(propagation = Propagation.REQUIRES_NEW)
> public void createInNewTransaction() {
> sendMessageToAnotherQueue();
> }
> }
> ```
>
> ```
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.transaction.annotation.Transactional;
>
> @Transactional
> @JmsListener(destination = "Foo")
> public void onMessageReceived(FooEvent event) {
> fooClass.createInNewTransaction();
> throw new Exception();
> }
> ```
>
>
> In the stacktrace below it is seen that the
> org.apache.activemq.TransactionContext.synchronizations are nulled when
> sending the message in the new transaction. The
> TransactionContext.synchronizations contains the ActiveMQMessageConsumer
> that is used to receive the message and is needed for the redelivery after
> the exception is thrown. When this is cleared the message is not
> redelivered:
> <http://activemq.2283324.n4.nabble.com/file/t379855/Sync_nulled.png>
>
> ```
> private void afterRollback() throws JMSException {
> if (synchronizations == null) {
> return;
> }
> ...
> }
> ```
>
>
> It is the method
> com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse()
> that detects that the transaction context is different and throws an
> exception that is catched in SessionHandleState.notifyBeforeUse():
> ```
> TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction
> currentTx)
> throws InvalidSessionHandleStateException,
> UnexpectedTransactionContextException
> {
>
> if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
> //OOPS! we are being used a different tx context than the one expected...
>
> //TODO check: what if subtransaction? Possible solution: ignore if
> serial_jta mode, error otherwise.
>
> String msg = "The connection/session object is already enlisted in a
> (different) transaction.";
> if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
> throw new UnexpectedTransactionContextException();
> }
>
> //tx context is still the same -> no change in state required
> return null;
> }
> ```
>
>
> Then a new context is created and currentContext.checkEnlistBeforeUse(ct) is
> called which ends up clearing the TransactionContext.synchronizations
>
> There is a comment in BranchEnlistedStateHandler.checkEnlistBeforeUse():
> "//TODO check: what if subtransaction? Possible solution: ignore if
> serial_jta mode, error otherwise."
>
> I have a subtransaction and have
> "com.atomikos.icatch.serial_jta_transactions" set to true. Am I just unlucky
> to have hit something that is not supported yet?
>
>
> Versions used: "org.springframework:spring-jms:5.1.10.RELEASE",
> "com.atomikos:transactions:5.0.3",
> "org.apache.activemq:activemq-client:5.15.10"
>
> Have tried to bump to newest versions, but didn't make a difference.
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html
--
Tim Bish
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic