[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r501659 - in
From: tabish () apache ! org
Date: 2007-01-31 0:42:12
Message-ID: 20070131004213.79F1E1A981A () eris ! apache ! org
[Download RAW message or body]
Author: tabish
Date: Tue Jan 30 16:42:10 2007
New Revision: 501659
URL: http://svn.apache.org/viewvc?view=rev&rev=501659
Log:
http://issues.apache.org/activemq/browse/AMQCPP-63
Cleaned up ActiveMQConsumer by removing the redundant shutdown flag (the closed flag is used instead).
Added async send as a URL property of ActiveMQSession; the caller must append \
useAsyncSend=true to the Broker URL to enable it.
Added:
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp \
(with props) incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h \
(with props) Modified:
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/ (props \
changed) incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/ (props \
changed) incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/ (props changed)
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/ (props \
changed) incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=501659&r1=501658&r2=501659 \
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp \
Tue Jan 30 16:42:10 2007 @@ -45,7 +45,6 @@
this->consumerInfo = consumerInfo;
this->listenerThread = NULL;
this->listener = NULL;
- this->shutdown = false;
this->closed = false;
}
@@ -153,7 +152,7 @@
{
// Check for empty in case of spurious wakeup, or race to
// queue lock.
- while( !shutdown && !closed && msgQueue.empty() )
+ while( !closed && msgQueue.empty() )
{
msgQueue.wait();
}
@@ -161,7 +160,7 @@
// This will only happen when this object is being
// closed in another thread context - kind of
// scary.
- if( shutdown || closed ){
+ if( closed ){
throw ActiveMQException( __FILE__, __LINE__,
"Consumer is being closed in another thread" );
}
@@ -202,7 +201,7 @@
synchronized( &msgQueue )
{
// Check for empty, and wait if its not
- if( !shutdown && !closed && msgQueue.empty() ){
+ if( !closed && msgQueue.empty() ){
msgQueue.wait(millisecs);
@@ -215,7 +214,7 @@
// This will only happen when this object is being
// closed in another thread context - kind of
// scary.
- if( shutdown || closed ){
+ if( closed ){
throw ActiveMQException( __FILE__, __LINE__,
"Consumer is being closed in another thread" );
}
@@ -246,7 +245,7 @@
{
try
{
- if( shutdown || closed )
+ if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
@@ -332,7 +331,7 @@
{
try
{
- while( !shutdown )
+ while( !closed )
{
Message* message = NULL;
@@ -345,7 +344,7 @@
// registered, and then we will deliver the backlog
while( msgQueue.empty() || listener == NULL )
{
- if( shutdown )
+ if( closed )
{
break;
}
@@ -353,7 +352,7 @@
}
// don't want to process messages if we are shutting down.
- if( shutdown )
+ if( closed )
{
return;
}
@@ -426,7 +425,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::purgeMessages() throw (ActiveMQException)
+void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException )
{
try
{
@@ -465,7 +464,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::notifyListener( Message* message ) throw (ActiveMQException){
+void ActiveMQConsumer::notifyListener( Message* message ) throw ( ActiveMQException \
){
try
{
@@ -484,7 +483,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::destroyMessage( Message* message ) throw (ActiveMQException){
+void ActiveMQConsumer::destroyMessage( Message* message ) throw ( ActiveMQException \
){
try
{
@@ -502,7 +501,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::startThread() throw (ActiveMQException) {
+void ActiveMQConsumer::startThread() throw ( ActiveMQException ) {
try
{
@@ -524,12 +523,10 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::stopThread() throw (ActiveMQException) {
+void ActiveMQConsumer::stopThread() throw ( ActiveMQException ) {
try
{
- shutdown = true;
-
// if the thread is running signal it to quit and then
// wait for run to return so thread can die
if( listenerThread != NULL )
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/main/activemq/core/ActiveMQConsumer.h?view=diff&rev=501659&r1=501658&r2=501659 \
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h \
Tue Jan 30 16:42:10 2007 @@ -57,15 +57,11 @@
concurrent::Mutex listenerLock;
// Message Queue
- util::Queue< cms::Message* > msgQueue;
+ util::Queue<cms::Message*> msgQueue;
// Thread to notif a listener if one is added
concurrent::Thread* listenerThread;
- // Boolean to indicate that the listener Thread is shutting
- // down and the run method should return.
- bool shutdown;
-
// Boolean that indicates if the consumer has been closed
bool closed;
@@ -134,7 +130,7 @@
* @throws cms::CMSException
*/
virtual std::string getMessageSelector() const
- throw ( cms::CMSException );
+ throw ( cms::CMSException );
/**
* Method called to acknowledge the message passed
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=501659&r1=501658&r2=501659 \
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp \
Tue Jan 30 16:42:10 2007 @@ -24,6 +24,7 @@
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/Boolean.h>
#include <activemq/connector/TransactionInfo.h>
@@ -34,6 +35,7 @@
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::exceptions;
+using namespace activemq::concurrent;
////////////////////////////////////////////////////////////////////////////////
ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
@@ -47,10 +49,19 @@
"ActiveMQSession::ActiveMQSession - Init with NULL data");
}
- this->sessionInfo = sessionInfo;
- this->transaction = NULL;
- this->connection = connection;
- this->closed = false;
+ this->sessionInfo = sessionInfo;
+ this->transaction = NULL;
+ this->connection = connection;
+ this->closed = false;
+ this->asyncThread = NULL;
+ this->useAsyncSend = Boolean::parseBoolean(
+ properties.getProperty( "useAsyncSend", "false" ) );
+
+ // If we are in Async Send Mode we need to start the Thread
+ // otherwise we don't need to do anything.
+ if( this->useAsyncSend == true ) {
+ this->startThread();
+ }
// Create a Transaction object only if the session is transactional
if( isTransacted() )
@@ -110,6 +121,9 @@
// Now indicate that this session is closed.
closed = true;
+
+ // Stop the Async Thread if its running
+ stopThread();
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
@@ -193,6 +207,7 @@
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
////////////////////////////////////////////////////////////////////////////////
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
@@ -555,9 +570,19 @@
"ActiveMQSession::onProducerClose - Session Already Closed" );
}
- // Send via the connection
- connection->getConnectionData()->
- getConnector()->send( message, producer->getProducerInfo() );
+ if( useAsyncSend == true ) {
+
+ // Put it in the send queue, thread will dispatch it.
+ synchronized( &msgQueue ) {
+ msgQueue.push( make_pair( message, producer ) );
+ msgQueue.notifyAll();
+ }
+
+ } else {
+ // Send via the connection synchronously.
+ connection->getConnectionData()->
+ getConnector()->send( message, producer->getProducerInfo() );
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -619,4 +644,101 @@
}
return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::run()
+{
+ try{
+
+ while( !closed )
+ {
+ std::pair<Message*, ActiveMQProducer*> messagePair;
+
+ synchronized( &msgQueue )
+ {
+ // Guard against spurious wakeup or a race for the queue
+ // lock: keep waiting while the outgoing message queue is
+ // empty. The loop is left early once the session has been
+ // closed so that this thread can exit.
+ while( msgQueue.empty() )
+ {
+ if( closed )
+ {
+ break;
+ }
+ msgQueue.wait();
+ }
+
+ // don't want to process messages if we are shutting down.
+ if( closed )
+ {
+ return;
+ }
+
+ // get the data
+ messagePair = msgQueue.pop();
+ }
+
+ // Dispatch the message
+ connection->getConnectionData()->
+ getConnector()->send(
+ messagePair.first,
+ messagePair.second->getProducerInfo() );
+
+ }
+ }
+ catch(...)
+ {
+ cms::ExceptionListener* listener = this->getExceptionListener();
+
+ if( listener != NULL )
+ {
+ listener->onException( ActiveMQException(
+ __FILE__, __LINE__,
+ "ActiveMQSession::run - "
+ "Connector threw an unknown Exception, recovering..." ) );
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::startThread() throw ( ActiveMQException ) {
+
+ try
+ {
+ // Start the thread, if it's not already started.
+ if( asyncThread == NULL )
+ {
+ asyncThread = new Thread( this );
+ asyncThread->start();
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::stopThread() throw ( ActiveMQException ) {
+
+ try
+ {
+ // if the thread is running signal it to quit and then
+ // wait for run to return so thread can die
+ if( asyncThread != NULL )
+ {
+ synchronized( &msgQueue )
+ {
+ // Force a wakeup if run is in a wait.
+ msgQueue.notifyAll();
+ }
+
+ // Wait for it to die and then delete it.
+ asyncThread->join();
+ delete asyncThread;
+ asyncThread = NULL;
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
}
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h \
Tue Jan 30 16:42:10 2007 @@ -19,9 +19,12 @@
#include <cms/Session.h>
#include <cms/ExceptionListener.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Mutex.h>
#include <activemq/connector/SessionInfo.h>
#include <activemq/core/ActiveMQSessionResource.h>
#include <activemq/util/Set.h>
+#include <activemq/util/Queue.h>
#include <set>
namespace activemq{
@@ -34,35 +37,52 @@
class ActiveMQProducer;
class ActiveMQConsumer;
- class ActiveMQSession : public cms::Session
+ class ActiveMQSession :
+ public cms::Session,
+ public concurrent::Runnable
{
private:
-
+
/**
* SessionInfo for this Session
*/
connector::SessionInfo* sessionInfo;
-
+
/**
* Transaction Management object
*/
ActiveMQTransaction* transaction;
-
+
/**
* Connection
*/
ActiveMQConnection* connection;
-
+
/**
* Bool to indicate if this session was closed.
*/
bool closed;
-
+
/**
* The set of closable session resources (consumers and producers).
*/
util::Set<cms::Closeable*> closableSessionResources;
-
+
+ /**
+ * Thread that dispatches queued outgoing messages when async send is enabled
+ */
+ concurrent::Thread* asyncThread;
+
+ /**
+ * Is this Session using Async Sends.
+ */
+ bool useAsyncSend;
+
+ /**
+ * Outgoing Message Queue
+ */
+ util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;
+
public:
ActiveMQSession( connector::SessionInfo* sessionInfo,
@@ -70,7 +90,7 @@
ActiveMQConnection* connection );
virtual ~ActiveMQSession();
-
+
public: // Implements Mehtods
/**
@@ -294,8 +314,31 @@
virtual connector::SessionInfo* getSessionInfo() {
return sessionInfo;
}
-
- };
+
+ protected:
+
+ /**
+ * Run method that is called from the Thread class when this object
+ * is registered with a Thread and started. This function reads from
+ * the outgoing message queue and dispatches calls to the connector that
+ * is registered with this class.
+ */
+ virtual void run();
+
+ /**
+ * Starts the thread that sends queued messages asynchronously,
+ * if it is not already running. The constructor calls this
+ * only when the useAsyncSend property is true; otherwise sends
+ * go through the connector synchronously.
+ */
+ void startThread() throw (exceptions::ActiveMQException);
+
+ /**
+ * Stops the asynchronous message processing thread if it's started.
+ */
+ void stopThread() throw (exceptions::ActiveMQException);
+
+ };
}}
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am \
Tue Jan 30 16:42:10 2007 @@ -21,6 +21,7 @@
integration/durable/DurableTester.cpp \
integration/expiration/ExpirationTest.cpp \
integration/simple/SimpleTester.cpp \
+ integration/producer/AsyncSender.cpp \
integration/transactional/TransactionTester.cpp \
integration/various/SimpleRollbackTest.cpp \
main.cpp
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/common/AbstractTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -46,9 +46,28 @@
: connectionFactory( NULL ),
connection( NULL )
{
+ this->ackMode = ackMode;
+}
+
+AbstractTester::~AbstractTester()
+{
+ try
+ {
+ session->close();
+ connection->close();
+
+ delete session;
+ delete connection;
+ delete connectionFactory;
+ }
+ AMQ_CATCH_NOTHROW( ActiveMQException )
+ AMQ_CATCHALL_NOTHROW( )
+}
+
+void AbstractTester::initialize(){
try
{
- string url = IntegrationCommon::defaultURL;
+ string url = getBrokerURL();
numReceived = 0;
// Now create the connection
@@ -66,21 +85,6 @@
AMQ_CATCHALL_THROW( ActiveMQException )
}
-AbstractTester::~AbstractTester()
-{
- try
- {
- session->close();
- connection->close();
-
- delete session;
- delete connection;
- delete connectionFactory;
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
cms::Connection* AbstractTester::createDetachedConnection(
const std::string& username,
const std::string& password,
@@ -88,7 +92,7 @@
try
{
- string url = IntegrationCommon::defaultURL;
+ string url = getBrokerURL();
if( connectionFactory == NULL ) {
// Create a Factory
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/common/AbstractTester.h?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h \
Tue Jan 30 16:42:10 2007 @@ -26,6 +26,7 @@
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/MessageProducer.h>
+#include <integration/common/IntegrationCommon.h>
namespace integration{
namespace common{
@@ -38,7 +39,12 @@
cms::Session::AUTO_ACKNOWLEDGE );
virtual ~AbstractTester();
- virtual void doSleep(void);
+ virtual void initialize();
+ virtual void doSleep();
+
+ virtual std::string getBrokerURL() const {
+ return IntegrationCommon::defaultURL;
+ }
virtual unsigned int produceTextMessages(
cms::MessageProducer& producer,
@@ -65,6 +71,7 @@
unsigned int numReceived;
activemq::concurrent::Mutex mutex;
+ cms::Session::AcknowledgeMode ackMode;
};
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/durable/DurableTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -74,7 +74,9 @@
using namespace integration::common;
DurableTester::DurableTester() : AbstractTester()
-{}
+{
+ this->initialize();
+}
DurableTester::~DurableTester()
{}
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp?view=auto&rev=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp \
(added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp \
Tue Jan 30 16:42:10 2007 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncSender.h"
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::common;
+using namespace integration::producer;
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::producer::AsyncSender );
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSender::AsyncSender() : AbstractTester()
+{
+ this->initialize();
+ numReceived = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSender::~AsyncSender()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSender::test()
+{
+ try
+ {
+ if( IntegrationCommon::debug ) {
+ cout << "Starting activemqcms test (sending "
+ << IntegrationCommon::defaultMsgCount
+ << " messages per type and sleeping "
+ << IntegrationCommon::defaultDelay
+ << " milli-seconds) ...\n"
+ << endl;
+ }
+
+ // Create CMS Object for Comms
+ cms::Topic* topic = session->createTopic("mytopic");
+ cms::MessageConsumer* consumer =
+ session->createConsumer( topic );
+ consumer->setMessageListener( this );
+ cms::MessageProducer* producer =
+ session->createProducer( topic );
+
+ // Send some text messages
+ this->produceTextMessages(
+ *producer, IntegrationCommon::defaultMsgCount );
+
+ // Send a second batch of text messages.
+ this->produceTextMessages(
+ *producer, IntegrationCommon::defaultMsgCount );
+
+ // Wait for the messages to get here
+ waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+ if( IntegrationCommon::debug ) {
+ printf("received: %d\n", numReceived );
+ }
+ CPPUNIT_ASSERT(
+ numReceived == IntegrationCommon::defaultMsgCount * 2 );
+
+ if( IntegrationCommon::debug ) {
+ printf("Shutting Down\n" );
+ }
+ delete producer;
+ delete consumer;
+ delete topic;
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h?view=auto&rev=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h \
(added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h \
Tue Jan 30 16:42:10 2007 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_PRODUCER_ASYNCSENDER_H_
+#define _INTEGRATION_PRODUCER_ASYNCSENDER_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <integration/common/AbstractTester.h>
+#include <integration/common/IntegrationCommon.h>
+
+namespace integration{
+namespace producer{
+
+ class AsyncSender : public CppUnit::TestFixture,
+ public common::AbstractTester {
+
+ CPPUNIT_TEST_SUITE( AsyncSender );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ AsyncSender();
+ virtual ~AsyncSender();
+
+ virtual std::string getBrokerURL() const {
+ return common::IntegrationCommon::defaultURL + "?useAsyncSend=true";
+ }
+
+ virtual void test();
+
+ };
+
+}}
+
+#endif /*_INTEGRATION_PRODUCER_ASYNCSENDER_H_*/
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/simple/SimpleTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -75,6 +75,7 @@
SimpleTester::SimpleTester() : AbstractTester()
{
+ this->initialize();
numReceived = 0;
}
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/transactional/TransactionTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp \
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -74,7 +74,9 @@
using namespace integration::common;
TransactionTester::TransactionTester() : AbstractTester( \
cms::Session::SESSION_TRANSACTED )
-{}
+{
+ this->initialize();
+}
TransactionTester::~TransactionTester()
{}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic