[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r360167 - in /incubator/activemq/trunk/activemq-core/src:
From: chirino () apache ! org
Date: 2005-12-30 21:14:15
Message-ID: 20051230211417.74245.qmail () minotaur ! apache ! org
[Download RAW message or body]
Author: chirino
Date: Fri Dec 30 13:14:03 2005
New Revision: 360167
URL: http://svn.apache.org/viewcvs?rev=360167&view=rev
Log:
Fixed the QueueSubscriptionTest. When multiple producers pounded a destination all \
at once, some messages would not get dispatched, due to a timing issue with \
the consumer list held by the destination. Added some more synchronization blocks and \
now everything is happy again.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/AbstractRegion.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java \
Fri Dec 30 13:14:03 2005 @@ -63,16 +63,16 @@
synchronized(destinationsMutex){
destinations.put(destination,dest);
destinationMap.put(destination,dest);
- }
-
- // Add all consumers that are interested in the destination.
- for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = (Subscription) iter.next();
- if( sub.matches(destination) ) {
- dest.addSubscription(context, sub);
+
+ // Add all consumers that are interested in the destination.
+ for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) \
{ + Subscription sub = (Subscription) iter.next();
+ if( sub.matches(destination) ) {
+ dest.addSubscription(context, sub);
+ }
}
+ return dest;
}
- return dest;
}
public void removeDestination(ConnectionContext context, ActiveMQDestination \
destination, long timeout)
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/Queue.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
Fri Dec 30 13:14:03 2005 @@ -20,12 +20,13 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
@@ -55,7 +56,7 @@
private final Log log;
protected final ActiveMQDestination destination;
- protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
+ protected final List consumers = new CopyOnWriteArrayList();
protected final LinkedList messages = new LinkedList();
protected final Valve dispatchValve = new Valve(true);
protected final UsageManager usageManager;
@@ -123,7 +124,9 @@
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
- consumers.add(sub);
+ synchronized (consumers) {
+ consumers.add(sub);
+ }
highestSubscriptionPriority = calcHighestSubscriptionPriority();
msgContext.setDestination(destination);
@@ -167,7 +170,9 @@
dispatchValve.turnOff();
try {
- consumers.remove(sub);
+ synchronized (consumers) {
+ consumers.remove(sub);
+ }
sub.remove(context, this);
highestSubscriptionPriority = calcHighestSubscriptionPriority();
@@ -350,6 +355,7 @@
}
private void dispatch(ConnectionContext context, MessageReference node, Message \
message) throws Throwable { +
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
@@ -358,8 +364,12 @@
messages.add(node);
}
- if (consumers.isEmpty())
- return;
+ synchronized(consumers) {
+ if (consumers.isEmpty()) {
+ log.debug("No subscriptions registered, will not dispatch \
message at this time."); + return;
+ }
+ }
msgContext.setDestination(destination);
msgContext.setMessageReference(node);
@@ -374,10 +384,12 @@
private int calcHighestSubscriptionPriority() {
int rc = Integer.MIN_VALUE;
- for (Iterator iter = consumers.iterator(); iter.hasNext();) {
- Subscription sub = (Subscription) iter.next();
- if (sub.getConsumerInfo().getPriority() > rc) {
- rc = sub.getConsumerInfo().getPriority();
+ synchronized (consumers) {
+ for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+ Subscription sub = (Subscription) iter.next();
+ if (sub.getConsumerInfo().getPriority() > rc) {
+ rc = sub.getConsumerInfo().getPriority();
+ }
}
}
return rc;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/QueueSubscription.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java \
Fri Dec 30 13:14:03 2005 @@ -37,6 +37,7 @@
public void add(MessageReference node) throws Throwable {
super.add(node);
}
+
/**
* In the queue case, mark the node as dropped and then a gc cycle will remove \
it from
* the queue.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/Topic.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java \
Fri Dec 30 13:14:03 2005 @@ -96,7 +96,9 @@
if (sub.getConsumerInfo().isRetroactive()) {
subscriptionRecoveryPolicy.recover(context, this, sub);
}
- consumers.add(sub);
+ synchronized(consumers) {
+ consumers.add(sub);
+ }
}
}
@@ -108,8 +110,11 @@
dispatchValve.turnOff();
try {
- if (initialActivation)
- consumers.add(sub);
+ if (initialActivation) {
+ synchronized(consumers) {
+ consumers.add(sub);
+ }
+ }
if (store != null) {
String clientId = sub.getClientId();
@@ -166,7 +171,9 @@
public void removeSubscription(ConnectionContext context, Subscription sub) \
throws Throwable { destinationStatistics.getConsumers().decrement();
- consumers.remove(sub);
+ synchronized(consumers) {
+ consumers.remove(sub);
+ }
sub.remove(context, this);
}
@@ -302,9 +309,11 @@
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
}
- if (consumers.isEmpty()) {
- onMessageWithNoConsumers(context, message);
- return;
+ synchronized(consumers) {
+ if (consumers.isEmpty()) {
+ onMessageWithNoConsumers(context, message);
+ return;
+ }
}
msgContext.setDestination(destination);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/DispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
@@ -44,6 +44,6 @@
*
* @return true if at least one consumer was dispatched or false if there are no \
active subscriptions that could be dispatched
*/
- boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws \
Throwable; + boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,6 @@
*/
package org.apache.activemq.broker.region.policy;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
@@ -24,6 +23,7 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator;
+import java.util.List;
/**
* Simple dispatch policy that sends a message to every subscription that
@@ -35,14 +35,12 @@
*/
public class RoundRobinDispatchPolicy implements DispatchPolicy {
- private final Object mutex = new Object();
-
- public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable \
{ + public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable {
// Big synch here so that only 1 message gets dispatched at a time. Ensures \
// Everyone sees the same order and that the consumer list is not used while
// it's being rotated.
- synchronized(mutex) {
+ synchronized(consumers) {
int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,6 @@
*/
package org.apache.activemq.broker.region.policy;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
@@ -24,6 +23,7 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator;
+import java.util.List;
/**
* Simple dispatch policy that sends a message to every subscription that
@@ -35,7 +35,7 @@
*/
public class SimpleDispatchPolicy implements DispatchPolicy {
- public boolean dispatch(ConnectionContext context, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable \
{ + public boolean dispatch(ConnectionContext context, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable { int count = \
0; for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,6 @@
*/
package org.apache.activemq.broker.region.policy;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
@@ -24,6 +23,7 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator;
+import java.util.List;
/**
* Dispatch policy that causes every subscription to see messages in the same order.
@@ -33,15 +33,12 @@
* @version $Revision$
*/
public class StrictOrderDispatchPolicy implements DispatchPolicy {
- int i=0;
- private final Object mutex = new Object();
- public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable \
{ + public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable \
{
// Big synch here so that only 1 message gets dispatched at a time. Ensures \
// Everyone sees the same order.
- synchronized(mutex) {
+ synchronized(consumers) {
int count = 0;
- i++;
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/jav \
a/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java \
Fri Dec 30 13:14:03 2005 @@ -196,7 +196,7 @@
}
protected BrokerService createBroker() throws Exception {
- return BrokerFactory.createBroker(new \
URI("broker://()/localhost?persistent=false")); + return \
BrokerFactory.createBroker(new \
URI("broker://()/localhost?persistent=false&useJmx=true")); }
protected void setUp() throws Exception {
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/jav \
a/org/apache/activemq/broker/QueueSubscriptionTest.java?rev=360167&r1=360166&r2=360167&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java \
Fri Dec 30 13:14:03 2005 @@ -30,6 +30,18 @@
topic = false;
}
+ public void testManyProducersOneConsumer() throws Exception {
+ consumerCount = 1;
+ producerCount = 10;
+ messageCount = 100;
+ messageSize = 1; // 1 byte
+ prefetchCount = 10;
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws \
Exception { consumerCount = 2;
producerCount = 1;
@@ -102,18 +114,6 @@
assertTotalMessagesReceived(messageCount * producerCount);
}
- public void testManyProducersOneConsumer() throws Exception {
- consumerCount = 1;
- producerCount = 50;
- messageCount = 100;
- messageSize = 1; // 1 byte
- prefetchCount = 10;
-
- doMultipleClientsTest();
-
- assertTotalMessagesReceived(messageCount * producerCount);
- }
-
public void testManyProducersManyConsumers() throws Exception {
consumerCount = 50;
producerCount = 50;
@@ -137,7 +137,7 @@
startConsumers(consumerFactory, dest);
// Wait for consumers to setup
- Thread.sleep(1000);
+// Thread.sleep(1000);
startProducers(dest, messageCount);
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic