[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r501659 - in
From:       tabish () apache ! org
Date:       2007-01-31 0:42:12
Message-ID: 20070131004213.79F1E1A981A () eris ! apache ! org
[Download RAW message or body]

Author: tabish
Date: Tue Jan 30 16:42:10 2007
New Revision: 501659

URL: http://svn.apache.org/viewvc?view=rev&rev=501659
Log:
http://issues.apache.org/activemq/browse/AMQCPP-63

Cleanup of some of the code to remove extra shutdown flags in ActiveMQConsumer.
Added async send as a URL property of ActiveMQSession, caller must append \
useAsyncSend=true to the Broker URL to get this to work.

Added:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp \
(with props)  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h \
(with props) Modified:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/   (props \
changed)  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/   (props \
changed)  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/   (props changed)
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/   (props \
changed)  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp
  incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp


Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=501659&r1=501658&r2=501659 \
                ==============================================================================
                
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp \
Tue Jan 30 16:42:10 2007 @@ -45,7 +45,6 @@
     this->consumerInfo   = consumerInfo;
     this->listenerThread = NULL;
     this->listener       = NULL;
-    this->shutdown       = false;
     this->closed         = false;
 }
 
@@ -153,7 +152,7 @@
         {
             // Check for empty in case of spurious wakeup, or race to
             // queue lock.
-            while( !shutdown && !closed && msgQueue.empty() )
+            while( !closed && msgQueue.empty() )
             {
                 msgQueue.wait();
             }
@@ -161,7 +160,7 @@
             // This will only happen when this object is being
             // closed in another thread context - kind of
             // scary.
-            if( shutdown || closed ){
+            if( closed ){
                 throw ActiveMQException( __FILE__, __LINE__,
                     "Consumer is being closed in another thread" );
             }
@@ -202,7 +201,7 @@
         synchronized( &msgQueue )
         {
             // Check for empty, and wait if its not
-            if( !shutdown && !closed && msgQueue.empty() ){
+            if( !closed && msgQueue.empty() ){
                 
                 msgQueue.wait(millisecs);
 
@@ -215,7 +214,7 @@
             // This will only happen when this object is being
             // closed in another thread context - kind of
             // scary.
-            if( shutdown || closed ){
+            if( closed ){
                 throw ActiveMQException( __FILE__, __LINE__,
                     "Consumer is being closed in another thread" );
             }
@@ -246,7 +245,7 @@
 {
     try
     {
-        if( shutdown || closed )
+        if( closed )
         {
             throw InvalidStateException(
                 __FILE__, __LINE__,
@@ -332,7 +331,7 @@
 {
     try
     {
-        while( !shutdown )
+        while( !closed )
         {
             Message* message = NULL;
 
@@ -345,7 +344,7 @@
                 // registered, and then we will deliver the backlog
                 while( msgQueue.empty() || listener == NULL )
                 {
-                    if( shutdown )
+                    if( closed )
                     {
                         break;
                     }
@@ -353,7 +352,7 @@
                 }
                 
                 // don't want to process messages if we are shutting down.
-                if( shutdown )
+                if( closed )
                 {
                     return;
                 }
@@ -426,7 +425,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::purgeMessages() throw (ActiveMQException)
+void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException )
 {
     try
     {
@@ -465,7 +464,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::notifyListener( Message* message ) throw (ActiveMQException){
+void ActiveMQConsumer::notifyListener( Message* message ) throw ( ActiveMQException \
){  
     try
     {
@@ -484,7 +483,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::destroyMessage( Message* message ) throw (ActiveMQException){
+void ActiveMQConsumer::destroyMessage( Message* message ) throw ( ActiveMQException \
){  
     try
     {
@@ -502,7 +501,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::startThread() throw (ActiveMQException) {
+void ActiveMQConsumer::startThread() throw ( ActiveMQException ) {
     
     try
     {
@@ -524,12 +523,10 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::stopThread() throw (ActiveMQException) {
+void ActiveMQConsumer::stopThread() throw ( ActiveMQException ) {
     
     try
     {
-        shutdown = true;
-        
         // if the thread is running signal it to quit and then
         // wait for run to return so thread can die
         if( listenerThread != NULL )

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/main/activemq/core/ActiveMQConsumer.h?view=diff&rev=501659&r1=501658&r2=501659 \
                ==============================================================================
                
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h \
Tue Jan 30 16:42:10 2007 @@ -57,15 +57,11 @@
         concurrent::Mutex listenerLock;
         
         // Message Queue
-        util::Queue< cms::Message* > msgQueue;
+        util::Queue<cms::Message*> msgQueue;
         
         // Thread to notif a listener if one is added
         concurrent::Thread* listenerThread;
         
-        // Boolean to indicate that the listener Thread is shutting
-        // down and the run method should return.
-        bool shutdown;
-        
         // Boolean that indicates if the consumer has been closed
         bool closed;
         
@@ -134,7 +130,7 @@
          * @throws cms::CMSException
          */
         virtual std::string getMessageSelector() const 
-          throw ( cms::CMSException );
+            throw ( cms::CMSException );
           
         /**
          * Method called to acknowledge the message passed

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=501659&r1=501658&r2=501659 \
                ==============================================================================
                
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp \
Tue Jan 30 16:42:10 2007 @@ -24,6 +24,7 @@
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/Boolean.h>
 
 #include <activemq/connector/TransactionInfo.h>
 
@@ -34,6 +35,7 @@
 using namespace activemq::util;
 using namespace activemq::connector;
 using namespace activemq::exceptions;
+using namespace activemq::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
@@ -47,10 +49,19 @@
             "ActiveMQSession::ActiveMQSession - Init with NULL data");
     }
 
-    this->sessionInfo = sessionInfo;
-    this->transaction = NULL;
-    this->connection  = connection;
-    this->closed      = false;
+    this->sessionInfo  = sessionInfo;
+    this->transaction  = NULL;
+    this->connection   = connection;
+    this->closed       = false;
+    this->asyncThread  = NULL;
+    this->useAsyncSend = Boolean::parseBoolean( 
+        properties.getProperty( "useAsyncSend", "false" ) );
+
+    // If we are in Async Send Mode we need to start the Thread
+    // otherwise we don't need to do anything.
+    if( this->useAsyncSend == true ) {
+        this->startThread();
+    }
 
     // Create a Transaction object only if the session is transactional
     if( isTransacted() )
@@ -110,6 +121,9 @@
         
         // Now indicate that this session is closed.
         closed = true;
+        
+        // Stop the Async Thread if its running
+        stopThread();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -193,6 +207,7 @@
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
 ////////////////////////////////////////////////////////////////////////////////
 cms::MessageConsumer* ActiveMQSession::createConsumer(
     const cms::Destination* destination,
@@ -555,9 +570,19 @@
                 "ActiveMQSession::onProducerClose - Session Already Closed" );
         }
 
-        // Send via the connection
-        connection->getConnectionData()->
-            getConnector()->send( message, producer->getProducerInfo() );
+        if( useAsyncSend == true ) {
+
+            // Put it in the send queue, thread will dispatch it.
+            synchronized( &msgQueue ) {
+                msgQueue.push( make_pair( message, producer ) );
+                msgQueue.notifyAll();
+            }
+
+        } else {
+            // Send via the connection synchronously.
+            connection->getConnectionData()->
+                getConnector()->send( message, producer->getProducerInfo() );
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -619,4 +644,101 @@
     }
 
     return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::run()
+{
+    try{
+        
+        while( !closed )
+        {    
+            std::pair<Message*, ActiveMQProducer*> messagePair;
+
+            synchronized( &msgQueue )
+            {
+                // Guard against spurious wakeup or race to sync lock
+                // also if the listener has been unregistered we don't
+                // have anyone to notify, so we wait till a new one is
+                // registered, and then we will deliver the backlog
+                while( msgQueue.empty() )
+                {
+                    if( closed )
+                    {
+                        break;
+                    }
+                    msgQueue.wait();
+                }
+                
+                // don't want to process messages if we are shutting down.
+                if( closed )
+                {
+                    return;
+                }
+                
+                // get the data
+                messagePair = msgQueue.pop();
+            }
+
+            // Dispatch the message
+            connection->getConnectionData()->
+                getConnector()->send( 
+                    messagePair.first, 
+                    messagePair.second->getProducerInfo() );
+
+        }
+    }
+    catch(...)
+    {
+        cms::ExceptionListener* listener = this->getExceptionListener();
+        
+        if( listener != NULL )
+        {
+            listener->onException( ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::run - "
+                "Connector threw an unknown Exception, recovering..." ) );
+        }
+    }        
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::startThread() throw ( ActiveMQException ) {
+    
+    try
+    {
+        // Start the thread, if it's not already started.
+        if( asyncThread == NULL )
+        {
+            asyncThread = new Thread( this );        
+            asyncThread->start();                        
+        }
+    }        
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::stopThread() throw ( ActiveMQException ) {
+    
+    try
+    {
+        // if the thread is running signal it to quit and then
+        // wait for run to return so thread can die
+        if( asyncThread != NULL )
+        {                        
+            synchronized( &msgQueue )
+            {
+                // Force a wakeup if run is in a wait.
+                msgQueue.notifyAll();
+            }
+
+            // Wait for it to die and then delete it.
+            asyncThread->join();
+            delete asyncThread;
+            asyncThread = NULL;
+        }
+    }        
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h \
Tue Jan 30 16:42:10 2007 @@ -19,9 +19,12 @@
 
 #include <cms/Session.h>
 #include <cms/ExceptionListener.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Mutex.h>
 #include <activemq/connector/SessionInfo.h>
 #include <activemq/core/ActiveMQSessionResource.h>
 #include <activemq/util/Set.h>
+#include <activemq/util/Queue.h>
 #include <set>
 
 namespace activemq{
@@ -34,35 +37,52 @@
     class ActiveMQProducer;
     class ActiveMQConsumer;
    
-    class ActiveMQSession : public cms::Session
+    class ActiveMQSession : 
+        public cms::Session,
+        public concurrent::Runnable
     {
     private:
-   
+
         /**
          * SessionInfo for this Session
          */
         connector::SessionInfo* sessionInfo;
-      
+
         /**
          * Transaction Management object
          */
         ActiveMQTransaction* transaction;
-      
+
         /**
          * Connection
          */
         ActiveMQConnection* connection;
-      
+
         /**
          * Bool to indicate if this session was closed.
          */
         bool closed;
-        
+
         /**
          * The set of closable session resources (consumers and producers).
          */
         util::Set<cms::Closeable*> closableSessionResources;
-      
+
+        /**
+         *  Thread to notify a listener if one is added
+         */
+        concurrent::Thread* asyncThread;
+
+        /**
+         * Is this Session using Async Sends.
+         */
+        bool useAsyncSend;
+
+        /**
+         * Outgoing Message Queue
+         */
+        util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;        
+
     public:
    
         ActiveMQSession( connector::SessionInfo* sessionInfo,
@@ -70,7 +90,7 @@
                          ActiveMQConnection* connection );
    
         virtual ~ActiveMQSession();
-   
+
     public:   // Implements Mehtods
    
         /**
@@ -294,8 +314,31 @@
         virtual connector::SessionInfo* getSessionInfo() {
             return sessionInfo;
         }
-      
-   };
+
+    protected:
+
+        /**
+         * Run method that is called from the Thread class when this object
+         * is registered with a Thread and started.  This function reads from
+         * the outgoing message queue and dispatches calls to the connector that
+         * is registered with this class.
+         */            
+        virtual void run();
+        
+        /**
+         * Starts the message processing thread to receive messages
+         * asynchronously.  This thread is started when setMessageListener
+         * is invoked, which means that the caller is choosing to use this
+         * consumer asynchronously instead of synchronously (receive).
+         */
+        void startThread() throw (exceptions::ActiveMQException);
+        
+        /**
+         * Stops the asynchronous message processing thread if it's started.
+         */
+        void stopThread() throw (exceptions::ActiveMQException);
+
+    };
 
 }}
 

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 30 16:42:10 2007
@@ -0,0 +1 @@
+Makefile.in

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am \
Tue Jan 30 16:42:10 2007 @@ -21,6 +21,7 @@
   integration/durable/DurableTester.cpp \
   integration/expiration/ExpirationTest.cpp \
   integration/simple/SimpleTester.cpp \
+  integration/producer/AsyncSender.cpp \
   integration/transactional/TransactionTester.cpp \
   integration/various/SimpleRollbackTest.cpp \
   main.cpp

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/common/AbstractTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -46,9 +46,28 @@
  : connectionFactory( NULL ),
    connection( NULL )
 {
+    this->ackMode = ackMode;
+}
+
+AbstractTester::~AbstractTester()
+{
+    try
+    {
+        session->close();
+        connection->close();
+
+        delete session;
+        delete connection;
+        delete connectionFactory;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+void AbstractTester::initialize(){
     try
     {
-        string url = IntegrationCommon::defaultURL;
+        string url = getBrokerURL();
         numReceived = 0;
     
         // Now create the connection
@@ -66,21 +85,6 @@
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
-AbstractTester::~AbstractTester()
-{
-    try
-    {
-        session->close();
-        connection->close();
-
-        delete session;
-        delete connection;
-        delete connectionFactory;
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
 cms::Connection* AbstractTester::createDetachedConnection(
     const std::string& username,
     const std::string& password,
@@ -88,7 +92,7 @@
 
     try
     {
-        string url = IntegrationCommon::defaultURL;
+        string url = getBrokerURL();
     
         if( connectionFactory == NULL ) {
             // Create a Factory

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/common/AbstractTester.h?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/common/AbstractTester.h \
Tue Jan 30 16:42:10 2007 @@ -26,6 +26,7 @@
 #include <cms/Connection.h>
 #include <cms/Session.h>
 #include <cms/MessageProducer.h>
+#include <integration/common/IntegrationCommon.h>
 
 namespace integration{
 namespace common{
@@ -38,7 +39,12 @@
                             cms::Session::AUTO_ACKNOWLEDGE );
     	virtual ~AbstractTester();
     
-        virtual void doSleep(void);
+        virtual void initialize();
+        virtual void doSleep();
+        
+        virtual std::string getBrokerURL() const {
+            return IntegrationCommon::defaultURL;
+        }
 
         virtual unsigned int produceTextMessages( 
             cms::MessageProducer& producer,
@@ -65,6 +71,7 @@
 
         unsigned int numReceived;
         activemq::concurrent::Mutex mutex;
+        cms::Session::AcknowledgeMode ackMode;
 
     };
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/durable/DurableTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/durable/DurableTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -74,7 +74,9 @@
 using namespace integration::common;
 
 DurableTester::DurableTester() : AbstractTester()
-{}
+{
+    this->initialize();
+}
 
 DurableTester::~DurableTester()
 {}

Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp?view=auto&rev=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp \
                (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp \
Tue Jan 30 16:42:10 2007 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "AsyncSender.h"
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::common;
+using namespace integration::producer;
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::producer::AsyncSender );
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSender::AsyncSender() : AbstractTester()
+{
+    this->initialize();
+    numReceived = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSender::~AsyncSender()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSender::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay 
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+        
+        // Create CMS Object for Comms
+        cms::Topic* topic = session->createTopic("mytopic");
+        cms::MessageConsumer* consumer = 
+            session->createConsumer( topic );            
+        consumer->setMessageListener( this );
+        cms::MessageProducer* producer = 
+            session->createProducer( topic );
+
+        // Send some text messages
+        this->produceTextMessages( 
+            *producer, IntegrationCommon::defaultMsgCount );
+        
+        // Send some bytes messages.
+        this->produceTextMessages( 
+            *producer, IntegrationCommon::defaultMsgCount );
+
+        // Wait for the messages to get here
+        waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+        
+        if( IntegrationCommon::debug ) {
+            printf("received: %d\n", numReceived );
+        }
+        CPPUNIT_ASSERT( 
+            numReceived == IntegrationCommon::defaultMsgCount * 2 );
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        delete producer;                      
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.cpp
                
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h?view=auto&rev=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h \
                (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h \
Tue Jan 30 16:42:10 2007 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef _INTEGRATION_PRODUCER_ASYNCSENDER_H_
+#define _INTEGRATION_PRODUCER_ASYNCSENDER_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <integration/common/AbstractTester.h>
+#include <integration/common/IntegrationCommon.h>
+
+namespace integration{
+namespace producer{
+
+    class AsyncSender : public CppUnit::TestFixture,
+                        public common::AbstractTester {
+        
+        CPPUNIT_TEST_SUITE( AsyncSender );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+    
+        AsyncSender();
+        virtual ~AsyncSender();
+        
+        virtual std::string getBrokerURL() const {
+            return common::IntegrationCommon::defaultURL + "?useAsyncSend=true";
+        }
+
+        virtual void test();
+
+    };
+
+}}
+
+#endif /*_INTEGRATION_PRODUCER_ASYNCSENDER_H_*/

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/producer/AsyncSender.h
                
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/simple/SimpleTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/simple/SimpleTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -75,6 +75,7 @@
 
 SimpleTester::SimpleTester() : AbstractTester()
 {
+    this->initialize();
     numReceived = 0;
 }
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp
                
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/s \
rc/test-integration/integration/transactional/TransactionTester.cpp?view=diff&rev=501659&r1=501658&r2=501659
 ==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp \
                (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/transactional/TransactionTester.cpp \
Tue Jan 30 16:42:10 2007 @@ -74,7 +74,9 @@
 using namespace integration::common;
 
 TransactionTester::TransactionTester() : AbstractTester( \
                cms::Session::SESSION_TRANSACTED )
-{}
+{
+    this->initialize();
+}
 
 TransactionTester::~TransactionTester()
 {}


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic