[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    Re: ActiveMQ Redelivery not working with new transaction in consumer
From:       Timothy Bish <tabish121 () gmail ! com>
Date:       2020-03-28 19:13:22
Message-ID: 3e6f68fb-5df9-bdc6-b0f0-ae44fa2ba149 () gmail ! com
[Download RAW message or body]


Please direct support questions to the ActiveMQ Users mailing list as 
this list if for discussion of development of the broker itself.

On 3/28/20 5:17 AM, vedion wrote:
> Hi,
>
> I have a ActiveMQ where I have setup Redelivery on the client side. With a
> simple consumer it works as expected with the below configurations:
>
> ```
> import org.apache.activemq.ActiveMQXAConnectionFactory;
> import org.apache.activemq.RedeliveryPolicy;
> import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
> import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
> import org.springframework.jms.config.JmsListenerContainerFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.listener.DefaultMessageListenerContainer;
>
> ...
>      @Bean
>      public ConnectionFactory atomikosConnectionFactoryBean() {
>          String mqUrl = System.getenv("MQ_URL");
>          AtomikosConnectionFactoryBean atomikos = new
> AtomikosConnectionFactoryBean();
>          atomikos.setLocalTransactionMode(false);
>          atomikos.setMaxPoolSize(10);
>          atomikos.setUniqueResourceName("QUEUE_BROKER");
>
>          RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>          redeliveryPolicy.setMaximumRedeliveries(4);
>          redeliveryPolicy.setBackOffMultiplier(10);
>          redeliveryPolicy.setRedeliveryDelay(1000L);
>          redeliveryPolicy.setInitialRedeliveryDelay(1000L);
>          redeliveryPolicy.setUseExponentialBackOff(true);
>          redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
>          ActiveMQXAConnectionFactory xaConnectionFactoryBean = new
> ActiveMQXAConnectionFactory(System.getenv("MQ_USERNAME"),
> System.getenv("MQ_PASSWORD"), mqUrl);
>          xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
>          xaConnectionFactoryBean.setNonBlockingRedelivery(true);
>          atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
>          return atomikos;
>      }
>
>      @Bean
>      public JmsListenerContainerFactory<?>
> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
> DefaultJmsListenerContainerFactoryConfigurer configurer) {
>          DefaultJmsListenerContainerFactory factory = new
> DefaultJmsListenerContainerFactory();
>          configurer.configure(factory, connectionFactory);
>          factory.setErrorHandler(new EHealthEventErrorHandler());
>          factory.setMessageConverter(jacksonJmsMessageConverter());
>         
> factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
>          factory.setDestinationResolver(new EHealthDestinationResolver());
>          factory.setSessionTransacted(true);
>          return factory;
>      }
>
>      @Bean(autowire = Autowire.BY_TYPE)
>      public JmsTemplate jmsTemplate() {
>          JmsTemplate jmsTemplate = new
> JmsTemplate(atomikosConnectionFactoryBean());
>          jmsTemplate.setDestinationResolver(new
> EHealthDestinationResolver());
>          jmsTemplate.setSessionTransacted(true);
>          return jmsTemplate;
>      }
>      ...
> ```
>
> ```
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.transaction.annotation.Transactional;
>
> @Transactional
> @JmsListener(destination = "XXX")
> public void onMessageReceived(XXXEvent event) {
> 	throw new Exception();
> }
> ```
>
>
> So the above works as expected and the message is redelivered with the
> ExponentialBackOff strategy.
>
> BUT it goes sideways when the message consumer (onMessageReceived) calls a
> method on a class that sends a message to another queue in a new
> transaction.
> Then the message is not redelivered if the exception is thrown after the new
> transaction have been committed, ex:
>
> ```
> import org.springframework.transaction.annotation.Transactional;
>
> public class FooClass {
>      @Transactional(propagation = Propagation.REQUIRES_NEW)
>      public void createInNewTransaction() {
>          sendMessageToAnotherQueue();
>      }
> }
> ```
>
> ```
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.transaction.annotation.Transactional;
>
> @Transactional
> @JmsListener(destination = "Foo")
> public void onMessageReceived(FooEvent event) {
> 	fooClass.createInNewTransaction();
> 	throw new Exception();
> }
> ```
>
>
> In the stacktrace below it is seen that the
> org.apache.activemq.TransactionContext.synchronizations are nulled when
> sending the message in the new transaction. The
> TransactionContext.synchronizations contains the ActiveMQMessageConsumer
> that is used to receive the message and is needed for the redelivery after
> the exception is thrown. When this is cleared the message is not
> redelivered:
> <http://activemq.2283324.n4.nabble.com/file/t379855/Sync_nulled.png>
>
> ```
> private void afterRollback() throws JMSException {
>          if (synchronizations == null) {
>              return;
>          }
> 	...
> }
> ```
>
>
> It is the method
> com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse()
> that detects that the transaction context is different and throws an
> exception that is catched in SessionHandleState.notifyBeforeUse():
> ```
> TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction
> currentTx)
> 			throws InvalidSessionHandleStateException,
> UnexpectedTransactionContextException
> 	{
> 		
> 		if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
> 			//OOPS! we are being used a different tx context than the one expected...
> 			
> 			//TODO check: what if subtransaction? Possible solution: ignore if
> serial_jta mode, error otherwise.
> 			
> 			String msg = "The connection/session object is already enlisted in a
> (different) transaction.";
> 			if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
> 			throw new UnexpectedTransactionContextException();
> 		}
> 		
> 		//tx context is still the same -> no change in state required
> 		return null;
> 	}
> ```
>
>
> Then a new context is created and currentContext.checkEnlistBeforeUse(ct) is
> called which ends up clearing the TransactionContext.synchronizations
>
> There is a comment in BranchEnlistedStateHandler.checkEnlistBeforeUse():
> "//TODO check: what if subtransaction? Possible solution: ignore if
> serial_jta mode, error otherwise."
>
> I have a subtransaction and have
> "com.atomikos.icatch.serial_jta_transactions" set to true. Am I just unlucky
> to have hit something that is not supported yet?
>
>
> Versions used: "org.springframework:spring-jms:5.1.10.RELEASE",
> "com.atomikos:transactions:5.0.3",
> "org.apache.activemq:activemq-client:5.15.10"
>
> Have tried to bump to newest versions, but didn't make a difference.
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html


-- 
Tim Bish

[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic