[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    [jira] [Updated] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit
From:       "Timothy Bish (JIRA)" <jira+amqnet () apache ! org>
Date:       2014-08-28 14:33:09
Message-ID: JIRA.12696494.1392979442999.543.1409236389180 () arcas
[Download RAW message or body]


     [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish updated AMQNET-471:
--------------------------------

    Fix Version/s: 1.7.0
                   1.6.4

> Synchronous message consumer will lose a message that failed to commit whilst the \
>                 broker was unavailable
> --------------------------------------------------------------------------------------------------------
>  
> Key: AMQNET-471
> URL: https://issues.apache.org/jira/browse/AMQNET-471
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Affects Versions: 1.6.2
> Reporter: Imran
> Assignee: Timothy Bish
> Fix For: 1.6.4, 1.7.0
> 
> Attachments: TransactionContext.cs.patch
> 
> 
> If the broker is down, the client cannot commit the current message, and the \
> library throws an exception. This is the behavior you would expect. However, if you \
> then roll back the transaction on the session in response to that exception and \
> resume message consumption, the rolled-back message is never redelivered. \
> {code:title=Failing Test|borderStyle=solid}  [TestFixture, Explicit]
> public class BrokerRestart
> {
> [Test]
> public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
> {
> StartService(ActiveMqMaster);
> DeleteQueue();
> SendMessageToQueue();
> var session = _connection.CreateSession(AcknowledgementMode.Transactional);
> var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
> var message = consumer.Receive(TimeSpan.FromSeconds(30));
> _log.Debug("Received message");
> StopService(ActiveMqMaster);
> _log.Debug("Commiting transaction");
> try
> {
> session.Commit();
> }
> catch (Exception ex)
> {
> _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
> try
> {
> session.Rollback();
> }
> catch (Exception einner)
> {
> _log.Debug("Rollback transaction");
> _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
> }
> }
> StartService(ActiveMqMaster);
> message = consumer.Receive(TimeSpan.FromSeconds(30));
> Assert.That(message, Is.Not.Null, "message was not redilivered");
> }
> private void StartService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Running)
> service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> _log.Debug("Started Broker");
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> _log.Debug("Stopped Broker Broker");
> }
> private void SendMessageToQueue()
> {
> using (var session = _connection.CreateSession())
> using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, \
> InQ))) {
> producer.Send(producer.CreateTextMessage(1.ToString()));
> session.Commit();
> }
> _log.Debug("Primed Input Queue");
> }
> private void DeleteQueue()
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, InQ);
> }
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, \
> true, "HH:MM:ss"); _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
> var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, \
> MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, \
> AsyncSend = false, PrefetchPolicy = new PrefetchPolicy {All = 5}
> };
> _connection = factory.CreateConnection();
> _connection.Start();
> //Tracer.Trace = new CommonLoggingTraceAdapter();
> }
> protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
> //protected ServiceController ActiveMqSlave = new \
> ServiceController(@"ActiveMQSlave"); private IConnection _connection;
> private const string InQ = "integration-test-q";
> private ILog _log;
> }
> {code}
> {code:title=Passing Test With Patch|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Threading;
> using System.Threading.Tasks;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
> [TestFixture]
> public class BrokerRestart
> {
> //AMQNET-471
> [Test]
> public void Message_should_be_redilivered_if_broker_is_down_and_try_to_commit()
> {
> var session = _connection.CreateSession(AcknowledgementMode.Transactional);
> var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, \
> InQueue)); SendMessageToQueue();
> consumer.Receive(TimeSpan.FromSeconds(5));
> StopService(ActiveMqMaster);
> var commiter = TryCommit(session);
> StartService(ActiveMqMaster);
> commiter.Wait();
> var message = consumer.Receive(TimeSpan.FromSeconds(5));
> TryCommit(session).Wait();
> Assert.That(message, Is.Not.Null, "message was not redilivered");
> Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0));
> }
> //Commit blocks if the broker is down with the patch for AMQNET-471
> private Task TryCommit(ISession session)
> {
> var task = Task.Factory.StartNew(() =>
> {
> try
> {
> session.Commit();
> }
> catch (Exception ex)
> {
> _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
> try
> {
> session.Rollback();
> }
> catch (Exception einner)
> {
> _log.Debug("Rollback transaction");
> _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
> }
> }
> });
> //Give it a chance to start.
> Thread.Sleep(1000);
> return task;
> }
> private int CountMessagesInQueue(string queue)
> {
> var count = 0;
> using (var session = _connection.CreateSession(AcknowledgementMode.Transactional))
> using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, \
> queue))) {
> while (true)
> {
> var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
> if (msg == null)
> break;
> count++;
> }
> }
> return count;
> }
> private void StartService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Running || service.Status == \
> ServiceControllerStatus.StartPending) service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> _log.Debug("Started Broker");
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> _log.Debug("Stopped Broker");
> }
> private void SendMessageToQueue()
> {
> using (var session = _connection.CreateSession())
> using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, \
> InQueue))) {
> producer.Send(producer.CreateTextMessage(1.ToString()));
> session.Commit();
> }
> _log.Debug("Primed Input Queue");
> }
> private void DeleteQueue(string queue)
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, queue);
> }
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, \
> true, "HH:MM:ss"); _log = LogManager.GetLogger(typeof(BrokerRestart).Name);
> StartService(ActiveMqMaster);
> _factory = new ConnectionFactory(ActiveMqConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, \
> MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, \
> AsyncSend = false, PrefetchPolicy = new PrefetchPolicy { All = 5 }
> };
> _connection = _factory.CreateConnection();
> _log.Debug("Starting connection");
> _connection.Start();
> _log.Debug("Connection established");
> DeleteQueue(InQueue);
> DeleteQueue(OutQueue);
> //Tracer.Trace = new CommonLoggingTraceAdapter();
> }
> [TearDown]
> public void TestTearDown()
> {
> _connection.Dispose();
> }
> protected ServiceController ActiveMqMaster = new \
> ServiceController(ActiveMasterServiceName, ActiveMqMachineName); protected \
> ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, \
> ActiveMqMachineName); private static readonly string ActiveMqMachineName = \
> ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; private static \
> readonly string ActiveMqConnectionString = \
> ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; private \
> static readonly string ActiveMasterServiceName = \
> ConfigurationManager.AppSettings["ActiveMqMasterName"]; private static readonly \
> string ActiveMqSlaveServiceName = \
> ConfigurationManager.AppSettings["ActiveMqSlaveName"]; private IConnection \
> _connection; private const string InQueue = "integration-test-q-in";
> private const string OutQueue = "integration-test-q-out";
> private ILog _log;
> private ConnectionFactory _factory;
> }
> }
> {code}
> {code:title=Java Client|borderStyle=solid} 
> import static org.junit.Assert.*;
> import org.junit.Test;
> import org.apache.activemq.*;
> import javax.jms.*;
> import javax.jms.Message;
> public class BrokerRestart {
> 	@Test
> 	public void test() throws Exception {
> 		String SERVICE_NAME = "ActiveMQ";
> 		String[] stop = {"cmd.exe", "/c", "sc", "stop", SERVICE_NAME};
> 		String[] start = {"cmd.exe", "/c", "sc", "start", SERVICE_NAME};
> 		
> 		Runtime.getRuntime().exec(start);
> 		Thread.sleep(2000);
> 		ActiveMQConnectionFactory connectionFactory = new \
> ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");  \
> SendMessage(connectionFactory);  
> Connection connection = connectionFactory.createConnection();
> connection.start();
> Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
> Destination destination = session.createQueue("TEST.FOO");
> MessageConsumer consumer = session.createConsumer(destination);
> Message message = consumer.receive(1000);
> 		
> 		Runtime.getRuntime().exec(stop);
> 		Thread.sleep(2000);
> 		try
> 		{
> 			System.out.println("Committing transaction");
> 			//Looks like this blocks when the broker is down. If you start the service \
> manually here, the test will pass.  session.commit();
> 			System.out.println("Committed transaction");
> 		}
> 		catch(Exception e)
> 		{
> 			try
> 			{
> 				System.out.println("Transaction commit exception: " + e.toString());
> 				session.rollback();
> 			}
> 			catch(Exception e2)
> 			{
> 				System.out.println("Transaction rollback exception: " + e2.toString());
> 			}
> 		}
> 		Runtime.getRuntime().exec(start);
> 		Thread.sleep(5000);
> message = consumer.receive(1000);
> session.commit();
> consumer.close();
> session.close();
> connection.close();
> 
> assertNotNull(message);
> 	}
> 	
> 	private static void SendMessage(ActiveMQConnectionFactory connectionFactory) \
> throws Exception  {
> Connection connection = connectionFactory.createConnection();
> connection.start();
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination destination = session.createQueue("TEST.FOO");
> MessageProducer producer = session.createProducer(destination);
> producer.setDeliveryMode(DeliveryMode.PERSISTENT);
> String text = "Hello world! From: " + Thread.currentThread().getName();
> TextMessage message = session.createTextMessage(text);
> producer.send(message);
> session.close();
> connection.close();
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic