[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r360167 - in /incubator/activemq/trunk/activemq-core/src:
From:       chirino () apache ! org
Date:       2005-12-30 21:14:15
Message-ID: 20051230211417.74245.qmail () minotaur ! apache ! org
[Download RAW message or body]

Author: chirino
Date: Fri Dec 30 13:14:03 2005
New Revision: 360167

URL: http://svn.apache.org/viewcvs?rev=360167&view=rev
Log:
Fixed the QueueSubscriptionTest.  When multiple producers pounded a destination all \
at once, some messages would not get dispatched, due to a timing issue with \
the consumer list held by the destination. Added some more synchronization blocks and \
now everything is happy again.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
  incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
  incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
  incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java


Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/AbstractRegion.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java \
Fri Dec 30 13:14:03 2005 @@ -63,16 +63,16 @@
         synchronized(destinationsMutex){
             destinations.put(destination,dest);
             destinationMap.put(destination,dest);
-        }
-
-        // Add all consumers that are interested in the destination. 
-        for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
-            Subscription sub = (Subscription) iter.next();
-            if( sub.matches(destination) ) {
-                dest.addSubscription(context, sub);
+            
+            // Add all consumers that are interested in the destination. 
+            for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) \
{ +                Subscription sub = (Subscription) iter.next();
+                if( sub.matches(destination) ) {
+                    dest.addSubscription(context, sub);
+                }
             }
+            return dest;
         }
-        return dest;
     }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination \
destination, long timeout)

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/Queue.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
Fri Dec 30 13:14:03 2005 @@ -20,12 +20,13 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.Message;
@@ -55,7 +56,7 @@
     private final Log log;
 
     protected final ActiveMQDestination destination;
-    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
+    protected final List consumers = new CopyOnWriteArrayList();
     protected final LinkedList messages = new LinkedList();
     protected final Valve dispatchValve = new Valve(true);
     protected final UsageManager usageManager;
@@ -123,7 +124,9 @@
 
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
         try {
-            consumers.add(sub);
+            synchronized (consumers) {
+                consumers.add(sub);
+            }
 
             highestSubscriptionPriority = calcHighestSubscriptionPriority();
             msgContext.setDestination(destination);
@@ -167,7 +170,9 @@
         dispatchValve.turnOff();
         try {
 
-            consumers.remove(sub);
+            synchronized (consumers) {
+                consumers.remove(sub);
+            }
             sub.remove(context, this);
 
             highestSubscriptionPriority = calcHighestSubscriptionPriority();
@@ -350,6 +355,7 @@
     }
 
     private void dispatch(ConnectionContext context, MessageReference node, Message \
message) throws Throwable { +
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
         try {
@@ -358,8 +364,12 @@
                 messages.add(node);
             }
 
-            if (consumers.isEmpty())
-                return;
+            synchronized(consumers) {
+                if (consumers.isEmpty()) {
+                    log.debug("No subscriptions registered, will not dispatch \
message at this time."); +                    return;
+                }
+            }
 
             msgContext.setDestination(destination);
             msgContext.setMessageReference(node);
@@ -374,10 +384,12 @@
 
     private int calcHighestSubscriptionPriority() {
         int rc = Integer.MIN_VALUE;
-        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
-            Subscription sub = (Subscription) iter.next();
-            if (sub.getConsumerInfo().getPriority() > rc) {
-                rc = sub.getConsumerInfo().getPriority();
+        synchronized (consumers) {
+            for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+                Subscription sub = (Subscription) iter.next();
+                if (sub.getConsumerInfo().getPriority() > rc) {
+                    rc = sub.getConsumerInfo().getPriority();
+                }
             }
         }
         return rc;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/QueueSubscription.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java \
Fri Dec 30 13:14:03 2005 @@ -37,6 +37,7 @@
     public void add(MessageReference node) throws Throwable {
         super.add(node);
     }
+    
     /**
      * In the queue case, mark the node as dropped and then a gc cycle will remove \
                it from 
      * the queue.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/Topic.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java \
Fri Dec 30 13:14:03 2005 @@ -96,7 +96,9 @@
             if (sub.getConsumerInfo().isRetroactive()) {
                 subscriptionRecoveryPolicy.recover(context, this, sub);
             }
-            consumers.add(sub);
+            synchronized(consumers) {
+                consumers.add(sub);
+            }
         }
     }
 
@@ -108,8 +110,11 @@
         dispatchValve.turnOff();
         try {
 
-            if (initialActivation)
-                consumers.add(sub);
+            if (initialActivation) {
+                synchronized(consumers) {
+                    consumers.add(sub);
+                }
+            }
 
             if (store != null) {
                 String clientId = sub.getClientId();
@@ -166,7 +171,9 @@
 
     public void removeSubscription(ConnectionContext context, Subscription sub) \
throws Throwable {  destinationStatistics.getConsumers().decrement();
-        consumers.remove(sub);
+        synchronized(consumers) {
+            consumers.remove(sub);
+        }
         sub.remove(context, this);
     }
 
@@ -302,9 +309,11 @@
             if (!subscriptionRecoveryPolicy.add(context, message)) {
                 return;
             }
-            if (consumers.isEmpty()) {
-                onMessageWithNoConsumers(context, message);
-                return;
+            synchronized(consumers) {
+                if (consumers.isEmpty()) {
+                    onMessageWithNoConsumers(context, message);
+                    return;
+                }
             }
 
             msgContext.setDestination(destination);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/DispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
@@ -44,6 +44,6 @@
      * 
      * @return true if at least one consumer was dispatched or false if there are no \
                active subscriptions that could be dispatched
      */
-    boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws \
Throwable; +    boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable;  
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.region.policy;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
@@ -24,6 +23,7 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * Simple dispatch policy that sends a message to every subscription that 
@@ -35,14 +35,12 @@
  */
 public class RoundRobinDispatchPolicy implements DispatchPolicy {
 
-    private final Object mutex = new Object();
-    
-    public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable \
{ +    public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable {  
         // Big synch here so that only 1 message gets dispatched at a time.  Ensures \
                
         // Everyone sees the same order and that the consumer list is not used while
         // it's being rotated.
-        synchronized(mutex) {
+        synchronized(consumers) {
             int count = 0;
             
             for (Iterator iter = consumers.iterator(); iter.hasNext();) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.region.policy;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
@@ -24,6 +23,7 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * Simple dispatch policy that sends a message to every subscription that 
@@ -35,7 +35,7 @@
  */
 public class SimpleDispatchPolicy implements DispatchPolicy {
 
-    public boolean dispatch(ConnectionContext context, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable \
{ +    public boolean dispatch(ConnectionContext context, MessageReference node, \
MessageEvaluationContext msgContext, List consumers) throws Throwable {  int count = \
0;  for (Iterator iter = consumers.iterator(); iter.hasNext();) {
             Subscription sub = (Subscription) iter.next();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/jav \
a/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java \
Fri Dec 30 13:14:03 2005 @@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.region.policy;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
@@ -24,6 +23,7 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * Dispatch policy that causes every subscription to see messages in the same order.
@@ -33,15 +33,12 @@
  * @version $Revision$
  */
 public class StrictOrderDispatchPolicy implements DispatchPolicy {
-    int i=0;
-    private final Object mutex = new Object();
     
-    public boolean dispatch(ConnectionContext newParam, MessageReference node, \
MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable \
{ +    public boolean dispatch(ConnectionContext newParam, MessageReference node, \
                MessageEvaluationContext msgContext, List consumers) throws Throwable \
                {
         // Big synch here so that only 1 message gets dispatched at a time.  Ensures \
  // Everyone sees the same order.
-        synchronized(mutex) {
+        synchronized(consumers) {
             int count = 0;
-            i++;
             for (Iterator iter = consumers.iterator(); iter.hasNext();) {
                 Subscription sub = (Subscription) iter.next();
                 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/jav \
a/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java \
Fri Dec 30 13:14:03 2005 @@ -196,7 +196,7 @@
     }
 
     protected BrokerService createBroker() throws Exception {
-        return BrokerFactory.createBroker(new \
URI("broker://()/localhost?persistent=false")); +        return \
BrokerFactory.createBroker(new \
URI("broker://()/localhost?persistent=false&useJmx=true"));  }
 
     protected void setUp() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
                
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/jav \
a/org/apache/activemq/broker/QueueSubscriptionTest.java?rev=360167&r1=360166&r2=360167&view=diff
 ==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java \
                (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java \
Fri Dec 30 13:14:03 2005 @@ -30,6 +30,18 @@
         topic = false;
     }
 
+    public void testManyProducersOneConsumer() throws Exception {
+        consumerCount = 1;
+        producerCount = 10;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws \
Exception {  consumerCount = 2;
         producerCount = 1;
@@ -102,18 +114,6 @@
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
-    public void testManyProducersOneConsumer() throws Exception {
-        consumerCount = 1;
-        producerCount = 50;
-        messageCount  = 100;
-        messageSize   = 1; // 1 byte
-        prefetchCount = 10;
-
-        doMultipleClientsTest();
-
-        assertTotalMessagesReceived(messageCount * producerCount);
-    }
-
     public void testManyProducersManyConsumers() throws Exception {
         consumerCount = 50;
         producerCount = 50;
@@ -137,7 +137,7 @@
         startConsumers(consumerFactory, dest);
 
         // Wait for consumers to setup
-        Thread.sleep(1000);
+//        Thread.sleep(1000);
 
         startProducers(dest, messageCount);
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic