[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r1073790 -
From:       tabish () apache ! org
Date:       2011-02-23 15:42:36
Message-ID: 20110223154237.05424238890B () eris ! apache ! org
[Download RAW message or body]

Author: tabish
Date: Wed Feb 23 15:42:36 2011
New Revision: 1073790

URL: http://svn.apache.org/viewvc?rev=1073790&view=rev
Log:
Adds a somewhat contrived test case for using async consumers with .NET transactions.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs


Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
                
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1073790&r1=1073789&r2=1073790&view=diff
 ==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Wed Feb 23 15:42:36 2011
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
+using System;
+using System.Data.SqlClient;
 using System.IO;
 using System.Threading;
+using System.Transactions;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.Util;
 using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
@@ -296,5 +300,83 @@ namespace Apache.NMS.ActiveMQ.Test.src.t
             VerifyBrokerQueueCount();
         }
 
+        #region Asynchronous Consumer Inside of a Transaction Test / Example
+
+        private const int BATCH_COUNT = 5;
+        private int batchSequence;
+        private DependentTransaction batchTxControl;
+        private readonly ManualResetEvent awaitBatchProcessingStart = new ManualResetEvent(false);
+
+        [Test]
+        public void TestTransactedAsyncConsumption()
+        {
+            PurgeDatabase();
+            PurgeAndFillQueue(MSG_COUNT * BATCH_COUNT);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            using (INetTxSession session = connection.CreateNetTxSession())
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += AsyncTxAwareOnMessage;
+
+                connection.Start();
+
+                for (int i = 0; i < BATCH_COUNT; ++i)
+                {
+                    using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                    {
+                        batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
+                        awaitBatchProcessingStart.Set();
+                        scoped.Complete();
+                    }                    
+                }
+            }
+
+            // verify sql server has commited the transaction                    
+            VerifyDatabaseTableIsFull(MSG_COUNT * BATCH_COUNT);
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        private void AsyncTxAwareOnMessage(IMessage message)
+        {
+            awaitBatchProcessingStart.WaitOne();
+
+            try
+            {
+                using (TransactionScope scoped = new TransactionScope(batchTxControl))
+                using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                using (SqlCommand sqlInsertCommand = new SqlCommand())
+                {
+                    sqlConnection.Open();
+                    sqlInsertCommand.Connection = sqlConnection;
+
+                    ITextMessage textMessage = message as ITextMessage;
+                    Assert.IsNotNull(message, "missing message");
+                    sqlInsertCommand.CommandText =
+                        string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(textMessage.Text));
+                    sqlInsertCommand.ExecuteNonQuery();
+                    scoped.Complete();
+                }
+
+                if(++batchSequence == MSG_COUNT)
+                {
+                    batchSequence = 0;
+                    awaitBatchProcessingStart.Reset();
+                    batchTxControl.Complete();
+                }
+            }
+            catch (Exception e)
+            {
+                Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                Tracer.Debug(e.ToString());
+            }
+        }
+
+        #endregion
     }
 }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic