[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r799407 [29/29] - in
From: tabish () apache ! org
Date: 2009-07-30 19:06:44
Message-ID: 20090730190700.42C342388A19 () eris ! apache ! org
[Download RAW message or body]
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs \
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu \
Jul 30 19:06:34 2009 @@ -23,610 +23,608 @@
namespace Apache.NMS.ActiveMQ
{
- /// <summary>
- /// Default provider of ISession
- /// </summary>
- public class Session : ISession
- {
- /// <summary>
- /// Private object used for synchronization, instead of public "this"
- /// </summary>
- private readonly object myLock = new object();
- private int consumerCounter;
- private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
- private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
- private readonly DispatchingThread dispatchingThread;
- private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler;
- private readonly SessionInfo info;
- private int producerCounter;
- internal bool startedAsyncDelivery = false;
- private bool disposed = false;
- private bool closed = false;
- private bool closing = false;
- private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
-
- public Session(Connection connection, SessionInfo info, AcknowledgementMode \
acknowledgementMode)
- {
- this.connection = connection;
- this.info = info;
- this.acknowledgementMode = acknowledgementMode;
- this.AsyncSend = connection.AsyncSend;
- this.requestTimeout = connection.RequestTimeout;
- this.PrefetchSize = 1000;
- this.transactionContext = new TransactionContext(this);
- this.dispatchingThread = new DispatchingThread(new \
DispatchingThread.DispatchFunction(DispatchAsyncMessages));
- this.dispatchingThread_ExceptionHandler = new \
DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
- }
-
- ~Session()
- {
- Dispose(false);
- }
-
- /// <summary>
- /// Sets the prefetch size, the maximum number of messages a broker will dispatch \
to consumers
- /// until acknowledgements are received.
- /// </summary>
- public int PrefetchSize;
-
- /// <summary>
- /// Sets the maximum number of messages to keep around per consumer
- /// in addition to the prefetch window for non-durable topics until messages
- /// will start to be evicted for slow consumers.
- /// Must be > 0 to enable this feature
- /// </summary>
- public int MaximumPendingMessageLimit;
-
- /// <summary>
- /// Enables or disables whether asynchronous dispatch should be used by the broker
- /// </summary>
- public bool DispatchAsync;
-
- /// <summary>
- /// Enables or disables exclusive consumers when using queues. An exclusive \
consumer means
- /// only one instance of a consumer is allowed to process messages on a queue to \
preserve order
- /// </summary>
- public bool Exclusive;
-
- /// <summary>
- /// Enables or disables retroactive mode for consumers; i.e. do they go back in \
time or not?
- /// </summary>
- public bool Retroactive;
-
- /// <summary>
- /// Sets the default consumer priority for consumers
- /// </summary>
- public byte Priority;
-
- /// <summary>
- /// This property indicates whether or not async send is enabled.
- /// </summary>
- public bool AsyncSend;
-
- private Connection connection;
- public Connection Connection
- {
- get { return this.connection; }
- }
-
- public SessionId SessionId
- {
- get { return info.SessionId; }
- }
-
- private TransactionContext transactionContext;
- public TransactionContext TransactionContext
- {
- get { return this.transactionContext; }
- }
-
- #region ISession Members
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if(this.disposed)
- {
- return;
- }
-
- if(disposing)
- {
- // Dispose managed code here.
- }
-
- try
- {
- Close();
- }
- catch
- {
- // Ignore network errors.
- }
-
- this.disposed = true;
- }
-
- public void Close()
- {
- lock(myLock)
- {
- if(this.closed)
- {
- return;
- }
-
- try
- {
- this.closing = true;
- StopAsyncDelivery();
- lock(consumers.SyncRoot)
- {
- foreach(MessageConsumer consumer in consumers.Values)
- {
- consumer.Close();
- }
- }
- consumers.Clear();
-
- lock(producers.SyncRoot)
- {
- foreach(MessageProducer producer in producers.Values)
- {
- producer.Close();
- }
- }
- producers.Clear();
+ /// <summary>
+ /// Default provider of ISession
+ /// </summary>
+ public class Session : ISession
+ {
+ /// <summary>
+ /// Private object used for synchronization, instead of public "this"
+ /// </summary>
+ private readonly object myLock = new object();
+ private int consumerCounter;
+ private readonly IDictionary consumers = Hashtable.Synchronized(new \
Hashtable()); + private readonly IDictionary producers = \
Hashtable.Synchronized(new Hashtable()); + private readonly DispatchingThread \
dispatchingThread; + private DispatchingThread.ExceptionHandler \
dispatchingThread_ExceptionHandler; + private readonly SessionInfo info;
+ private int producerCounter;
+ internal bool startedAsyncDelivery = false;
+ private bool disposed = false;
+ private bool closed = false;
+ private bool closing = false;
+ private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+ public Session(Connection connection, SessionInfo info, AcknowledgementMode \
acknowledgementMode) + {
+ this.connection = connection;
+ this.info = info;
+ this.acknowledgementMode = acknowledgementMode;
+ this.AsyncSend = connection.AsyncSend;
+ this.requestTimeout = connection.RequestTimeout;
+ this.PrefetchSize = 1000;
+ this.transactionContext = new TransactionContext(this);
+ this.dispatchingThread = new DispatchingThread(new \
DispatchingThread.DispatchFunction(DispatchAsyncMessages)); + \
this.dispatchingThread_ExceptionHandler = new \
DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener); + }
+
+ ~Session()
+ {
+ Dispose(false);
+ }
+
+ /// <summary>
+ /// Sets the prefetch size, the maximum number of messages a broker will \
dispatch to consumers + /// until acknowledgements are received.
+ /// </summary>
+ public int PrefetchSize;
+
+ /// <summary>
+ /// Sets the maximum number of messages to keep around per consumer
+ /// in addition to the prefetch window for non-durable topics until messages
+ /// will start to be evicted for slow consumers.
+ /// Must be > 0 to enable this feature
+ /// </summary>
+ public int MaximumPendingMessageLimit;
+
+ /// <summary>
+ /// Enables or disables whether asynchronous dispatch should be used by the \
broker + /// </summary>
+ public bool DispatchAsync;
+
+ /// <summary>
+ /// Enables or disables exclusive consumers when using queues. An exclusive \
consumer means + /// only one instance of a consumer is allowed to process \
messages on a queue to preserve order + /// </summary>
+ public bool Exclusive;
+
+ /// <summary>
+ /// Enables or disables retroactive mode for consumers; i.e. do they go back \
in time or not? + /// </summary>
+ public bool Retroactive;
+
+ /// <summary>
+ /// Sets the default consumer priority for consumers
+ /// </summary>
+ public byte Priority;
+
+ /// <summary>
+ /// This property indicates whether or not async send is enabled.
+ /// </summary>
+ public bool AsyncSend;
+
+ private Connection connection;
+ public Connection Connection
+ {
+ get { return this.connection; }
+ }
+
+ public SessionId SessionId
+ {
+ get { return info.SessionId; }
+ }
+
+ private TransactionContext transactionContext;
+ public TransactionContext TransactionContext
+ {
+ get { return this.transactionContext; }
+ }
+
+ #region ISession Members
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ if(this.disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ // Dispose managed code here.
+ }
+
+ try
+ {
+ Close();
+ }
+ catch
+ {
+ // Ignore network errors.
+ }
+
+ this.disposed = true;
+ }
+
+ public void Close()
+ {
+ lock(myLock)
+ {
+ if(this.closed)
+ {
+ return;
+ }
+
+ try
+ {
+ this.closing = true;
+ StopAsyncDelivery();
+ lock(consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in consumers.Values)
+ {
+ consumer.Close();
+ }
+ }
+ consumers.Clear();
+
+ lock(producers.SyncRoot)
+ {
+ foreach(MessageProducer producer in producers.Values)
+ {
+ producer.Close();
+ }
+ }
+ producers.Clear();
Connection.RemoveSession(this);
- }
- catch(Exception ex)
- {
- Tracer.ErrorFormat("Error during session close: {0}", ex);
- }
- finally
- {
- this.connection = null;
- this.closed = true;
- this.closing = false;
- }
- }
- }
-
- public IMessageProducer CreateProducer()
- {
- return CreateProducer(null);
- }
-
- public IMessageProducer CreateProducer(IDestination destination)
- {
- ProducerInfo command = CreateProducerInfo(destination);
- ProducerId producerId = command.ProducerId;
- MessageProducer producer = null;
-
- try
- {
- producer = new MessageProducer(this, command);
- producers[producerId] = producer;
- this.DoSend(command);
- }
- catch(Exception)
- {
- if(producer != null)
- {
- producer.Close();
- }
-
- throw;
- }
-
- return producer;
- }
-
- public IMessageConsumer CreateConsumer(IDestination destination)
- {
- return CreateConsumer(destination, null, false);
- }
-
- public IMessageConsumer CreateConsumer(IDestination destination, string selector)
- {
- return CreateConsumer(destination, selector, false);
- }
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during session close: {0}", ex);
+ }
+ finally
+ {
+ this.connection = null;
+ this.closed = true;
+ this.closing = false;
+ }
+ }
+ }
+
+ public IMessageProducer CreateProducer()
+ {
+ return CreateProducer(null);
+ }
+
+ public IMessageProducer CreateProducer(IDestination destination)
+ {
+ ProducerInfo command = CreateProducerInfo(destination);
+ ProducerId producerId = command.ProducerId;
+ MessageProducer producer = null;
+
+ try
+ {
+ producer = new MessageProducer(this, command);
+ producers[producerId] = producer;
+ this.DoSend(command);
+ }
+ catch(Exception)
+ {
+ if(producer != null)
+ {
+ producer.Close();
+ }
+
+ throw;
+ }
+
+ return producer;
+ }
+
+ public IMessageConsumer CreateConsumer(IDestination destination)
+ {
+ return CreateConsumer(destination, null, false);
+ }
+
+ public IMessageConsumer CreateConsumer(IDestination destination, string \
selector) + {
+ return CreateConsumer(destination, selector, false);
+ }
- public IMessageConsumer CreateConsumer(IDestination destination, string selector, \
bool noLocal)
- {
+ public IMessageConsumer CreateConsumer(IDestination destination, string \
selector, bool noLocal) + {
if (destination == null)
{
throw new InvalidDestinationException("Cannot create a Consumer with \
a Null destination"); }
- ConsumerInfo command = CreateConsumerInfo(destination, selector);
- command.NoLocal = noLocal;
- command.AcknowledgementMode = this.AcknowledgementMode;
-
- ConsumerId consumerId = command.ConsumerId;
- MessageConsumer consumer = null;
-
- try
- {
- consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
- // lets register the consumer first in case we start dispatching messages \
immediately
- consumers[consumerId] = consumer;
- this.DoSend(command);
- return consumer;
- }
- catch(Exception)
- {
- if(consumer != null)
- {
- consumer.Close();
- }
-
- throw;
- }
- }
+ ConsumerInfo command = CreateConsumerInfo(destination, selector);
+ command.NoLocal = noLocal;
+ ConsumerId consumerId = command.ConsumerId;
+ MessageConsumer consumer = null;
+
+ try
+ {
+ consumer = new MessageConsumer(this, command, \
this.AcknowledgementMode); + // lets register the consumer first in \
case we start dispatching messages immediately + consumers[consumerId] \
= consumer; + this.DoSend(command);
+ return consumer;
+ }
+ catch(Exception)
+ {
+ if(consumer != null)
+ {
+ consumer.Close();
+ }
- public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, \
string selector, bool noLocal)
- {
+ throw;
+ }
+ }
+
+ public IMessageConsumer CreateDurableConsumer(ITopic destination, string \
name, string selector, bool noLocal) + {
if (destination == null)
{
throw new InvalidDestinationException("Cannot create a Consumer with \
a Null destination"); }
-
+
ConsumerInfo command = CreateConsumerInfo(destination, selector);
- ConsumerId consumerId = command.ConsumerId;
- command.SubscriptionName = name;
- command.NoLocal = noLocal;
- MessageConsumer consumer = null;
-
- try
- {
- consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
- // lets register the consumer first in case we start dispatching messages \
immediately
- consumers[consumerId] = consumer;
- this.DoSend(command);
- }
- catch(Exception)
- {
- if(consumer != null)
- {
- consumer.Close();
- }
-
- throw;
- }
-
- return consumer;
- }
-
- public void DeleteDurableConsumer(string name)
- {
- RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
- command.ConnectionId = Connection.ConnectionId;
- command.ClientId = Connection.ClientId;
- command.SubcriptionName = name;
- this.DoSend(command);
- }
-
- public IQueue GetQueue(string name)
- {
- return new ActiveMQQueue(name);
- }
-
- public ITopic GetTopic(string name)
- {
- return new ActiveMQTopic(name);
- }
-
- public ITemporaryQueue CreateTemporaryQueue()
- {
- ActiveMQTempQueue answer = new \
ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
- CreateTemporaryDestination(answer);
- return answer;
- }
-
- public ITemporaryTopic CreateTemporaryTopic()
- {
- ActiveMQTempTopic answer = new \
ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
- CreateTemporaryDestination(answer);
- return answer;
- }
-
- /// <summary>
- /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
- /// </summary>
- public void DeleteDestination(IDestination destination)
- {
- DestinationInfo command = new DestinationInfo();
- command.ConnectionId = Connection.ConnectionId;
- command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
- command.Destination = destination;
-
- this.DoSend(command);
- }
-
- public IMessage CreateMessage()
- {
- ActiveMQMessage answer = new ActiveMQMessage();
- Configure(answer);
- return answer;
- }
-
- public ITextMessage CreateTextMessage()
- {
- ActiveMQTextMessage answer = new ActiveMQTextMessage();
- Configure(answer);
- return answer;
- }
-
- public ITextMessage CreateTextMessage(string text)
- {
- ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
- Configure(answer);
- return answer;
- }
-
- public IMapMessage CreateMapMessage()
- {
- return new ActiveMQMapMessage();
- }
-
- public IBytesMessage CreateBytesMessage()
- {
- return new ActiveMQBytesMessage();
- }
-
- public IBytesMessage CreateBytesMessage(byte[] body)
- {
- ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
- answer.Content = body;
- return answer;
- }
-
- public IObjectMessage CreateObjectMessage(object body)
- {
- ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
- answer.Body = body;
- return answer;
- }
-
- public void Commit()
- {
- if(!Transacted)
- {
- throw new InvalidOperationException(
- "You cannot perform a Commit() on a non-transacted session. Acknowlegement \
mode is: "
- + this.AcknowledgementMode);
- }
- this.TransactionContext.Commit();
- }
-
- public void Rollback()
- {
- if(!Transacted)
- {
- throw new InvalidOperationException(
- "You cannot perform a Commit() on a non-transacted session. Acknowlegement \
mode is: "
- + this.AcknowledgementMode);
- }
- this.TransactionContext.Rollback();
-
- // lets ensure all the consumers redeliver any rolled back messages
- lock(consumers.SyncRoot)
- {
- foreach(MessageConsumer consumer in consumers.Values)
- {
- consumer.RedeliverRolledBackMessages();
- }
- }
- }
-
-
- // Properties
-
- private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
- public TimeSpan RequestTimeout
- {
- get { return this.requestTimeout; }
- set { this.requestTimeout = value; }
- }
-
- public bool Transacted
- {
- get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
- }
-
- private AcknowledgementMode acknowledgementMode;
- public AcknowledgementMode AcknowledgementMode
- {
- get { return this.acknowledgementMode; }
- }
-
- #endregion
-
- private void dispatchingThread_ExceptionListener(Exception exception)
- {
- if(null != Connection)
- {
- try
- {
- Connection.OnSessionException(this, exception);
- }
- catch
- {
- }
- }
- }
-
- protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
- {
- DestinationInfo command = new DestinationInfo();
- command.ConnectionId = Connection.ConnectionId;
- command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
- command.Destination = tempDestination;
-
- this.DoSend(command);
- }
-
- public void DoSend(Command message)
- {
- this.DoSend(message, this.RequestTimeout);
- }
-
- public void DoSend(Command message, TimeSpan requestTimeout)
- {
- if(AsyncSend)
- {
- Connection.OneWay(message);
- }
- else
- {
- Connection.SyncRequest(message, requestTimeout);
- }
- }
-
- /// <summary>
- /// Ensures that a transaction is started
- /// </summary>
- public void DoStartTransaction()
- {
- if(Transacted)
- {
- this.TransactionContext.Begin();
- }
- }
-
- public void DisposeOf(ConsumerId objectId)
- {
- Connection.DisposeOf(objectId);
- if(!this.closing)
- {
- consumers.Remove(objectId);
- }
- }
-
- public void DisposeOf(ProducerId objectId)
- {
- Connection.DisposeOf(objectId);
- if(!this.closing)
- {
- producers.Remove(objectId);
- }
- }
-
- public bool DispatchMessage(ConsumerId consumerId, Message message)
- {
- bool dispatched = false;
- MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
-
- if(consumer != null)
- {
- consumer.Dispatch((ActiveMQMessage) message);
- dispatched = true;
- }
-
- return dispatched;
- }
-
- /// <summary>
- /// Private method called by the dispatcher thread in order to perform
- /// asynchronous delivery of queued (inbound) messages.
- /// </summary>
- private void DispatchAsyncMessages()
- {
- // lets iterate through each consumer created by this session
- // ensuring that they have all pending messages dispatched
- lock(consumers.SyncRoot)
- {
- foreach(MessageConsumer consumer in consumers.Values)
- {
- consumer.DispatchAsyncMessages();
- }
- }
- }
-
- protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string \
selector)
- {
- ConsumerInfo answer = new ConsumerInfo();
- ConsumerId id = new ConsumerId();
- id.ConnectionId = info.SessionId.ConnectionId;
- id.SessionId = info.SessionId.Value;
- id.Value = Interlocked.Increment(ref consumerCounter);
- answer.ConsumerId = id;
- answer.Destination = ActiveMQDestination.Transform(destination);
- answer.Selector = selector;
- answer.PrefetchSize = this.PrefetchSize;
- answer.Priority = this.Priority;
- answer.Exclusive = this.Exclusive;
- answer.DispatchAsync = this.DispatchAsync;
- answer.Retroactive = this.Retroactive;
-
- // If the destination contained a URI query, then use it to set public properties
- // on the ConsumerInfo
- ActiveMQDestination amqDestination = destination as ActiveMQDestination;
- if(amqDestination != null && amqDestination.Options != null)
- {
- URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
- }
-
- return answer;
- }
-
- protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
- {
- ProducerInfo answer = new ProducerInfo();
- ProducerId id = new ProducerId();
- id.ConnectionId = info.SessionId.ConnectionId;
- id.SessionId = info.SessionId.Value;
- id.Value = Interlocked.Increment(ref producerCounter);
- answer.ProducerId = id;
- answer.Destination = ActiveMQDestination.Transform(destination);
-
- // If the destination contained a URI query, then use it to set public
- // properties on the ProducerInfo
- ActiveMQDestination amqDestination = destination as ActiveMQDestination;
- if(amqDestination != null && amqDestination.Options != null)
- {
- URISupport.SetProperties(answer, amqDestination.Options, "producer.");
- }
-
- return answer;
- }
-
- /// <summary>
- /// Configures the message command
- /// </summary>
- protected void Configure(ActiveMQMessage message)
- {
- }
-
- internal void StopAsyncDelivery()
- {
- if(startedAsyncDelivery)
- {
- this.dispatchingThread.ExceptionListener -= \
this.dispatchingThread_ExceptionHandler;
- dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds);
- startedAsyncDelivery = false;
- }
- }
-
- internal void StartAsyncDelivery()
- {
- if(!startedAsyncDelivery)
- {
- this.dispatchingThread.ExceptionListener += \
this.dispatchingThread_ExceptionHandler;
- dispatchingThread.Start();
- startedAsyncDelivery = true;
- }
- }
-
- internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
- {
- dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
- }
- }
+ ConsumerId consumerId = command.ConsumerId;
+ command.SubscriptionName = name;
+ command.NoLocal = noLocal;
+ MessageConsumer consumer = null;
+
+ try
+ {
+ consumer = new MessageConsumer(this, command, \
this.AcknowledgementMode); + // lets register the consumer first in \
case we start dispatching messages immediately + consumers[consumerId] \
= consumer; + this.DoSend(command);
+ }
+ catch(Exception)
+ {
+ if(consumer != null)
+ {
+ consumer.Close();
+ }
+
+ throw;
+ }
+
+ return consumer;
+ }
+
+ public void DeleteDurableConsumer(string name)
+ {
+ RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
+ command.ConnectionId = Connection.ConnectionId;
+ command.ClientId = Connection.ClientId;
+ command.SubcriptionName = name;
+ this.DoSend(command);
+ }
+
+ public IQueue GetQueue(string name)
+ {
+ return new ActiveMQQueue(name);
+ }
+
+ public ITopic GetTopic(string name)
+ {
+ return new ActiveMQTopic(name);
+ }
+
+ public ITemporaryQueue CreateTemporaryQueue()
+ {
+ ActiveMQTempQueue answer = new \
ActiveMQTempQueue(Connection.CreateTemporaryDestinationName()); + \
CreateTemporaryDestination(answer); + return answer;
+ }
+
+ public ITemporaryTopic CreateTemporaryTopic()
+ {
+ ActiveMQTempTopic answer = new \
ActiveMQTempTopic(Connection.CreateTemporaryDestinationName()); + \
CreateTemporaryDestination(answer); + return answer;
+ }
+
+ /// <summary>
+ /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+ /// </summary>
+ public void DeleteDestination(IDestination destination)
+ {
+ DestinationInfo command = new DestinationInfo();
+ command.ConnectionId = Connection.ConnectionId;
+ command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is \
remove + command.Destination = (ActiveMQDestination) destination;
+
+ this.DoSend(command);
+ }
+
+ public IMessage CreateMessage()
+ {
+ ActiveMQMessage answer = new ActiveMQMessage();
+ Configure(answer);
+ return answer;
+ }
+
+ public ITextMessage CreateTextMessage()
+ {
+ ActiveMQTextMessage answer = new ActiveMQTextMessage();
+ Configure(answer);
+ return answer;
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
+ Configure(answer);
+ return answer;
+ }
+
+ public IMapMessage CreateMapMessage()
+ {
+ return new ActiveMQMapMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ return new ActiveMQBytesMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
+ answer.Content = body;
+ return answer;
+ }
+
+ public IObjectMessage CreateObjectMessage(object body)
+ {
+ ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+ answer.Body = body;
+ return answer;
+ }
+
+ public void Commit()
+ {
+ if(!Transacted)
+ {
+ throw new InvalidOperationException(
+ "You cannot perform a Commit() on a non-transacted session. \
Acknowlegement mode is: " + + this.AcknowledgementMode);
+ }
+ this.TransactionContext.Commit();
+ }
+
+ public void Rollback()
+ {
+ if(!Transacted)
+ {
+ throw new InvalidOperationException(
+ "You cannot perform a Commit() on a non-transacted session. \
Acknowlegement mode is: " + + this.AcknowledgementMode);
+ }
+ this.TransactionContext.Rollback();
+
+ // lets ensure all the consumers redeliver any rolled back messages
+ lock(consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in consumers.Values)
+ {
+ consumer.RedeliverRolledBackMessages();
+ }
+ }
+ }
+
+
+ // Properties
+
+ private TimeSpan requestTimeout = \
Apache.NMS.NMSConstants.defaultRequestTimeout; + public TimeSpan \
RequestTimeout + {
+ get { return this.requestTimeout; }
+ set { this.requestTimeout = value; }
+ }
+
+ public bool Transacted
+ {
+ get { return this.AcknowledgementMode == \
AcknowledgementMode.Transactional; } + }
+
+ private AcknowledgementMode acknowledgementMode;
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return this.acknowledgementMode; }
+ }
+
+ #endregion
+
+ private void dispatchingThread_ExceptionListener(Exception exception)
+ {
+ if(null != Connection)
+ {
+ try
+ {
+ Connection.OnSessionException(this, exception);
+ }
+ catch
+ {
+ }
+ }
+ }
+
+ protected void CreateTemporaryDestination(ActiveMQDestination \
tempDestination) + {
+ DestinationInfo command = new DestinationInfo();
+ command.ConnectionId = Connection.ConnectionId;
+ command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+ command.Destination = tempDestination;
+
+ this.DoSend(command);
+ }
+
+ public void DoSend(Command message)
+ {
+ this.DoSend(message, this.RequestTimeout);
+ }
+
+ public void DoSend(Command message, TimeSpan requestTimeout)
+ {
+ if(AsyncSend)
+ {
+ Connection.OneWay(message);
+ }
+ else
+ {
+ Connection.SyncRequest(message, requestTimeout);
+ }
+ }
+
+ /// <summary>
+ /// Ensures that a transaction is started
+ /// </summary>
+ public void DoStartTransaction()
+ {
+ if(Transacted)
+ {
+ this.TransactionContext.Begin();
+ }
+ }
+
+ public void DisposeOf(ConsumerId objectId)
+ {
+ Connection.DisposeOf(objectId);
+ if(!this.closing)
+ {
+ consumers.Remove(objectId);
+ }
+ }
+
+ public void DisposeOf(ProducerId objectId)
+ {
+ Connection.DisposeOf(objectId);
+ if(!this.closing)
+ {
+ producers.Remove(objectId);
+ }
+ }
+
+ public bool DispatchMessage(ConsumerId consumerId, Message message)
+ {
+ bool dispatched = false;
+ MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+
+ if(consumer != null)
+ {
+ consumer.Dispatch((ActiveMQMessage) message);
+ dispatched = true;
+ }
+
+ return dispatched;
+ }
+
+ /// <summary>
+ /// Private method called by the dispatcher thread in order to perform
+ /// asynchronous delivery of queued (inbound) messages.
+ /// </summary>
+ private void DispatchAsyncMessages()
+ {
+ // lets iterate through each consumer created by this session
+ // ensuring that they have all pending messages dispatched
+ lock(consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in consumers.Values)
+ {
+ consumer.DispatchAsyncMessages();
+ }
+ }
+ }
+
+ protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, \
string selector) + {
+ ConsumerInfo answer = new ConsumerInfo();
+ ConsumerId id = new ConsumerId();
+ id.ConnectionId = info.SessionId.ConnectionId;
+ id.SessionId = info.SessionId.Value;
+ id.Value = Interlocked.Increment(ref consumerCounter);
+ answer.ConsumerId = id;
+ answer.Destination = ActiveMQDestination.Transform(destination);
+ answer.Selector = selector;
+ answer.PrefetchSize = this.PrefetchSize;
+ answer.Priority = this.Priority;
+ answer.Exclusive = this.Exclusive;
+ answer.DispatchAsync = this.DispatchAsync;
+ answer.Retroactive = this.Retroactive;
+
+ // If the destination contained a URI query, then use it to set public \
properties + // on the ConsumerInfo
+ ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+ if(amqDestination != null && amqDestination.Options != null)
+ {
+ URISupport.SetProperties(answer, amqDestination.Options, \
"consumer."); + }
+
+ return answer;
+ }
+
+ protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+ {
+ ProducerInfo answer = new ProducerInfo();
+ ProducerId id = new ProducerId();
+ id.ConnectionId = info.SessionId.ConnectionId;
+ id.SessionId = info.SessionId.Value;
+ id.Value = Interlocked.Increment(ref producerCounter);
+ answer.ProducerId = id;
+ answer.Destination = ActiveMQDestination.Transform(destination);
+
+ // If the destination contained a URI query, then use it to set public
+ // properties on the ProducerInfo
+ ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+ if(amqDestination != null && amqDestination.Options != null)
+ {
+ URISupport.SetProperties(answer, amqDestination.Options, \
"producer."); + }
+
+ return answer;
+ }
+
+ /// <summary>
+ /// Configures the message command
+ /// </summary>
+ protected void Configure(ActiveMQMessage message)
+ {
+ }
+
+ internal void StopAsyncDelivery()
+ {
+ if(startedAsyncDelivery)
+ {
+ this.dispatchingThread.ExceptionListener -= \
this.dispatchingThread_ExceptionHandler; + \
dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds); + \
startedAsyncDelivery = false; + }
+ }
+
+ internal void StartAsyncDelivery()
+ {
+ if(!startedAsyncDelivery)
+ {
+ this.dispatchingThread.ExceptionListener += \
this.dispatchingThread_ExceptionHandler; + dispatchingThread.Start();
+ startedAsyncDelivery = true;
+ }
+ }
+
+ internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
+ {
+ dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/s \
rc/main/csharp/State/CommandVisitorAdapter.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs \
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs \
Thu Jul 30 19:06:34 2009 @@ -19,178 +19,188 @@
namespace Apache.NMS.ActiveMQ.State
{
- public class CommandVisitorAdapter : ICommandVisitor
- {
+ public class CommandVisitorAdapter : ICommandVisitor
+ {
- public virtual Response processAddConnection(ConnectionInfo info)
- {
- return null;
- }
-
- public virtual Response processAddConsumer(ConsumerInfo info)
- {
- return null;
- }
-
- public virtual Response processAddDestination(DestinationInfo info)
- {
- return null;
- }
-
- public virtual Response processAddProducer(ProducerInfo info)
- {
- return null;
- }
-
- public virtual Response processAddSession(SessionInfo info)
- {
- return null;
- }
-
- public virtual Response processBeginTransaction(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processBrokerInfo(BrokerInfo info)
- {
- return null;
- }
-
- public virtual Response processCommitTransactionOnePhase(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processCommitTransactionTwoPhase(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processEndTransaction(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processFlush(FlushCommand command)
- {
- return null;
- }
-
- public virtual Response processForgetTransaction(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processKeepAlive(KeepAliveInfo info)
- {
- return null;
- }
-
- public virtual Response processMessage(Message send)
- {
- return null;
- }
-
- public virtual Response processMessageAck(MessageAck ack)
- {
- return null;
- }
-
- public virtual Response \
processMessageDispatchNotification(MessageDispatchNotification \
notification)
- {
- return null;
- }
-
- public virtual Response processMessagePull(MessagePull pull)
- {
- return null;
- }
-
- public virtual Response processPrepareTransaction(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processProducerAck(ProducerAck ack)
- {
- return null;
- }
-
- public virtual Response processRecoverTransactions(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processRemoveConnection(ConnectionId id)
- {
- return null;
- }
-
- public virtual Response processRemoveConsumer(ConsumerId id)
- {
- return null;
- }
-
- public virtual Response processRemoveDestination(DestinationInfo info)
- {
- return null;
- }
-
- public virtual Response processRemoveProducer(ProducerId id)
- {
- return null;
- }
-
- public virtual Response processRemoveSession(SessionId id)
- {
- return null;
- }
-
- public virtual Response processRemoveSubscription(RemoveSubscriptionInfo info)
- {
- return null;
- }
-
- public virtual Response processRollbackTransaction(TransactionInfo info)
- {
- return null;
- }
-
- public virtual Response processShutdown(ShutdownInfo info)
- {
- return null;
- }
-
- public virtual Response processWireFormat(WireFormatInfo info)
- {
- return null;
- }
-
- public virtual Response processMessageDispatch(MessageDispatch dispatch)
- {
- return null;
- }
-
- public virtual Response processControlCommand(ControlCommand command)
- {
- return null;
- }
-
- public virtual Response processConnectionControl(ConnectionControl control)
- {
- return null;
- }
-
- public virtual Response processConnectionError(ConnectionError error)
- {
- return null;
- }
-
- public virtual Response processConsumerControl(ConsumerControl control)
- {
- return null;
- }
+ public virtual Response processAddConnection(ConnectionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddConsumer(ConsumerInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddDestination(DestinationInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddProducer(ProducerInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processAddSession(SessionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processBeginTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processBrokerInfo(BrokerInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processCommitTransactionOnePhase(TransactionInfo \
info) + {
+ return null;
+ }
+
+ public virtual Response processCommitTransactionTwoPhase(TransactionInfo \
info) + {
+ return null;
+ }
+
+ public virtual Response processEndTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processFlushCommand(FlushCommand command)
+ {
+ return null;
+ }
+
+ public virtual Response processForgetTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processKeepAliveInfo(KeepAliveInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processMessage(Message send)
+ {
+ return null;
+ }
+
+ public virtual Response processMessageAck(MessageAck ack)
+ {
+ return null;
+ }
+
+ public virtual Response \
processMessageDispatchNotification(MessageDispatchNotification notification) + \
{ + return null;
+ }
+
+ public virtual Response processMessagePull(MessagePull pull)
+ {
+ return null;
+ }
+
+ public virtual Response processPrepareTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processProducerAck(ProducerAck ack)
+ {
+ return null;
+ }
+
+ public virtual Response processRecoverTransactions(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveConnection(ConnectionId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveConsumer(ConsumerId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveDestination(DestinationInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveProducer(ProducerId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveSession(SessionId id)
+ {
+ return null;
+ }
+
+ public virtual Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo \
info) + {
+ return null;
+ }
+
+ public virtual Response processRollbackTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processShutdownInfo(ShutdownInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processWireFormat(WireFormatInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processMessageDispatch(MessageDispatch dispatch)
+ {
+ return null;
+ }
+
+ public virtual Response processControlCommand(ControlCommand command)
+ {
+ return null;
+ }
+
+ public virtual Response processConnectionControl(ConnectionControl control)
+ {
+ return null;
+ }
+
+ public virtual Response processConnectionError(ConnectionError error)
+ {
+ return null;
+ }
+
+ public virtual Response processConsumerControl(ConsumerControl control)
+ {
+ return null;
+ }
+
+ public virtual Response processResponse(Response response)
+ {
+ return null;
+ }
+
+ public virtual Response processReplayCommand(ReplayCommand replayCommand)
+ {
+ return null;
+ }
- }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs \
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs \
Thu Jul 30 19:06:34 2009 @@ -20,76 +20,80 @@
namespace Apache.NMS.ActiveMQ.State
{
- public interface ICommandVisitor
- {
+ public interface ICommandVisitor
+ {
- Response processAddConnection(ConnectionInfo info);
+ Response processAddConnection(ConnectionInfo info);
- Response processAddSession(SessionInfo info);
+ Response processAddSession(SessionInfo info);
- Response processAddProducer(ProducerInfo info);
+ Response processAddProducer(ProducerInfo info);
- Response processAddConsumer(ConsumerInfo info);
+ Response processAddConsumer(ConsumerInfo info);
- Response processRemoveConnection(ConnectionId id);
+ Response processRemoveConnection(ConnectionId id);
- Response processRemoveSession(SessionId id);
+ Response processRemoveSession(SessionId id);
- Response processRemoveProducer(ProducerId id);
+ Response processRemoveProducer(ProducerId id);
- Response processRemoveConsumer(ConsumerId id);
+ Response processRemoveConsumer(ConsumerId id);
- Response processAddDestination(DestinationInfo info);
+ Response processAddDestination(DestinationInfo info);
- Response processRemoveDestination(DestinationInfo info);
+ Response processRemoveDestination(DestinationInfo info);
- Response processRemoveSubscription(RemoveSubscriptionInfo info);
+ Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info);
- Response processMessage(Message send);
+ Response processMessage(Message send);
- Response processMessageAck(MessageAck ack);
+ Response processMessageAck(MessageAck ack);
- Response processMessagePull(MessagePull pull);
+ Response processMessagePull(MessagePull pull);
- Response processBeginTransaction(TransactionInfo info);
+ Response processBeginTransaction(TransactionInfo info);
- Response processPrepareTransaction(TransactionInfo info);
+ Response processPrepareTransaction(TransactionInfo info);
- Response processCommitTransactionOnePhase(TransactionInfo info);
+ Response processCommitTransactionOnePhase(TransactionInfo info);
- Response processCommitTransactionTwoPhase(TransactionInfo info);
+ Response processCommitTransactionTwoPhase(TransactionInfo info);
- Response processRollbackTransaction(TransactionInfo info);
+ Response processRollbackTransaction(TransactionInfo info);
- Response processWireFormat(WireFormatInfo info);
+ Response processWireFormat(WireFormatInfo info);
- Response processKeepAlive(KeepAliveInfo info);
+ Response processKeepAliveInfo(KeepAliveInfo info);
- Response processShutdown(ShutdownInfo info);
+ Response processShutdownInfo(ShutdownInfo info);
- Response processFlush(FlushCommand command);
+ Response processFlushCommand(FlushCommand command);
- Response processBrokerInfo(BrokerInfo info);
+ Response processBrokerInfo(BrokerInfo info);
- Response processRecoverTransactions(TransactionInfo info);
+ Response processRecoverTransactions(TransactionInfo info);
- Response processForgetTransaction(TransactionInfo info);
+ Response processForgetTransaction(TransactionInfo info);
- Response processEndTransaction(TransactionInfo info);
+ Response processEndTransaction(TransactionInfo info);
- Response processMessageDispatchNotification(MessageDispatchNotification \
notification); + Response \
processMessageDispatchNotification(MessageDispatchNotification notification);
- Response processProducerAck(ProducerAck ack);
+ Response processProducerAck(ProducerAck ack);
- Response processMessageDispatch(MessageDispatch dispatch);
+ Response processMessageDispatch(MessageDispatch dispatch);
- Response processControlCommand(ControlCommand command);
+ Response processControlCommand(ControlCommand command);
- Response processConnectionError(ConnectionError error);
+ Response processConnectionError(ConnectionError error);
- Response processConnectionControl(ConnectionControl control);
+ Response processConnectionControl(ConnectionControl control);
- Response processConsumerControl(ConsumerControl control);
+ Response processConsumerControl(ConsumerControl control);
- }
+ Response processResponse(Response response);
+
+ Response processReplayCommand(ReplayCommand replayCommand);
+
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/s \
rc/main/csharp/Transport/WireFormatNegotiator.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs \
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs \
Thu Jul 30 19:06:34 2009 @@ -24,84 +24,84 @@
namespace Apache.NMS.ActiveMQ.Transport
{
- /// <summary>
- /// A Transport which negotiates the wire format
- /// </summary>
- public class WireFormatNegotiator : TransportFilter
- {
- private OpenWireFormat wireFormat;
- private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
-
- private AtomicBoolean firstStart=new AtomicBoolean(true);
- private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
- private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
-
- public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
- : base(next)
- {
- this.wireFormat = wireFormat;
- }
-
- public override void Start()
- {
- base.Start();
- if (firstStart.CompareAndSet(true, false))
- {
- try
- {
- next.Oneway(wireFormat.PreferedWireFormatInfo);
- }
- finally
- {
- wireInfoSentDownLatch.countDown();
- }
- }
- }
-
- protected override void Dispose(bool disposing)
- {
- base.Dispose(disposing);
- readyCountDownLatch.countDown();
- }
-
- public override void Oneway(Command command)
- {
- if (!readyCountDownLatch.await(negotiateTimeout))
- throw new IOException("Wire format negotiation timeout: peer did not send his \
wire format.");
- next.Oneway(command);
- }
-
- protected override void OnCommand(ITransport sender, Command command)
- {
- if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo )
- {
- WireFormatInfo info = (WireFormatInfo)command;
- try
- {
- if (!info.Valid)
- {
- throw new IOException("Remote wire format magic is invalid");
- }
- wireInfoSentDownLatch.await(negotiateTimeout);
- wireFormat.renegotiateWireFormat(info);
- }
- catch (Exception e)
- {
- OnException(this, e);
- }
- finally
- {
- readyCountDownLatch.countDown();
- }
- }
- this.commandHandler(sender, command);
- }
-
- protected override void OnException(ITransport sender, Exception command)
- {
- readyCountDownLatch.countDown();
- this.exceptionHandler(sender, command);
- }
- }
+ /// <summary>
+ /// A Transport which negotiates the wire format
+ /// </summary>
+ public class WireFormatNegotiator : TransportFilter
+ {
+ private OpenWireFormat wireFormat;
+ private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
+
+ private AtomicBoolean firstStart=new AtomicBoolean(true);
+ private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+ private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+ public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
+ : base(next)
+ {
+ this.wireFormat = wireFormat;
+ }
+
+ public override void Start()
+ {
+ base.Start();
+ if (firstStart.CompareAndSet(true, false))
+ {
+ try
+ {
+ next.Oneway(wireFormat.PreferedWireFormatInfo);
+ }
+ finally
+ {
+ wireInfoSentDownLatch.countDown();
+ }
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+ readyCountDownLatch.countDown();
+ }
+
+ public override void Oneway(Command command)
+ {
+ if (!readyCountDownLatch.await(negotiateTimeout))
+ throw new IOException("Wire format negotiation timeout: peer did not \
send his wire format."); + next.Oneway(command);
+ }
+
+ protected override void OnCommand(ITransport sender, Command command)
+ {
+ if ( command.IsWireFormatInfo )
+ {
+ WireFormatInfo info = (WireFormatInfo)command;
+ try
+ {
+ if (!info.Valid)
+ {
+ throw new IOException("Remote wire format magic is \
invalid"); + }
+ wireInfoSentDownLatch.await(negotiateTimeout);
+ wireFormat.renegotiateWireFormat(info);
+ }
+ catch (Exception e)
+ {
+ OnException(this, e);
+ }
+ finally
+ {
+ readyCountDownLatch.countDown();
+ }
+ }
+ this.commandHandler(sender, command);
+ }
+
+ protected override void OnException(ITransport sender, Exception command)
+ {
+ readyCountDownLatch.countDown();
+ this.exceptionHandler(sender, command);
+ }
+ }
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic