[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r1073790 -
From: tabish () apache ! org
Date: 2011-02-23 15:42:36
Message-ID: 20110223154237.05424238890B () eris ! apache ! org
[Download RAW message or body]
Author: tabish
Date: Wed Feb 23 15:42:36 2011
New Revision: 1073790
URL: http://svn.apache.org/viewvc?rev=1073790&view=rev
Log:
Adds a somewhat contrived test case for using async consumers with .NET transactions.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/s \
rc/test/csharp/DtcConsumerTransactionsTest.cs?rev=1073790&r1=1073789&r2=1073790&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs \
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs \
Wed Feb 23 15:42:36 2011 @@ -15,10 +15,14 @@
* limitations under the License.
*/
+using System;
+using System.Data.SqlClient;
using System.IO;
using System.Threading;
+using System.Transactions;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.Util;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
@@ -296,5 +300,83 @@ namespace Apache.NMS.ActiveMQ.Test.src.t
VerifyBrokerQueueCount();
}
+ #region Asynchronous Consumer Inside of a Transaction Test / Example
+
+ private const int BATCH_COUNT = 5;
+ private int batchSequence;
+ private DependentTransaction batchTxControl;
+ private readonly ManualResetEvent awaitBatchProcessingStart = new \
ManualResetEvent(false); +
+ [Test]
+ public void TestTransactedAsyncConsumption()
+ {
+ PurgeDatabase();
+ PurgeAndFillQueue(MSG_COUNT * BATCH_COUNT);
+
+ INetTxConnectionFactory factory = new \
NetTxConnectionFactory(ReplaceEnvVar(connectionURI)); +
+ using (INetTxConnection connection = factory.CreateNetTxConnection())
+ using (INetTxSession session = connection.CreateNetTxSession())
+ {
+ IQueue queue = session.GetQueue(testQueueName);
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += AsyncTxAwareOnMessage;
+
+ connection.Start();
+
+ for (int i = 0; i < BATCH_COUNT; ++i)
+ {
+ using (TransactionScope scoped = new \
TransactionScope(TransactionScopeOption.RequiresNew)) + {
+ batchTxControl = \
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete); + \
awaitBatchProcessingStart.Set(); + scoped.Complete();
+ }
+ }
+ }
+
+ // verify sql server has commited the transaction
+ VerifyDatabaseTableIsFull(MSG_COUNT * BATCH_COUNT);
+
+ // check messages are NOT present in the queue
+ VerifyNoMessagesInQueue();
+ }
+
+ private void AsyncTxAwareOnMessage(IMessage message)
+ {
+ awaitBatchProcessingStart.WaitOne();
+
+ try
+ {
+ using (TransactionScope scoped = new \
TransactionScope(batchTxControl)) + using (SqlConnection sqlConnection \
= new SqlConnection(sqlConnectionString)) + using (SqlCommand \
sqlInsertCommand = new SqlCommand()) + {
+ sqlConnection.Open();
+ sqlInsertCommand.Connection = sqlConnection;
+
+ ITextMessage textMessage = message as ITextMessage;
+ Assert.IsNotNull(message, "missing message");
+ sqlInsertCommand.CommandText =
+ string.Format("INSERT INTO {0} VALUES ({1})", testTable, \
Convert.ToInt32(textMessage.Text)); + \
sqlInsertCommand.ExecuteNonQuery(); + scoped.Complete();
+ }
+
+ if(++batchSequence == MSG_COUNT)
+ {
+ batchSequence = 0;
+ awaitBatchProcessingStart.Reset();
+ batchTxControl.Complete();
+ }
+ }
+ catch (Exception e)
+ {
+ Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+ Tracer.Debug(e.ToString());
+ }
+ }
+
+ #endregion
}
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic