[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: [activemq-artemis] branch main updated: ARTEMIS-2007 - refactor to make use of existing refCountForC
From: gtully () apache ! org
Date: 2021-09-24 14:07:05
Message-ID: 163249242573.20558.3428134946971644541 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 72cfda6 ARTEMIS-2007 - refactor to make use of existing refCountForConsumers for tracking consumer count and remove need for volatile redistributor
72cfda6 is described below
commit 72cfda6b1a76ac356f84e308c3b59d0ff29de7ce
Author: gtully <gary.tully@gmail.com>
AuthorDate: Mon Sep 13 11:48:55 2021 +0100
ARTEMIS-2007 - refactor to make use of existing refCountForConsumers for tracking \
consumer count and remove need for volatile redistributor
---
.../utils/collections/ResettableIterator.java | 3 +-
.../apache/activemq/artemis/core/server/Queue.java | 4 +-
.../core/server/impl/ActiveMQServerImpl.java | 6 --
.../core/server/impl/PostOfficeJournalLoader.java | 2 -
.../artemis/core/server/impl/QueueImpl.java | 72 ++++++----------------
.../core/server/impl/QueueConsumersImplTest.java | 28 +++++++++
.../artemis/core/server/impl/QueueImplTest.java | 8 ++-
.../server/impl/ScheduledDeliveryHandlerTest.java | 5 --
.../tests/integration/client/HangConsumerTest.java | 2 +-
.../tests/unit/core/postoffice/impl/FakeQueue.java | 5 --
.../tests/unit/core/server/impl/QueueImplTest.java | 23 ++++++-
11 files changed, 78 insertions(+), 80 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java \
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java
index f807eb1..9091a4e 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java
@@ -21,7 +21,8 @@ import java.util.Iterator;
public interface ResettableIterator<E> extends Iterator<E> {
/**
- * Resets the iterator so you can re-iterate over all elements.
+ * Resets the iterator so that you can iterate over all elements from your current position.
+ * Your current position (when reached again) signals the end of iteration as if the collection is circular.
*/
void reset();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java \
index aa43170..8bf2b31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -178,14 +178,12 @@ public interface Queue extends Bindable,CriticalComponent {
}
/**
- * This will set a reference counter for every consumer present on the queue.
+ * This will hold a reference counter for every consumer present on the queue.
* The ReferenceCounter will know what to do when the counter became zeroed.
* This is used to control what to do with temporary queues, especially
* on shared subscriptions where the queue needs to be deleted when all the
* consumers are closed.
*/
- void setConsumersRefCount(ReferenceCounter referenceCounter);
-
ReferenceCounter getConsumersRefCount();
/* Called when a message is cancelled back into the queue */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index a66186d..8039a58 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -3805,12 +3805,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Queue queue = queueFactory.createQueueWith(queueConfiguration, \
pagingManager);
- if (queueConfiguration.isTransient()) {
- queue.setConsumersRefCount(new TransientQueueManagerImpl(this, \
queue.getName()));
- } else {
- queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
- }
-
final QueueBinding localQueueBinding = new \
LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
long txID = 0;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 7701bbb..f679188 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
@@ -162,7 +161,6 @@ public class PostOfficeJournalLoader implements JournalLoader {
\
.setRingSize(queueBindingInfo.getRingSize()), pagingManager);
- queue.setConsumersRefCount(new \
QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), \
queueBindingInfo.getQueueName()));
if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : \
queueBindingInfo.getQueueStatusEncodings()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java \
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6909cf6..c0ed1bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue {
protected final PageSubscription pageSubscription;
- private ReferenceCounter refCountForConsumers;
+ private final ReferenceCounter refCountForConsumers;
private final PageIterator pageIterator;
@@ -218,17 +218,17 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue {
protected final ScheduledDeliveryHandler scheduledDeliveryHandler;
- private AtomicLong messagesAdded = new AtomicLong(0);
+ private final AtomicLong messagesAdded = new AtomicLong(0);
- private AtomicLong messagesAcknowledged = new AtomicLong(0);
+ private final AtomicLong messagesAcknowledged = new AtomicLong(0);
- private AtomicLong ackAttempts = new AtomicLong(0);
+ private final AtomicLong ackAttempts = new AtomicLong(0);
- private AtomicLong messagesExpired = new AtomicLong(0);
+ private final AtomicLong messagesExpired = new AtomicLong(0);
- private AtomicLong messagesKilled = new AtomicLong(0);
+ private final AtomicLong messagesKilled = new AtomicLong(0);
- private AtomicLong messagesReplaced = new AtomicLong(0);
+ private final AtomicLong messagesReplaced = new AtomicLong(0);
private boolean paused;
@@ -261,8 +261,8 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue {
private final SimpleString address;
- // redistributor goes in the consumers list, this signals its presence and allows \
for easy comparison/check
- private volatile ConsumerHolder<Redistributor> redistributor;
+ // redistributor singleton goes in the consumers list
+ private ConsumerHolder<Redistributor> redistributor;
private ScheduledFuture<?> redistributorFuture;
@@ -634,6 +634,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue { this.id = queueConfiguration.getId();
this.address = queueConfiguration.getAddress();
+ this.refCountForConsumers = queueConfiguration.isTransient() ? new \
TransientQueueManagerImpl(server, queueConfiguration.getName()) : new \
QueueManagerImpl(server, queueConfiguration.getName());
this.addressInfo = postOffice == null ? null : \
postOffice.getAddressInfo(address);
@@ -862,13 +863,6 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue {
// Queue implementation \
----------------------------------------------------------------------------------------
@Override
- public synchronized void setConsumersRefCount(final ReferenceCounter \
referenceCounter) {
- if (refCountForConsumers == null) {
- this.refCountForConsumers = referenceCounter;
- }
- }
-
- @Override
public ReferenceCounter getConsumersRefCount() {
return refCountForConsumers;
}
@@ -1442,13 +1436,8 @@ public class QueueImpl extends CriticalComponentImpl \
implements Queue { if (delayBeforeDispatch >= 0) {
dispatchStartTimeUpdater.compareAndSet(this,-1, \
delayBeforeDispatch + System.currentTimeMillis()); }
-
- }
-
- if (refCountForConsumers != null) {
refCountForConsumers.increment();
}
-
}
}
}
@@ -1485,7 +1474,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue {
if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this, \
System.currentTimeMillis());
- if (getConsumerCount() == 0) {
+ if (refCountForConsumers.decrement() == 0) {
stopDispatch();
}
}
@@ -1496,11 +1485,6 @@ public class QueueImpl extends CriticalComponentImpl \
implements Queue {
groups.removeIf(consumer::equals);
-
- if (refCountForConsumers != null) {
- refCountForConsumers.decrement();
- }
-
}
}
}
@@ -1557,7 +1541,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue { @Override
public synchronized void cancelRedistributor() {
clearRedistributorFuture();
-
+ hasUnMatchedPending = false;
if (redistributor != null) {
try {
redistributor.consumer.stop();
@@ -1572,18 +1556,7 @@ public class QueueImpl extends CriticalComponentImpl \
implements Queue {
@Override
public int getConsumerCount() {
- // we don't want to count the redistributor, it is an internal transient entry \
in the consumer list
- if (redistributor != null) {
- synchronized (this) {
- final int size = consumers.size();
- if (size > 0 && redistributor != null) {
- return size - 1;
- } else {
- return size;
- }
- }
- }
- return consumers.size();
+ return refCountForConsumers.getCount();
}
@Override
@@ -3014,7 +2987,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue { synchronized (this) {
// Need to do these checks inside the synchronized
- if (isPaused() || !canDispatch() && redistributor == null) {
+ if (isPaused() || !canDispatch()) {
return false;
}
@@ -3082,9 +3055,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue { numNoMatch = 0;
numAttempts = 0;
- if (consumer != redistributor) {
- ref = handleMessageGroup(ref, consumer, groupConsumer, \
groupID);
- }
+ ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
deliveriesInTransit.countUp();
@@ -3118,7 +3089,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumers.reset();
numNoMatch++;
// every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message
- if (numNoMatch == numAttempts && numAttempts == consumers.size()) {
+ if (numNoMatch == numAttempts && numAttempts == consumers.size() && redistributor == null) {
hasUnMatchedPending = true;
// one hit of unmatched message is enough, no need to reset counters
}
@@ -3753,7 +3724,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue { if (!supportsDirectDeliver) {
return false;
}
- if (isPaused() || !canDispatch() && redistributor == null) {
+ if (isPaused() || !canDispatch()) {
return false;
}
@@ -3777,12 +3748,7 @@ public class QueueImpl extends CriticalComponentImpl \
implements Queue {
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
- final MessageReference reference;
- if (consumer != redistributor) {
- reference = handleMessageGroup(ref, consumer, groupConsumer, \
groupID);
- } else {
- reference = ref;
- }
+ final MessageReference reference = handleMessageGroup(ref, consumer, \
groupConsumer, groupID);
incrementMesssagesAdded();
@@ -3793,7 +3759,7 @@ public class QueueImpl extends CriticalComponentImpl implements \
Queue { return true;
}
- if (redistributor != null || groupConsumer != null) {
+ if (groupConsumer != null) {
break;
}
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java \
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java
index 1055af7..e599b88 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java
@@ -39,6 +39,9 @@ public class QueueConsumersImplTest {
assertFalse(queueConsumers.hasNext());
queueConsumers.add(testPriority);
+ // not visible till reset
+ assertFalse(queueConsumers.hasNext());
+
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
@@ -109,6 +112,31 @@ public class QueueConsumersImplTest {
}
+ @Test
+ public void roundRobinEqualPriorityResetTest() {
+ queueConsumers.add(new TestPriority("A", 0));
+ queueConsumers.add(new TestPriority("B", 0));
+ queueConsumers.add(new TestPriority("C", 0));
+ queueConsumers.reset();
+ assertTrue(queueConsumers.hasNext());
+
+ assertEquals("A", queueConsumers.next().getName());
+
+ //Reset iterator should mark start as current position
+ queueConsumers.reset();
+ assertTrue(queueConsumers.hasNext());
+ assertEquals("B", queueConsumers.next().getName());
+
+ assertTrue(queueConsumers.hasNext());
+ assertEquals("C", queueConsumers.next().getName());
+
+ //Expect another A as after reset, we started at B so after A, we then expect the next level
+ assertTrue(queueConsumers.hasNext());
+ assertEquals("A", queueConsumers.next().getName());
+
+ //We have iterated all.
+ assertFalse(queueConsumers.hasNext());
+ }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java \
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java
index b39c79a..cac1039 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueImplTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
@@ -50,6 +51,8 @@ public class QueueImplTest {
PageSubscription pageSubscription = Mockito.mock(PageSubscription.class);
ExecutorService executorService = Executors.newSingleThreadExecutor();
StorageManager storageManager = Mockito.mock(StorageManager.class);
+ ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
+ ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class);
final int flushLimit = 100;
final int pagedReferences = 5 * flushLimit;
@@ -76,10 +79,13 @@ public class QueueImplTest {
return null;
}).when(storageManager).afterCompleteOperations(Mockito.any(IOCallback.class));
+ // Mock server
+ Mockito.doReturn(executorFactory).when(server).getExecutorFactory();
+
QueueImpl queue = new QueueImpl(0, address, address, null, null, pageSubscription, null, false,
false, false, Mockito.mock(ScheduledExecutorService.class),
Mockito.mock(PostOffice.class), storageManager, null,
- Mockito.mock(ArtemisExecutor.class), Mockito.mock(ActiveMQServer.class),
+ Mockito.mock(ArtemisExecutor.class), server,
Mockito.mock(QueueFactory.class));
Mockito.doReturn(queue).when(pageSubscription).getQueue();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java \
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index a9e506f..90b14e7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1130,11 +1130,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void setConsumersRefCount(ReferenceCounter referenceCounter) {
-
- }
-
- @Override
public ReferenceCounter getConsumersRefCount() {
return null;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java \
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index c3fca77..794761c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -351,7 +351,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
- false, null, null, null, null, null, null, null),
+ false, null, null, null, null, null, server, null),
server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java \
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index fb2fdc5..adb79f8 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -268,11 +268,6 @@ public class FakeQueue extends CriticalComponentImpl implements \
Queue { }
@Override
- public void setConsumersRefCount(ReferenceCounter referenceCounter) {
-
- }
-
- @Override
public void setInternalQueue(boolean internalQueue) {
// no-op
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java \
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 0493c8b..ae04068 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -71,12 +71,16 @@ public class QueueImplTest extends ActiveMQTestBase {
private ExecutorService executor;
+ private ActiveMQServer defaultServer;
+
@Override
@Before
public void setUp() throws Exception {
super.setUp();
scheduledExecutor = \
Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+ defaultServer = createServer(createDefaultConfig(1, false));
+ defaultServer.start();
}
@Override
@@ -1016,6 +1020,19 @@ public class QueueImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, consumer2.getReferences().size());
Assert.assertEquals(0, consumer3.getReferences().size());
+ // verify redistributor not yet needed, only consumer3 gets to
+ // peek at pending
+ // should not attempt to add (and throw) due to unmatched not being set
+ queue.addRedistributor(0);
+
+ // on new message dispatch, need for redistributor will kick in
+ MessageReference ref = generateReference(queue, numMessages);
+ ref.getMessage().putStringProperty("color", "red");
+ refs.add(ref);
+
+ queue.addTail(ref);
+ queue.deliverNow();
+
// verify redistributor is doing some work....
try {
// should attempt to add due to unmatched
@@ -1024,7 +1041,7 @@ public class QueueImplTest extends ActiveMQTestBase {
} catch (NullPointerException expected) {
}
- Assert.assertEquals(numMessages, getMessageCount(queue));
+ Assert.assertEquals(numMessages + 1, getMessageCount(queue));
}
@Test
@@ -1450,7 +1467,7 @@ public class QueueImplTest extends ActiveMQTestBase {
final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
null, null, false, true, false,
scheduledExecutor, null, null, null,
- ArtemisExecutor.delegate(executor), null, null);
+ ArtemisExecutor.delegate(executor), defaultServer, null);
queue.addConsumer(groupConsumer);
queue.addConsumer(noConsumer);
final MessageReference firstMessageReference = generateReference(queue, 1);
@@ -1490,6 +1507,6 @@ public class QueueImplTest extends ActiveMQTestBase {
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
- new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), null, null);
+ new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), defaultServer, null);
}
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic