[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r1461355 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ t
From: tabish () apache ! org
Date: 2013-03-26 22:36:23
Message-ID: 20130326223623.C4C9423888FD () eris ! apache ! org
[Download RAW message or body]
Author: tabish
Date: Tue Mar 26 22:36:22 2013
New Revision: 1461355
URL: http://svn.apache.org/r1461355
Log:
https://issues.apache.org/jira/browse/AMQCPP-367
Adds ConnectionAudit for use in dup detection and some more tests.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Tue Mar 26 22:36:22 2013
@@ -106,6 +106,7 @@ cc_sources = \
activemq/core/ActiveMQXAConnectionFactory.cpp \
activemq/core/ActiveMQXASession.cpp \
activemq/core/AdvisoryConsumer.cpp \
+ activemq/core/ConnectionAudit.cpp \
activemq/core/DispatchData.cpp \
activemq/core/Dispatcher.cpp \
activemq/core/FifoMessageDispatchChannel.cpp \
@@ -743,6 +744,7 @@ h_sources = \
activemq/core/ActiveMQXAConnectionFactory.h \
activemq/core/ActiveMQXASession.h \
activemq/core/AdvisoryConsumer.h \
+ activemq/core/ConnectionAudit.h \
activemq/core/DispatchData.h \
activemq/core/Dispatcher.h \
activemq/core/FifoMessageDispatchChannel.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Tue Mar 26 22:36:22 2013
@@ -23,6 +23,7 @@
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQConnectionMetaData.h>
#include <activemq/core/AdvisoryConsumer.h>
+#include <activemq/core/ConnectionAudit.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
@@ -195,6 +196,8 @@ namespace core{
TempDestinationMap activeTempDestinations;
+ ConnectionAudit connectionAudit;
+
ConnectionConfig(const Pointer<transport::Transport> transport,
const Pointer<decaf::util::Properties> properties) :
properties(properties),
@@ -454,6 +457,8 @@ ActiveMQConnection::ActiveMQConnection(c
configuration->connectionInfo->setManageable(true);
configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
+ configuration->connectionAudit.setCheckForDuplicates(transport->isFaultTolerant());
+
this->config = configuration.release();
}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp Tue Mar 26 22:36:22 2013
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAudit.h"
+
+#include <decaf/util/LinkedHashMap.h>
+
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/commands/ActiveMQDestination.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+    /**
+     * Private state behind ConnectionAudit: a mutex plus two maps of
+     * ActiveMQMessageAudit instances, one keyed by destination and one keyed
+     * by dispatcher.
+     */
+    class ConnectionAuditImpl {
+    private:
+
+        // Copy construction and assignment are private, so the state object
+        // cannot be copied from outside the class.
+        ConnectionAuditImpl(const ConnectionAuditImpl&);
+        ConnectionAuditImpl& operator= (const ConnectionAuditImpl&);
+
+    public:
+
+        // Locked by ConnectionAudit around accesses to the dispatcher map.
+        Mutex mutex;
+
+        // Audit used for messages whose destination is a queue.
+        LinkedHashMap<Pointer<ActiveMQDestination>, Pointer<ActiveMQMessageAudit> > destinations;
+
+        // Audit used for all other messages, one per dispatcher.
+        LinkedHashMap<Pointer<Dispatcher>, Pointer<ActiveMQMessageAudit> > dispatchers;
+
+        // NOTE(review): 1000 is presumably the initial map capacity -- confirm
+        // against the LinkedHashMap(int) constructor.
+        ConnectionAuditImpl() : mutex(), destinations(1000), dispatchers(1000) {
+        }
+    };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+// Default construction: duplicate checking enabled, audit depth and producer
+// limit taken from the ActiveMQMessageAudit defaults.
+ConnectionAudit::ConnectionAudit() : impl(new ConnectionAuditImpl),
+                                     checkForDuplicates(true),
+                                     auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+                                     auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::ConnectionAudit(int auditDepth, int maxProducers) : // caller-supplied audit depth and producer limit
+ impl(new ConnectionAuditImpl), // private state, released in the destructor
+ checkForDuplicates(true), // duplicate checking starts enabled
+ auditDepth(auditDepth),
+ auditMaximumProducerNumber(maxProducers) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::~ConnectionAudit() {
+ try {
+ delete this->impl; // frees the ConnectionAuditImpl allocated by the constructors
+ }
+ AMQ_CATCHALL_NOTHROW() // NOTE(review): presumably swallows every exception so the destructor never throws -- confirm against the macro definition
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAudit::removeDispatcher(Pointer<Dispatcher> dispatcher) {
+    // Forget the audit kept for this dispatcher; the removal happens while
+    // the implementation mutex is held.
+    ConnectionAuditImpl* const state = this->impl;
+    synchronized(&state->mutex) {
+        state->dispatchers.remove(dispatcher);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the message has already been seen.
+//
+// Queue messages are tracked in one audit per destination; every other
+// message is tracked in one audit per dispatcher.  The audit is created on
+// first use with the currently configured depth and producer limit.
+//
+// Returns false when duplicate checking is disabled, when the message is
+// NULL, or when the message carries no destination.
+bool ConnectionAudit::isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+
+    if (checkForDuplicates && message != NULL) {
+        Pointer<ActiveMQDestination> destination = message->getDestination();
+        if (destination != NULL) {
+            // Both maps are read and possibly extended here, so take the same
+            // lock removeDispatcher() uses; leaving the block via return
+            // releases it when the lock object goes out of scope.
+            synchronized(&this->impl->mutex) {
+                Pointer<ActiveMQMessageAudit> audit;
+                if (destination->isQueue()) {
+                    try {
+                        audit = this->impl->destinations.get(destination);
+                    } catch (NoSuchElementException& ex) {
+                        // First message for this queue: start a fresh audit.
+                        audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+                        this->impl->destinations.put(destination, audit);
+                    }
+                } else {
+                    try {
+                        audit = this->impl->dispatchers.get(dispatcher);
+                    } catch (NoSuchElementException& ex) {
+                        // First message for this dispatcher: start a fresh audit.
+                        audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+                        this->impl->dispatchers.put(dispatcher, audit);
+                    }
+                }
+                return audit->isDuplicate(message->getMessageId());
+            }
+        }
+    }
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Undoes a previous isDuplicate() registration for the message so that it is
+// no longer remembered by the matching audit.
+//
+// Does nothing when duplicate checking is disabled, when the message is NULL
+// or has no destination, or when no audit exists yet for the destination
+// (queues) or dispatcher (everything else).
+void ConnectionAudit::rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+    if (checkForDuplicates && message != NULL) {
+        Pointer<ActiveMQDestination> destination = message->getDestination();
+        if (destination != NULL) {
+            // Guard the map lookups with the same lock removeDispatcher() uses.
+            synchronized(&this->impl->mutex) {
+                try {
+                    Pointer<ActiveMQMessageAudit> audit;
+                    if (destination->isQueue()) {
+                        audit = this->impl->destinations.get(destination);
+                    } else {
+                        audit = this->impl->dispatchers.get(dispatcher);
+                    }
+                    audit->rollback(message->getMessageId());
+                } catch (NoSuchElementException& ex) {
+                    // No audit was recorded for this key, so there is
+                    // nothing to roll back.
+                }
+            }
+        }
+    }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h Tue Mar 26 22:36:22 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/commands/Message.h>
+#include <activemq/core/Dispatcher.h>
+
+namespace activemq {
+namespace core {
+
+ class ConnectionAuditImpl;
+
+ /**
+ * Provides the Auditing functionality used by Connections to attempt to
+ * filter out duplicate Messages.
+ *
+ * @since 3.7.0
+ */
+    class AMQCPP_API ConnectionAudit {
+    private:
+
+        // Copying is disabled; the object holds a raw pointer to its
+        // private implementation.
+        ConnectionAudit(const ConnectionAudit&);
+        ConnectionAudit& operator= (const ConnectionAudit&);
+
+    private:
+
+        // Private implementation (lock and audit maps).
+        ConnectionAuditImpl* impl;
+
+        bool checkForDuplicates;
+        int auditDepth;
+        int auditMaximumProducerNumber;
+
+    public:
+
+        /**
+         * Creates an audit with duplicate checking enabled and the default
+         * audit depth and producer limit.
+         */
+        ConnectionAudit();
+
+        /**
+         * Creates an audit with duplicate checking enabled.
+         *
+         * @param auditDepth
+         *      The depth handed to each message audit that gets created.
+         * @param maxProducers
+         *      The producer limit handed to each message audit that gets created.
+         */
+        ConnectionAudit(int auditDepth, int maxProducers);
+
+        ~ConnectionAudit();
+
+    public:
+
+        /**
+         * Discards the audit state kept for the given dispatcher.
+         *
+         * @param dispatcher
+         *      The dispatcher whose audit should be removed.
+         */
+        void removeDispatcher(Pointer<Dispatcher> dispatcher);
+
+        /**
+         * Checks whether the given message has been seen before.
+         *
+         * @param dispatcher
+         *      The dispatcher the message is being delivered to.
+         * @param message
+         *      The message to check.
+         *
+         * @return true if the message is considered a duplicate.
+         */
+        bool isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+
+        /**
+         * Removes the record of the given message so that it is no longer
+         * treated as already seen.
+         *
+         * @param dispatcher
+         *      The dispatcher the message was delivered to.
+         * @param message
+         *      The message whose record should be rolled back.
+         */
+        void rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+
+    public:
+
+        bool isCheckForDuplicates() const {
+            return this->checkForDuplicates;
+        }
+
+        void setCheckForDuplicates(bool checkForDuplicates) {
+            this->checkForDuplicates = checkForDuplicates;
+        }
+
+        // const like isCheckForDuplicates(), so the getter can be called on
+        // a const ConnectionAudit.
+        int getAuditDepth() const {
+            return auditDepth;
+        }
+
+        void setAuditDepth(int auditDepth) {
+            this->auditDepth = auditDepth;
+        }
+
+        // const for the same reason as getAuditDepth().
+        int getAuditMaximumProducerNumber() const {
+            return auditMaximumProducerNumber;
+        }
+
+        void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+            this->auditMaximumProducerNumber = auditMaximumProducerNumber;
+        }
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Tue Mar 26 22:36:22 2013
@@ -38,6 +38,7 @@ cc_sources = \
activemq/core/ActiveMQConnectionTest.cpp \
activemq/core/ActiveMQMessageAuditTest.cpp \
activemq/core/ActiveMQSessionTest.cpp \
+ activemq/core/ConnectionAuditTest.cpp \
activemq/core/FifoMessageDispatchChannelTest.cpp \
activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
activemq/exceptions/ActiveMQExceptionTest.cpp \
@@ -286,6 +287,7 @@ h_sources = \
activemq/core/ActiveMQConnectionTest.h \
activemq/core/ActiveMQMessageAuditTest.h \
activemq/core/ActiveMQSessionTest.h \
+ activemq/core/ConnectionAuditTest.h \
activemq/core/FifoMessageDispatchChannelTest.h \
activemq/core/SimplePriorityMessageDispatchChannelTest.h \
activemq/exceptions/ActiveMQExceptionTest.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp Tue Mar 26 22:36:22 2013
@@ -88,6 +88,58 @@ void ActiveMQMessageAuditTest::testIsDup
}
////////////////////////////////////////////////////////////////////////////////
+// Verifies rollback() for string ids: an id that the audit reports as a
+// duplicate must stop being reported once it has been rolled back.
+void ActiveMQMessageAuditTest::testRollbackString() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator idGen;
+
+    // Feed fresh ids; none of them may be flagged on first sight.
+    ArrayList<std::string> list;
+    for (int i = 0; i < count; i++) {
+        std::string id = idGen.generateId();
+        list.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    // Replay the most recent ids (audit depth + 1 of them).
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        std::string id = list.get(index);
+        CPPUNIT_ASSERT_MESSAGE("duplicate, id:" + id, audit.isDuplicate(id));
+        audit.rollback(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id, !audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Verifies rollback() for MessageId objects: an id that the audit reports as
+// a duplicate must stop being reported once it has been rolled back.
+void ActiveMQMessageAuditTest::testRollbackMessageId() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+
+    // All message ids share this single producer.
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    // Feed ids with increasing sequence numbers; none may be flagged.
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    // Replay the most recent ids (audit depth + 1 of them).
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), audit.isDuplicate(id));
+        audit.rollback(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), !audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQMessageAuditTest::testIsInOrderString() {
int count = 10000;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h Tue Mar 26 22:36:22 2013
@@ -31,6 +31,8 @@ namespace core {
CPPUNIT_TEST( testIsDuplicateMessageId );
CPPUNIT_TEST( testIsInOrderString );
CPPUNIT_TEST( testIsInOrderMessageId );
+ CPPUNIT_TEST( testRollbackString );
+ CPPUNIT_TEST( testRollbackMessageId );
CPPUNIT_TEST( testGetLastSeqId );
CPPUNIT_TEST_SUITE_END();
@@ -43,6 +45,8 @@ namespace core {
void testIsDuplicateMessageId();
void testIsInOrderString();
void testIsInOrderMessageId();
+ void testRollbackString();
+ void testRollbackMessageId();
void testGetLastSeqId();
};
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp Tue Mar 26 22:36:22 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAuditTest.h"
+
+#include <activemq/core/ConnectionAudit.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/util/IdGenerator.h>
+#include <activemq/commands/Message.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/commands/ActiveMQQueue.h>
+
+#include <decaf/util/ArrayList.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::util;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Do-nothing Dispatcher handed to the audit as the dispatcher argument
+    // in the tests of this file.
+    class MyDispatcher : public Dispatcher {
+    public:
+
+        virtual ~MyDispatcher() {}
+
+        // Dispatched messages are ignored.
+        virtual void dispatch(const Pointer<commands::MessageDispatch>&) {}
+
+        // Fixed hash value for every instance.
+        virtual int getHashCode() const { return 1; }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAuditTest::ConnectionAuditTest() { // nothing to set up; each test builds its own objects
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAuditTest::~ConnectionAuditTest() { // nothing to release
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// A default-constructed audit checks for duplicates and reports the
+// ActiveMQMessageAudit default depth and producer limit.
+void ConnectionAuditTest::testConstructor1() {
+
+    ConnectionAudit audit;
+    CPPUNIT_ASSERT(audit.isCheckForDuplicates());
+    CPPUNIT_ASSERT(audit.getAuditDepth() == ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE);
+    CPPUNIT_ASSERT(audit.getAuditMaximumProducerNumber() == ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAuditTest::testConstructor2() { // two-argument constructor stores the given depth and producer limit
+
+ ConnectionAudit audit(100, 200); // depth 100, producer limit 200
+ CPPUNIT_ASSERT(audit.isCheckForDuplicates()); // duplicate checking is enabled after construction
+ CPPUNIT_ASSERT(audit.getAuditDepth() == 100);
+ CPPUNIT_ASSERT(audit.getAuditMaximumProducerNumber() == 200);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends 10000 distinct queue messages through the audit, then replays the
+// most recent ones and expects each replay to be flagged as a duplicate.
+void ConnectionAuditTest::testIsDuplicate() {
+
+    const int total = 10000;
+    ConnectionAudit audit;
+    ArrayList<Pointer<MessageId> > seen;
+    Pointer<MyDispatcher> dispatcher(new MyDispatcher);
+
+    // Every message id below belongs to this one producer.
+    Pointer<ProducerId> producer(new ProducerId);
+    producer->setConnectionId("test");
+    producer->setSessionId(0);
+    producer->setValue(1);
+
+    // One queue message is reused throughout; only its id changes.
+    Pointer<ActiveMQDestination> destination(new ActiveMQQueue("TEST.QUEUE"));
+    Pointer<Message> message(new Message());
+    message->setDestination(destination);
+
+    // First pass: every id is new, so nothing may be reported.
+    for (int seq = 0; seq < total; ++seq) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(producer);
+        id->setProducerSequenceId(seq);
+        seen.add(id);
+
+        message->setMessageId(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+    }
+
+    // Second pass: replay the last (audit depth + 1) ids.
+    int position = seen.size() - 1 - audit.getAuditDepth();
+    while (position < seen.size()) {
+        Pointer<MessageId> id = seen.get(position);
+        message->setMessageId(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
+                               audit.isDuplicate(dispatcher, message));
+        ++position;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends 10000 distinct queue messages through the audit, then for each of
+// the most recent ones checks that it is flagged as a duplicate, rolls it
+// back, and checks that it is no longer flagged.
+void ConnectionAuditTest::testRollbackDuplicate() {
+
+    const int total = 10000;
+    ConnectionAudit audit;
+    ArrayList<Pointer<MessageId> > seen;
+    Pointer<MyDispatcher> dispatcher(new MyDispatcher);
+
+    // Every message id below belongs to this one producer.
+    Pointer<ProducerId> producer(new ProducerId);
+    producer->setConnectionId("test");
+    producer->setSessionId(0);
+    producer->setValue(1);
+
+    // One queue message is reused throughout; only its id changes.
+    Pointer<ActiveMQDestination> destination(new ActiveMQQueue("TEST.QUEUE"));
+    Pointer<Message> message(new Message());
+    message->setDestination(destination);
+
+    // First pass: every id is new, so nothing may be reported.
+    for (int seq = 0; seq < total; ++seq) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(producer);
+        id->setProducerSequenceId(seq);
+        seen.add(id);
+
+        message->setMessageId(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+    }
+
+    // Second pass: replay the last (audit depth + 1) ids, rolling each back.
+    int position = seen.size() - 1 - audit.getAuditDepth();
+    while (position < seen.size()) {
+        Pointer<MessageId> id = seen.get(position);
+        message->setMessageId(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
+                               audit.isDuplicate(dispatcher, message));
+        audit.rollbackDuplicate(dispatcher, message);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
+                               !audit.isDuplicate(dispatcher, message));
+        ++position;
+    }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h Tue Mar 26 22:36:22 2013
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+ class ConnectionAuditTest : public CppUnit::TestFixture { // CppUnit fixture holding the ConnectionAudit tests
+
+ CPPUNIT_TEST_SUITE( ConnectionAuditTest ); // suite declaration; lists the four test methods declared below
+ CPPUNIT_TEST( testConstructor1 );
+ CPPUNIT_TEST( testConstructor2 );
+ CPPUNIT_TEST( testIsDuplicate );
+ CPPUNIT_TEST( testRollbackDuplicate );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ ConnectionAuditTest();
+ virtual ~ConnectionAuditTest();
+
+ void testConstructor1(); // default constructor
+ void testConstructor2(); // (auditDepth, maxProducers) constructor
+ void testIsDuplicate(); // replayed messages are reported as duplicates
+ void testRollbackDuplicate(); // rolled-back messages are no longer reported
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Tue Mar 26 22:36:22 2013
@@ -90,6 +90,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::SimplePriorityMessageDispatchChannelTest );
#include <activemq/core/ActiveMQMessageAuditTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQMessageAuditTest );
+#include <activemq/core/ConnectionAuditTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ConnectionAuditTest );
#include <activemq/state/ConnectionStateTrackerTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic