[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    [jira] [Assigned] (AMQNET-474) DTC Consumer is forcibly closed if a transaction is in progress and c
From:       "Timothy Bish (JIRA)" <jira+amqnet () apache ! org>
Date:       2014-08-29 15:30:53
Message-ID: JIRA.12702371.1395233403773.2656.1409326253731 () arcas
[Download RAW message or body]


     [ https://issues.apache.org/jira/browse/AMQNET-474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel \
]

Timothy Bish reassigned AMQNET-474:
-----------------------------------

    Assignee: Timothy Bish  (was: Jim Gomes)

> DTC Consumer is forcibly closed if a transaction is in progress and connection to \
>                 the broker is interrupted
> -----------------------------------------------------------------------------------------------------------
>  
> Key: AMQNET-474
> URL: https://issues.apache.org/jira/browse/AMQNET-474
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Affects Versions: 1.6.2
> Reporter: Imran
> Assignee: Timothy Bish
> Attachments: NetTxTransactionContext.cs.patch
> 
> 
> DTC Consumer is forcibly closed if a transaction is in progress and the connection \
> to the broker is interrupted. This behavior is different to non DTC consumers. This \
> happens with a fail over connection specified which is not the correct behavior as \
> you would expect the fail over feature to reestablish the connection on behalf of \
> the client. {code}
> using System;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
> {
> [TestFixture]
> public class BrokerRestartAndFailover
> {
> [Test, Explicit("After a broker restart the consumer is forcibly closed. This is \
> not desirable as this behaviour is different to non dtc consumers.")] public void \
> Should_rediliver_message_after_broker_restart() {
> SendMessageToQueue("1");
> var session = _connection.CreateSession(AcknowledgementMode.Transactional);
> var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, \
> InQueue)); var transaction = new \
> TransactionScope(TransactionScopeOption.RequiresNew); \
> consumer.Receive(TimeSpan.FromSeconds(1)); StopService(ActiveMqMaster);
> StartService(ActiveMqMaster);
> transaction.Complete();
> transaction.Dispose();
> //try a few times to drain the queue
> var messageRedilivered = 0;
> for (var i = 0; i < 2; i++)
> {
> transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> try
> {
> var message = consumer.Receive(TimeSpan.FromSeconds(1));
> transaction.Complete();
> if (message != null)
> messageRedilivered++;
> }
> catch (Exception ex)
> {
> LogManager.GetCurrentClassLogger().Error(ex);
> }
> finally
> {
> transaction.Dispose();
> }
> }
> Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
> Assert.That(messageRedilivered, Is.EqualTo(1));
> }
> public int CountMessagesInQueue(string queue)
> {
> var factory = new ConnectionFactory(ConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional
> };
> 
> var count = 0;
> using (var connection = factory.CreateConnection())
> using (var session = connection.CreateSession())
> using (var consumer = session.CreateConsumer(SessionUtil.GetQueue(session, queue)))
> {
> connection.Start();
> while (true)
> {
> var message = consumer.Receive(TimeSpan.FromSeconds(1));
> if (message == null)
> break;
> count++;
> }
> }
> return count;
> }
> private void DeleteQueue(string queue)
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, queue);
> }
> }
> private void SendMessageToQueue(string message)
> {
> using (var session = _connection.CreateSession())
> using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, \
> InQueue))) using (var scope = new \
> TransactionScope(TransactionScopeOption.RequiresNew)) {
> producer.Send(producer.CreateTextMessage(message));
> scope.Complete();
> }
> Log.Debug("Primed Input Queue");
> }
> private void StartService(ServiceController service)
> {
> if(service.Status != ServiceControllerStatus.Running)
> service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, \
> true, "HH:MM:ss"); StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> _connectionFactory = new NetTxConnectionFactory(ConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 10, \
> MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, \
> DispatchAsync = true, AsyncSend = false,
> PrefetchPolicy = new PrefetchPolicy { All = 10 },
> };
> _connection = _connectionFactory.CreateConnection();
> _connection.ConnectionInterruptedListener += () => \
> LogManager.GetCurrentClassLogger().Debug("Connection interrupted"); \
> _connection.ConnectionResumedListener += () => \
> LogManager.GetCurrentClassLogger().Debug("Connection resumed"); \
> _connection.ExceptionListener += ex => \
> LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'", \
> ex.ToString()); _connection.Start();
> DeleteQueue(InQueue);
> DeleteQueue(OutQueue);
> }
> [TearDown]
> public void TestTeardown()
> {
> StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> }
> private const string ConnectionString = @"failover:(tcp://localhost:61616)";
> protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
> protected ServiceController ActiveMqSlave = new \
> ServiceController(@"ActiveMQSlave"); private IConnection _connection;
> private const string InQueue = "in-q";
> private const string OutQueue = "out-q";
> private static readonly ILog Log = \
> LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name); private \
> NetTxConnectionFactory _connectionFactory; }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic