[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    svn commit: r599129 - in /activemq/trunk/activemq-core/src:
From:       rajdavies () apache ! org
Date:       2007-11-28 20:19:28
Message-ID: 20071128201932.24C931A9832 () eris ! apache ! org
[Download RAW message or body]

Author: rajdavies
Date: Wed Nov 28 12:19:27 2007
New Revision: 599129

URL: http://svn.apache.org/viewvc?rev=599129&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1490

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
  activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java Wed Nov 28 12:19:27 2007
@@ -33,8 +33,8 @@
  */
 public class ActiveMQMessageAudit {
 
-    private static final int DEFAULT_WINDOW_SIZE = 1024;
-    private static final int MAXIMUM_PRODUCER_COUNT = 128;
+    private static final int DEFAULT_WINDOW_SIZE = 2048;
+    private static final int MAXIMUM_PRODUCER_COUNT = 64;
     private int auditDepth;
     private int maximumNumberOfProducersToTrack;
     private LRUCache<Object, BitArrayBin> map;
@@ -220,23 +220,33 @@
     
     /**
      * Check the MessageId is in order
+     * @param message 
+     * @return
+     */
+    public synchronized boolean isInOrder(final MessageReference message) {
+        return isInOrder(message.getMessageId());
+    }
+    
+    /**
+     * Check the MessageId is in order
      * @param id
      * @return
      */
     public synchronized boolean isInOrder(final MessageId id) {
-        boolean answer = true;
-        
+        boolean answer = false;
+
         if (id != null) {
             ProducerId pid = id.getProducerId();
             if (pid != null) {
                 BitArrayBin bab = map.get(pid);
-                if (bab != null) {
-                    answer = bab.isInOrder(id.getProducerSequenceId());
+                if (bab == null) {
+                    bab = new BitArrayBin(auditDepth);
+                    map.put(pid, bab);
                 }
-               
+                answer = bab.isInOrder(id.getProducerSequenceId());
+
             }
         }
         return answer;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Nov 28 12:19:27 2007
@@ -113,7 +113,7 @@
     private TaskRunnerFactory persistenceTaskRunnerFactory;
     private SystemUsage systemUsage;
     private SystemUsage producerSystemUsage;
-    private SystemUsage consumerSystemUsage;
+    private SystemUsage storeSystemUsage;
     private PersistenceAdapter persistenceAdapter;
     private PersistenceAdapterFactory persistenceFactory;
     private DestinationFactory destinationFactory;
@@ -668,23 +668,23 @@
      * @throws IOException 
      */
     public SystemUsage getConsumerSystemUsage() throws IOException {
-        if (consumerSystemUsage == null) {
-            consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
-            consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
-            addService(consumerSystemUsage);
+        if (this.storeSystemUsage == null) {
+            this.storeSystemUsage = new SystemUsage(getSystemUsage(), "Store");
+            this.storeSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
+            addService(this.storeSystemUsage);
         }
-        return consumerSystemUsage;
+        return this.storeSystemUsage;
     }
 
     /**
-     * @param consumerUsageManager the consumerUsageManager to set
+     * @param storeSystemUsage the storeSystemUsage to set
      */
-    public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
-        if (this.consumerSystemUsage != null) {
-            removeService(this.consumerSystemUsage);
+    public void setConsumerSystemUsage(SystemUsage storeSystemUsage) {
+        if (this.storeSystemUsage != null) {
+            removeService(this.storeSystemUsage);
         }
-        this.consumerSystemUsage = consumerUsageManager;
-        addService(this.producerSystemUsage);
+        this.storeSystemUsage = storeSystemUsage;
+        addService(this.storeSystemUsage);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Nov 28 12:19:27 2007
@@ -56,7 +56,7 @@
     protected final Map<ActiveMQDestination, Destination> destinations = new \
ConcurrentHashMap<ActiveMQDestination, Destination>();  protected final \
                DestinationMap destinationMap = new DestinationMap();
     protected final Map<ConsumerId, Subscription> subscriptions = new \
                ConcurrentHashMap<ConsumerId, Subscription>();
-    protected final SystemUsage memoryManager;
+    protected final SystemUsage usageManager;
     protected final DestinationFactory destinationFactory;
     protected final DestinationStatistics destinationStatistics;
     protected final RegionBroker broker;
@@ -73,7 +73,7 @@
         }
         this.broker = broker;
         this.destinationStatistics = destinationStatistics;
-        this.memoryManager = memoryManager;
+        this.usageManager = memoryManager;
         this.taskRunnerFactory = taskRunnerFactory;
         if (broker == null) {
             throw new IllegalArgumentException("null destinationFactory");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Nov 28 12:19:27 2007
@@ -43,14 +43,13 @@
     private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = \
new ConcurrentHashMap<ActiveMQDestination, Destination>();  private final \
SubscriptionKey subscriptionKey;  private final boolean keepDurableSubsActive;
-    private final SystemUsage usageManager;
     private boolean active;
 
     public DurableTopicSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)  throws \
                InvalidSelectorException {
-        super(broker, context, info);
+        super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(context.getClientId(), \
                info.getSubscriptionName(), broker.getTempDataStore(), \
                info.getPrefetchSize(), this);
-        this.usageManager = usageManager;
+        this.pending.setSystemUsage(usageManager);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), \
info.getSubscriptionName());  }
@@ -191,7 +190,7 @@
         return active;
     }
 
-    protected synchronized void acknowledge(ConnectionContext context, MessageAck \
ack, MessageReference node) throws IOException { +    protected void \
acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws \
                IOException {
         node.getRegionDestination().acknowledge(context, this, ack, node);
         redeliveredMessages.remove(node.getMessageId());
         node.decrementReferenceCount();
@@ -238,7 +237,7 @@
     }
 
     /**
-     * @param memoryManager
+     * @param usageManager
      * @param oldPercentUsage
      * @param newPercentUsage
      * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,


Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Nov 28 12:19:27 2007
@@ -23,6 +23,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -38,6 +39,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -55,14 +57,20 @@
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
+    protected boolean optimizedDispatch=false;
+    private int maxProducersToAudit=32;
+    private int maxAuditDepth=2048;
+    protected final SystemUsage usageManager;
+    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
 
-    public PrefetchSubscription(Broker broker, ConnectionContext context, \
ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { +   \
public PrefetchSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws \
InvalidSelectorException {  super(broker, context, info);
+        this.usageManager=usageManager;
         pending = cursor;
     }
 
-    public PrefetchSubscription(Broker broker, ConnectionContext context, \
                ConsumerInfo info) throws InvalidSelectorException {
-        this(broker, context, info, new VMPendingMessageCursor());
+    public PrefetchSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { +     \
this(broker,usageManager,context, info, new VMPendingMessageCursor());  }
 
     /**
@@ -118,8 +126,7 @@
         boolean pendingEmpty = false;
         pendingEmpty = pending.isEmpty();
         enqueueCounter++;
-
-        if (!isFull() && pendingEmpty && !isSlave()) {
+        if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
             dispatch(node);
         } else {
             optimizePrefetch();
@@ -128,6 +135,7 @@
                     LOG.debug("Prefetch limit.");
                 }
                 pending.addMessageLast(node);
+                dispatchMatched();
             }
         }
     }
@@ -364,6 +372,9 @@
 
     public synchronized void setPending(PendingMessageCursor pending) {
         this.pending = pending;
+        if (this.pending!=null) {
+            this.pending.setSystemUsage(usageManager);
+        }
     }
 
     /**
@@ -440,6 +451,9 @@
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 dispatchCounter++;
                 dispatched.addLast(node);
+                if(pending != null) {
+                    pending.dispatched(message);
+                }
             } else {
                 prefetchExtension = Math.max(0, prefetchExtension - 1);
             }
@@ -459,8 +473,6 @@
                 context.getConnection().dispatchSync(md);
                 onDispatch(node, message);
             }
-            // System.err.println(broker.getBrokerName() + " " + this + " (" +
-            // enqueueCounter + ", " + dispatchCounter +") " + node);
             return true;
         } else {
             return false;
@@ -534,6 +546,30 @@
      * @throws IOException
      */
     protected void acknowledge(ConnectionContext context, final MessageAck ack, \
final MessageReference node) throws IOException { +    }
+
+    public boolean isOptimizedDispatch() {
+        return optimizedDispatch;
+    }
+
+    public void setOptimizedDispatch(boolean optimizedDispatch) {
+        this.optimizedDispatch = optimizedDispatch;
+    }
+
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+    }
+
+    public int getMaxAuditDepth() {
+        return maxAuditDepth;
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Nov 28 12:19:27 2007
@@ -96,7 +96,7 @@
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
     private final MessageEvaluationContext queueMsgConext = new \
MessageEvaluationContext();  private final Object exclusiveLockMutex = new Object();
-    private TaskRunner taskRunner;
+    private final TaskRunner taskRunner;
     
     private final LinkedList<Runnable> messagesWaitingForSpace = new \
                LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Nov 28 12:19:27 2007
@@ -25,14 +25,15 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.usage.SystemUsage;
 
 public class QueueBrowserSubscription extends QueueSubscription {
 
     boolean browseDone;
 
-    public QueueBrowserSubscription(Broker broker, ConnectionContext context, \
ConsumerInfo info) +    public QueueBrowserSubscription(Broker broker, SystemUsage \
usageManager, ConnectionContext context, ConsumerInfo info)  throws \
                InvalidSelectorException {
-        super(broker, context, info);
+        super(broker,usageManager, context, info);
     }
 
     protected boolean canDispatch(MessageReference node) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Wed Nov 28 12:19:27 2007
@@ -41,15 +41,15 @@
 
     public String toString() {
         return "QueueRegion: destinations=" + destinations.size() + ", \
                subscriptions=" + subscriptions.size()
-               + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + \
"%"; +               + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() \
+ "%";  }
 
     protected Subscription createSubscription(ConnectionContext context, \
ConsumerInfo info)  throws InvalidSelectorException {
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker, context, info);
+            return new QueueBrowserSubscription(broker,usageManager, context, info);
         } else {
-            return new QueueSubscription(broker, context, info);
+            return new QueueSubscription(broker, usageManager,context, info);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Nov 28 12:19:27 2007
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -35,8 +36,8 @@
 
     private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
 
-    public QueueSubscription(Broker broker, ConnectionContext context, ConsumerInfo \
                info) throws InvalidSelectorException {
-        super(broker, context, info);
+    public QueueSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { +     \
super(broker,usageManager, context, info);  }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Wed Nov 28 12:19:27 2007
@@ -41,7 +41,7 @@
 
     protected Destination createDestination(ConnectionContext context, \
                ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = \
                (ActiveMQTempDestination)destination;
-        return new Queue(broker.getRoot(), destination, memoryManager, null, \
destinationStatistics, taskRunnerFactory, null) { +        return new \
Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, \
taskRunnerFactory, null) {  
             public void addSubscription(ConnectionContext context, Subscription sub) \
throws Exception {  
@@ -58,14 +58,14 @@
 
     protected Subscription createSubscription(ConnectionContext context, \
ConsumerInfo info) throws InvalidSelectorException {  if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker, context, info);
+            return new QueueBrowserSubscription(broker,usageManager,context, info);
         } else {
-            return new QueueSubscription(broker, context, info);
+            return new QueueSubscription(broker, usageManager,context, info);
         }
     }
 
     public String toString() {
-        return "TempQueueRegion: destinations=" + destinations.size() + ", \
subscriptions=" + subscriptions.size() + ", memory=" + \
memoryManager.getMemoryUsage().getPercentUsage() + "%"; +        return \
"TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + \
subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() \
+ "%";  }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination \
destination, long timeout) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Wed Nov 28 12:19:27 2007
@@ -47,13 +47,13 @@
             throw new JMSException("A durable subscription cannot be created for a \
temporary topic.");  }
         try {
-            TopicSubscription answer = new TopicSubscription(broker, context, info, \
memoryManager); +            TopicSubscription answer = new TopicSubscription(broker, \
                context, info, usageManager);
             // lets configure the subscription depending on the destination
             ActiveMQDestination destination = info.getDestination();
             if (destination != null && broker.getDestinationPolicy() != null) {
                 PolicyEntry entry = \
broker.getDestinationPolicy().getEntryFor(destination);  if (entry != null) {
-                    entry.configure(broker, memoryManager, answer);
+                    entry.configure(broker, usageManager, answer);
                 }
             }
             answer.init();
@@ -67,7 +67,7 @@
     }
 
     public String toString() {
-        return "TempTopicRegion: destinations=" + destinations.size() + ", \
subscriptions=" + subscriptions.size() + ", memory=" + \
memoryManager.getMemoryUsage().getPercentUsage() + "%"; +        return \
"TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + \
subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() \
+ "%";  }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination \
destination, long timeout) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Nov 28 12:19:27 2007
@@ -50,6 +50,8 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
@@ -65,7 +67,7 @@
  * 
  * @version $Revision: 1.21 $
  */
-public class Topic  extends BaseDestination {
+public class Topic  extends BaseDestination  implements Task{
     private static final Log LOG = LogFactory.getLog(Topic.class);
     protected final ActiveMQDestination destination;
     protected final CopyOnWriteArrayList<Subscription> consumers = new \
CopyOnWriteArrayList<Subscription>(); @@ -81,28 +83,20 @@
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> \
durableSubcribers = new ConcurrentHashMap<SubscriptionKey, \
                DurableTopicSubscription>();
-    
+    private final TaskRunner taskRunner;
     private final LinkedList<Runnable> messagesWaitingForSpace = new \
                LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
-
-            // We may need to do this in async thread since this is run for
-            // within a synchronization
-            // that the UsageManager is holding.
-
-            synchronized (messagesWaitingForSpace) {
-                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) \
                {
-                    Runnable op = messagesWaitingForSpace.removeFirst();
-                    op.run();
+                try {
+                    Topic.this.taskRunner.wakeup();
+                } catch (InterruptedException e) {
                 }
-            }
-
         };
     };
     private final Broker broker;
 
     public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore \
                store, SystemUsage systemUsage, DestinationStatistics parentStats,
-                 TaskRunnerFactory taskFactory) {
+                 TaskRunnerFactory taskFactory) throws Exception {
         this.broker = broker;
         this.destination = destination;
         this.store = store; // this could be NULL! (If an advisory)
@@ -115,7 +109,8 @@
         }else{
         	//set the default
         	subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
-        }
+        } 
+        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + \
                destination.getPhysicalName());
         // Let the store know what usage manager we are using so that he can
         // flush messages to disk
         // when usage gets high.
@@ -463,6 +458,9 @@
     }
 
     public void stop() throws Exception {
+        if (taskRunner != null) {
+            taskRunner.shutdown();
+        }
         this.subscriptionRecoveryPolicy.stop();
         if (memoryUsage != null) {
             memoryUsage.stop();
@@ -499,6 +497,15 @@
         }
         return result.toArray(new Message[result.size()]);
     }
+    
+    public boolean iterate() {
+        while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
+            Runnable op = messagesWaitingForSpace.removeFirst();
+            op.run();
+        }
+        return false;
+    }
+
 
     // Properties
     // -------------------------------------------------------------------------

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed Nov 28 12:19:27 2007
@@ -100,7 +100,7 @@
                                            + " subscriberName: " + \
key.getSubscriptionName());  }
             }
-            sub.activate(memoryManager, context, info);
+            sub.activate(usageManager, context, info);
             return sub;
         } else {
             return super.addConsumer(context, info);
@@ -140,7 +140,7 @@
     }
 
     public String toString() {
-        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     @Override
@@ -230,12 +230,12 @@
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
             DurableTopicSubscription sub = durableSubscriptions.get(key);
             if (sub == null) {
-                sub = new DurableTopicSubscription(broker, memoryManager, context, info, keepDurableSubsActive);
+                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
                 ActiveMQDestination destination = info.getDestination();
                 if (destination != null && broker.getDestinationPolicy() != null) {
                     PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                     if (entry != null) {
-                        entry.configure(broker, memoryManager, sub);
+                        entry.configure(broker, usageManager, sub);
                     }
                 }
                 durableSubscriptions.put(key, sub);
@@ -245,13 +245,13 @@
             return sub;
         }
         try {
-            TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
+            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
             // lets configure the subscription depending on the destination
             ActiveMQDestination destination = info.getDestination();
             if (destination != null && broker.getDestinationPolicy() != null) {
                 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                 if (entry != null) {
-                    entry.configure(broker, memoryManager, answer);
+                    entry.configure(broker, usageManager, answer);
                 }
             }
             answer.init();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Nov 28 12:19:27 2007
@@ -50,6 +50,7 @@
 
     public synchronized void stop() throws Exception  {
         started=false;
+        audit=null;
         gc();
     }
 
@@ -238,6 +239,13 @@
     public boolean isTransient() {
         return false;
     }
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */
+    public void dispatched(MessageReference message) {   
+    }
 
 
     protected synchronized boolean  isDuplicate(MessageId messageId) {
@@ -246,7 +254,12 @@
         }
         return this.audit.isDuplicate(messageId);
     }
-
-   
+    
+    protected synchronized void rollback(MessageId id) {
+        if (this.audit != null) {
+            audit.rollback(id);
+        }
+    }
+  
    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Wed Nov 28 12:19:27 2007
@@ -142,6 +142,7 @@
             for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
                 Message message = (Message)i.next();
                 message.setRegionDestination(regionDestination);
+                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                 message.incrementReferenceCount();
                 result.add(message);
                 count++;
@@ -210,6 +211,7 @@
         if (!isDiskListEmpty()) {
             // got from disk
             message.setRegionDestination(regionDestination);
+            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
             message.incrementReferenceCount();
         }
         return message;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Wed Nov 28 12:19:27 2007
@@ -247,6 +247,12 @@
      * disappears when the broker shuts down
      */
     public boolean isTransient();
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */
+    public void dispatched(MessageReference message);
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Nov 28 12:19:27 2007
@@ -119,6 +119,7 @@
         Message result = batchList.removeFirst();
         result.decrementReferenceCount();
         result.setRegionDestination(regionDestination);
+        result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
         return result;
     }
 
@@ -133,6 +134,7 @@
             throws Exception {
         if (!isDuplicate(message.getMessageId())) {
             message.setRegionDestination(regionDestination);
+            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
             message.incrementReferenceCount();
             batchList.addLast(message);
         } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Nov 28 12:19:27 2007
@@ -288,6 +288,20 @@
             nonPersistent.setEnableAudit(enableAudit);
         }
     }
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */
+    public void dispatched(MessageReference message) {
+        super.dispatched(message);
+        for (PendingMessageCursor cursor : storePrefetches) {
+            cursor.dispatched(message);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.dispatched(message);
+        }
+    }
 
     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
         if (currentCursor == null || currentCursor.isEmpty()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Nov 28 12:19:27 2007
@@ -18,7 +18,9 @@
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -28,6 +30,9 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -37,21 +42,19 @@
  * 
  * @version $Revision$
  */
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
 
     private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
     private TopicMessageStore store;
-    private final LinkedList<Message> batchList = new LinkedList<Message>();
+    private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
     private String clientId;
     private String subscriberName;
     private Destination regionDestination;
-    private MessageId firstMessageId;
-    private MessageId lastMessageId;
     private boolean batchResetNeeded = true;
     private boolean storeMayHaveMoreMessages = true;
     private boolean started;
     private final Subscription subscription;
-
+   
     /**
      * @param topic
      * @param clientId
@@ -63,12 +66,15 @@
         this.store = (TopicMessageStore)topic.getMessageStore();
         this.clientId = clientId;
         this.subscriberName = subscriberName;
+        this.maxProducersToAudit=32;
+        this.maxAuditDepth=10000;
     }
 
     public synchronized void start() throws Exception {
         if (!started) {
             started = true;
             super.start();
+            getSystemUsage().getMemoryUsage().addUsageListener(this);
             safeFillBatch();
         }
     }
@@ -76,6 +82,7 @@
     public synchronized void stop() throws Exception {
         if (started) {
             started = false;
+            getSystemUsage().getMemoryUsage().removeUsageListener(this);
             super.stop();
             store.resetBatching(clientId, subscriberName);
             gc();
@@ -97,22 +104,16 @@
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
-            if (isEmpty() && started) {
-                firstMessageId = node.getMessageId();
-            }
-            lastMessageId = node.getMessageId();
-            node.decrementReferenceCount();
             storeMayHaveMoreMessages=true;
+            node.decrementReferenceCount();
         }
     }
 
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
-            if (started) {
-                firstMessageId = node.getMessageId();
-            }
-            node.decrementReferenceCount();
             storeMayHaveMoreMessages=true;
+            node.decrementReferenceCount();
+            rollback(node.getMessageId());
         }
     }
 
@@ -127,7 +128,8 @@
     }
 
     public synchronized boolean hasNext() {
-        return !isEmpty();
+        boolean result =  !isEmpty();
+        return result;
     }
 
     public synchronized MessageReference next() {
@@ -136,13 +138,11 @@
         if (batchList.isEmpty()) {
             return null;
         } else {
-            result = batchList.removeFirst();
-            if (lastMessageId != null) {
-                if (result.getMessageId().equals(lastMessageId)) {
-                    // pendingCount=0;
-                }
-            }
+            Iterator i = batchList.entrySet().iterator();
+            result = (Message) ((Map.Entry)i.next()).getValue();
+            i.remove();
             result.setRegionDestination(regionDestination);
+            result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
         }
         return result;
     }
@@ -154,16 +154,23 @@
     public void finished() {
     }
 
-    public synchronized boolean recoverMessage(Message message) throws Exception {
+    public synchronized boolean recoverMessage(Message message)
+            throws Exception {
         MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
-        if( subscription.matches(message, messageEvaluationContext) ) {
+        if (subscription.matches(message, messageEvaluationContext)) {
             message.setRegionDestination(regionDestination);
-            // only increment if count is zero (could have been cached)
-            if (message.getReferenceCount() == 0) {
-                message.incrementReferenceCount();
+            if (!isDuplicate(message.getMessageId())) {
+                // only increment if count is zero (could have been cached)
+                if (message.getReferenceCount() == 0) {
+                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+                    message.incrementReferenceCount();
+                   
+                }
+                batchList.put(message.getMessageId(), message);
+            }else {
+                this.storeMayHaveMoreMessages=true;
             }
-            batchList.addLast(message);
         }
         return true;
     }
@@ -172,9 +179,23 @@
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
+    
+    /**
+     * Mark a message as already dispatched
+     * @param message
+     */  
+    public synchronized void dispatched(MessageReference message) {
+        if (this.audit != null) {
+            isDuplicate(message.getMessageId());
+            Message removed = this.batchList.remove(message.getMessageId());
+            if (removed != null) {
+                removed.decrementReferenceCount();
+            }
+        }
+    }
 
     // implementation
-    protected void safeFillBatch() {
+    protected synchronized void safeFillBatch() {
         try {
             fillBatch();
         } catch (Exception e) {
@@ -184,29 +205,17 @@
     }
 
     protected synchronized void fillBatch() throws Exception {
-        if( batchResetNeeded ) {
-            store.resetBatching(clientId, subscriberName);
-            batchResetNeeded=false;
-            storeMayHaveMoreMessages=true;
-        }
-        
-        while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
-            store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
-            if( batchList.isEmpty() ) {
-                storeMayHaveMoreMessages = false;
-            } else {
-                if (firstMessageId != null) {
-                    int pos = 0;
-                    for (Iterator<Message> iter = batchList.iterator(); iter.hasNext();) {
-                        Message msg = iter.next();
-                        if (msg.getMessageId().equals(firstMessageId)) {
-                            firstMessageId = null;
-                            break;
-                        } else {
-                            iter.remove();
-                        }
-                    }
-                }
+        if (batchResetNeeded) {
+            this.store.resetBatching(clientId, subscriberName);
+            this.batchResetNeeded = false;
+            this.storeMayHaveMoreMessages = true;
+        }
+        while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
+            this.storeMayHaveMoreMessages = false;
+            this.store.recoverNextMessages(clientId, subscriberName,
+                    maxBatchSize, this);
+            if (!this.batchList.isEmpty()) {
+                this.storeMayHaveMoreMessages=true;
             }
         }
     }
@@ -221,11 +230,22 @@
     }
 
     public synchronized void gc() {
-        for (Message msg : batchList) {
+        for (Message msg : batchList.values()) {
             msg.decrementReferenceCount();
         }
         batchList.clear();
         batchResetNeeded = true;
+    }
+    
+    public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
+        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
+            storeMayHaveMoreMessages = true;
+            try {
+                fillBatch();
+            } catch (Exception e) {
+                LOG.error("Failed to fill batch ", e);
+            }
+        }
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Nov 28 12:19:27 2007
@@ -51,11 +51,13 @@
     private PendingQueueMessageStoragePolicy pendingQueuePolicy;
     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
     private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
-    private int maxProducersToAudit=1024;
-    private int maxAuditDepth=1;
+    private int maxProducersToAudit=32;
+    private int maxAuditDepth=1024;
+    private int maxQueueAuditDepth=1;
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
-
+    private boolean optimizedDispatch=false;
+   
     public void configure(Queue queue, Store tmpStore) {
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
@@ -73,7 +75,7 @@
         }
         queue.setProducerFlowControl(isProducerFlowControl());
         queue.setEnableAudit(isEnableAudit());
-        queue.setMaxAuditDepth(getMaxAuditDepth());
+        queue.setMaxAuditDepth(getMaxQueueAuditDepth());
         queue.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
@@ -132,6 +134,8 @@
             cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }
+        sub.setMaxAuditDepth(getMaxAuditDepth());
+        sub.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
     // Properties
@@ -329,6 +333,22 @@
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
+    }
+
+    public int getMaxQueueAuditDepth() {
+        return maxQueueAuditDepth;
+    }
+
+    public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
+        this.maxQueueAuditDepth = maxQueueAuditDepth;
+    }
+
+    public boolean isOptimizedDispatch() {
+        return optimizedDispatch;
+    }
+
+    public void setOptimizedDispatch(boolean optimizedDispatch) {
+        this.optimizedDispatch = optimizedDispatch;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Wed Nov 28 12:19:27 2007
@@ -26,6 +26,7 @@
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -81,6 +82,7 @@
     private transient short referenceCount;
     private transient ActiveMQConnection connection;
     private transient org.apache.activemq.broker.region.Destination regionDestination;
+    private transient MemoryUsage memoryUsage;
 
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
@@ -127,6 +129,7 @@
         copy.regionDestination = regionDestination;
         copy.brokerInTime = brokerInTime;
         copy.brokerOutTime = brokerOutTime;
+        copy.memoryUsage=this.memoryUsage;
         // copying the broker path breaks networks - if a consumer re-uses a
         // consumed
         // message and forwards it on
@@ -567,6 +570,17 @@
 
     public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
         this.regionDestination = destination;
+        if(this.memoryUsage==null) {
+            this.memoryUsage=regionDestination.getBrokerMemoryUsage();
+        }
+    }
+    
+    public MemoryUsage getMemoryUsage() {
+        return this.memoryUsage;
+    }
+    
+    public void setMemoryUsage(MemoryUsage usage) {
+        this.memoryUsage=usage;
     }
 
     public boolean isMarshallAware() {
@@ -581,16 +595,15 @@
             size = getSize();
         }
 
-        if (rc == 1 && regionDestination != null) {
-            regionDestination.getBrokerMemoryUsage().increaseUsage(size);
+        if (rc == 1 && getMemoryUsage() != null) {
+            getMemoryUsage().increaseUsage(size);
         }
 
-        // System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
-        // "+rc);
+        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
         return rc;
     }
 
-    public synchronized int decrementReferenceCount() {
+    public int decrementReferenceCount() {
         int rc;
         int size;
         synchronized (this) {
@@ -598,11 +611,10 @@
             size = getSize();
         }
 
-        if (rc == 0 && regionDestination != null) {
-            regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
+        if (rc == 0 && getMemoryUsage() != null) {
+            getMemoryUsage().decreaseUsage(size);
         }
-        // System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
-        // "+rc);
+        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
 
         return rc;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Nov 28 12:19:27 2007
@@ -59,13 +59,15 @@
         topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
     }
 
-    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception {
+    public void recoverNextMessages(String clientId, String subscriptionName,
+            int maxReturned, final MessageRecoveryListener listener)
+            throws Exception {
         RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
-        topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
-        if (recoveryListener.size() == 0) {
-            flush();
-            topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
-        }
+            topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
+            if (recoveryListener.size() == 0) {
+                flush();
+                topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
+            }
     }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
@@ -145,14 +147,18 @@
      * @param key
      * @throws IOException 
      */
-    protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
+    protected void acknowledge(ConnectionContext context, MessageId messageId,
+            Location location, String clientId, String subscriptionName)
+            throws IOException {
         synchronized (this) {
             lastLocation = location;
-            if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
-                MessageAck ack = new MessageAck();
-                ack.setLastMessageId(messageId);
-                removeMessage(context, ack);
-            }
+        }
+        if (topicReferenceStore.acknowledgeReference(context, clientId,
+                subscriptionName, messageId)) {
+            MessageAck ack = new MessageAck();
+            ack.setLastMessageId(messageId);
+            removeMessage(context, ack);
+
         }
         try {
             asyncWriteTask.wakeup();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Nov 28 12:19:27 2007
@@ -63,10 +63,14 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
-        throws Exception {
-        listener.recoverMessageReference(new MessageId(record.getMessageId()));
-        return listener.hasSpace();
+    protected final boolean recoverReference(MessageRecoveryListener listener,
+            ReferenceRecord record) throws Exception {
+        MessageId id = new MessageId(record.getMessageId());
+        if (listener.hasSpace()) {
+            listener.recoverMessageReference(id);
+            return true;
+        }
+        return false;
     }
 
     public synchronized void recover(MessageRecoveryListener listener) throws Exception {
@@ -90,14 +94,15 @@
                 entry = messageContainer.getNext(entry);
             }
         }
-        if (entry != null) {
+        if (entry != null) {      
             int count = 0;
             do {
                 ReferenceRecord msg = messageContainer.getValue(entry);
-                if (msg != null) {
-                    recoverReference(listener, msg);
-                    count++;
-                    lastBatchId = msg.getMessageId();
+                if (msg != null ) {
+                    if ( recoverReference(listener, msg)) {
+                        count++;
+                        lastBatchId = msg.getMessageId();
+                    }
                 } else {
                     lastBatchId = null;
                 }
@@ -134,7 +139,7 @@
         removeMessage(ack.getLastMessageId());
     }
 
-    public synchronized void removeMessage(MessageId msgId) throws IOException {
+    public synchronized void removeMessage(MessageId msgId) throws IOException {    
         StoreEntry entry = messageContainer.getEntry(msgId);
         if (entry != null) {
             ReferenceRecord rr = messageContainer.remove(msgId);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Nov 28 12:19:27 2007
@@ -245,15 +245,19 @@
                     entry = container.getNextEntry(entry);
                 }
             }
-
+           
             if (entry != null) {
                 do {
                     ConsumerMessageRef consumerRef = container.get(entry);
-                    ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
+                    ReferenceRecord msg = messageContainer.getValue(consumerRef
+                        .getMessageEntry());
                     if (msg != null) {
-                        recoverReference(listener, msg);
-                        count++;
-                        container.setBatchEntry(msg.getMessageId(), entry);
+                        if (recoverReference(listener, msg)) {
+                            count++;
+                            container.setBatchEntry(msg.getMessageId(), entry);
+                        } else {
+                            break;
+                        }
                     } else {
                         container.reset();
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Wed Nov 28 12:19:27 2007
@@ -67,17 +67,20 @@
         if (!listContainer.isEmpty()) {
             StoreEntry entry = listContainer.getFirst();
             while (entry != null) {
-                ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
-                listContainer.remove(entry);
-                if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
-                    reset();
-                }
+                ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
                 if (ref != null && ref.getMessageId().equals(id)) {
                     result = ref;
+                    listContainer.remove(entry);
+                    if (batchEntry != null && batchEntry.equals(entry)) {
+                        reset();
+                    }
                     break;
                 }
-                entry = listContainer.getFirst();
+                entry = listContainer.getNext(entry);
             }
+        }
+        if (listContainer != null  && (listContainer.isEmpty() )) {
+            reset();
         }
         return result;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java Wed Nov 28 12:19:27 2007
@@ -118,15 +118,15 @@
         if (value == 0) {
             return;
         }
-        if (parent != null) {
-            ((MemoryUsage)parent).increaseUsage(value);
-        }
         int percentUsage;
         synchronized (usageMutex) {
             usage += value;
             percentUsage = caclPercentUsage();
         }
         setPercentUsage(percentUsage);
+        if (parent != null) {
+            ((MemoryUsage)parent).increaseUsage(value);
+        }
     }
 
     /**
@@ -138,15 +138,15 @@
         if (value == 0) {
             return;
         }
-        if (parent != null) {
-            parent.decreaseUsage(value);
-        }
         int percentUsage;
         synchronized (usageMutex) {
             usage -= value;
             percentUsage = caclPercentUsage();
         }
         setPercentUsage(percentUsage);
+        if (parent != null) {
+            parent.decreaseUsage(value);
+        }
     }
 
     protected long retrieveUsage() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Wed Nov 28 12:19:27 2007
@@ -30,6 +30,7 @@
     private int firstIndex = -1;
     private int firstBin = -1;
     private long lastBitSet=-1;
+    private long lastInOrderBit=-1;
 
     /**
      * Create a BitArrayBin to a certain window size (number of messages to
@@ -76,10 +77,15 @@
      * @return true if next message is in order
      */
     public boolean isInOrder(long index) {
-        if (lastBitSet== -1) {
-            return true;
+        boolean result = false;
+        if (lastInOrderBit == -1) {
+            result = true;
+        } else {
+            result = lastInOrderBit + 1 == index;
         }
-        return lastBitSet+1==index;
+        lastInOrderBit = index;
+        return result;
+
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java Wed Nov 28 12:19:27 2007
@@ -105,6 +105,7 @@
             String id = idGen.generateId();
             if (i==0) {
                 assertFalse(audit.isDuplicate(id));
+                assertTrue(audit.isInOrder(id));
             }
             if (i > 1 && i%2 != 0) {
                 list.add(id);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
                
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=599129&r1=599128&r2=599129&view=diff
 ==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Nov 28 12:19:27 2007
@@ -63,6 +63,8 @@
         MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
         consumerConnection.start();
         consumerConnection.close();
+        broker.stop();
+        broker =createBroker();
         
         Connection producerConnection = factory.createConnection();
        
@@ -79,7 +81,8 @@
             }
         }
         producerConnection.close();
-        
+        broker.stop();
+        broker =createBroker();
         
         consumerConnection = factory.createConnection();
         consumerConnection.setClientID(CONSUMER_NAME);


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic