[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r599129 - in /activemq/trunk/activemq-core/src:
From: rajdavies () apache ! org
Date: 2007-11-28 20:19:28
Message-ID: 20071128201932.24C931A9832 () eris ! apache ! org
[Download RAW message or body]
Author: rajdavies
Date: Wed Nov 28 12:19:27 2007
New Revision: 599129
URL: http://svn.apache.org/viewvc?rev=599129&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1490
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java \
Wed Nov 28 12:19:27 2007 @@ -33,8 +33,8 @@
*/
public class ActiveMQMessageAudit {
- private static final int DEFAULT_WINDOW_SIZE = 1024;
- private static final int MAXIMUM_PRODUCER_COUNT = 128;
+ private static final int DEFAULT_WINDOW_SIZE = 2048;
+ private static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
@@ -220,23 +220,33 @@
/**
* Check the MessageId is in order
+ * @param message
+ * @return
+ */
+ public synchronized boolean isInOrder(final MessageReference message) {
+ return isInOrder(message.getMessageId());
+ }
+
+ /**
+ * Check the MessageId is in order
* @param id
* @return
*/
public synchronized boolean isInOrder(final MessageId id) {
- boolean answer = true;
-
+ boolean answer = false;
+
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
- if (bab != null) {
- answer = bab.isInOrder(id.getProducerSequenceId());
+ if (bab == null) {
+ bab = new BitArrayBin(auditDepth);
+ map.put(pid, bab);
}
-
+ answer = bab.isInOrder(id.getProducerSequenceId());
+
}
}
return answer;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java \
Wed Nov 28 12:19:27 2007 @@ -113,7 +113,7 @@
private TaskRunnerFactory persistenceTaskRunnerFactory;
private SystemUsage systemUsage;
private SystemUsage producerSystemUsage;
- private SystemUsage consumerSystemUsage;
+ private SystemUsage storeSystemUsage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
private DestinationFactory destinationFactory;
@@ -668,23 +668,23 @@
* @throws IOException
*/
public SystemUsage getConsumerSystemUsage() throws IOException {
- if (consumerSystemUsage == null) {
- consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
- consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
- addService(consumerSystemUsage);
+ if (this.storeSystemUsage == null) {
+ this.storeSystemUsage = new SystemUsage(getSystemUsage(), "Store");
+ this.storeSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
+ addService(this.storeSystemUsage);
}
- return consumerSystemUsage;
+ return this.storeSystemUsage;
}
/**
- * @param consumerUsageManager the consumerUsageManager to set
+ * @param storeSystemUsage the storeSystemUsage to set
*/
- public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
- if (this.consumerSystemUsage != null) {
- removeService(this.consumerSystemUsage);
+ public void setConsumerSystemUsage(SystemUsage storeSystemUsage) {
+ if (this.storeSystemUsage != null) {
+ removeService(this.storeSystemUsage);
}
- this.consumerSystemUsage = consumerUsageManager;
- addService(this.producerSystemUsage);
+ this.storeSystemUsage = storeSystemUsage;
+ addService(this.storeSystemUsage);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/AbstractRegion.java?rev=599129&r1=599128&r2=599129&view=diff \
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java \
Wed Nov 28 12:19:27 2007 @@ -56,7 +56,7 @@
protected final Map<ActiveMQDestination, Destination> destinations = new \
ConcurrentHashMap<ActiveMQDestination, Destination>(); protected final \
DestinationMap destinationMap = new DestinationMap();
protected final Map<ConsumerId, Subscription> subscriptions = new \
ConcurrentHashMap<ConsumerId, Subscription>();
- protected final SystemUsage memoryManager;
+ protected final SystemUsage usageManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionBroker broker;
@@ -73,7 +73,7 @@
}
this.broker = broker;
this.destinationStatistics = destinationStatistics;
- this.memoryManager = memoryManager;
+ this.usageManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
if (broker == null) {
throw new IllegalArgumentException("null destinationFactory");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/DurableTopicSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java \
Wed Nov 28 12:19:27 2007 @@ -43,14 +43,13 @@
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = \
new ConcurrentHashMap<ActiveMQDestination, Destination>(); private final \
SubscriptionKey subscriptionKey; private final boolean keepDurableSubsActive;
- private final SystemUsage usageManager;
private boolean active;
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws \
InvalidSelectorException {
- super(broker, context, info);
+ super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(context.getClientId(), \
info.getSubscriptionName(), broker.getTempDataStore(), \
info.getPrefetchSize(), this);
- this.usageManager = usageManager;
+ this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), \
info.getSubscriptionName()); }
@@ -191,7 +190,7 @@
return active;
}
- protected synchronized void acknowledge(ConnectionContext context, MessageAck \
ack, MessageReference node) throws IOException { + protected void \
acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws \
IOException {
node.getRegionDestination().acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount();
@@ -238,7 +237,7 @@
}
/**
- * @param memoryManager
+ * @param usageManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/PrefetchSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java \
Wed Nov 28 12:19:27 2007 @@ -23,6 +23,7 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -38,6 +39,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,14 +57,20 @@
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
+ protected boolean optimizedDispatch=false;
+ private int maxProducersToAudit=32;
+ private int maxAuditDepth=2048;
+ protected final SystemUsage usageManager;
+ protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
- public PrefetchSubscription(Broker broker, ConnectionContext context, \
ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { + \
public PrefetchSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws \
InvalidSelectorException { super(broker, context, info);
+ this.usageManager=usageManager;
pending = cursor;
}
- public PrefetchSubscription(Broker broker, ConnectionContext context, \
ConsumerInfo info) throws InvalidSelectorException {
- this(broker, context, info, new VMPendingMessageCursor());
+ public PrefetchSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { + \
this(broker,usageManager,context, info, new VMPendingMessageCursor()); }
/**
@@ -118,8 +126,7 @@
boolean pendingEmpty = false;
pendingEmpty = pending.isEmpty();
enqueueCounter++;
-
- if (!isFull() && pendingEmpty && !isSlave()) {
+ if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
dispatch(node);
} else {
optimizePrefetch();
@@ -128,6 +135,7 @@
LOG.debug("Prefetch limit.");
}
pending.addMessageLast(node);
+ dispatchMatched();
}
}
}
@@ -364,6 +372,9 @@
public synchronized void setPending(PendingMessageCursor pending) {
this.pending = pending;
+ if (this.pending!=null) {
+ this.pending.setSystemUsage(usageManager);
+ }
}
/**
@@ -440,6 +451,9 @@
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.addLast(node);
+ if(pending != null) {
+ pending.dispatched(message);
+ }
} else {
prefetchExtension = Math.max(0, prefetchExtension - 1);
}
@@ -459,8 +473,6 @@
context.getConnection().dispatchSync(md);
onDispatch(node, message);
}
- // System.err.println(broker.getBrokerName() + " " + this + " (" +
- // enqueueCounter + ", " + dispatchCounter +") " + node);
return true;
} else {
return false;
@@ -534,6 +546,30 @@
* @throws IOException
*/
protected void acknowledge(ConnectionContext context, final MessageAck ack, \
final MessageReference node) throws IOException { + }
+
+ public boolean isOptimizedDispatch() {
+ return optimizedDispatch;
+ }
+
+ public void setOptimizedDispatch(boolean optimizedDispatch) {
+ this.optimizedDispatch = optimizedDispatch;
+ }
+
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ }
+
+ public int getMaxAuditDepth() {
+ return maxAuditDepth;
+ }
+
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
Wed Nov 28 12:19:27 2007 @@ -96,7 +96,7 @@
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new \
MessageEvaluationContext(); private final Object exclusiveLockMutex = new Object();
- private TaskRunner taskRunner;
+ private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new \
LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/QueueBrowserSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java \
Wed Nov 28 12:19:27 2007 @@ -25,14 +25,15 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.usage.SystemUsage;
public class QueueBrowserSubscription extends QueueSubscription {
boolean browseDone;
- public QueueBrowserSubscription(Broker broker, ConnectionContext context, \
ConsumerInfo info) + public QueueBrowserSubscription(Broker broker, SystemUsage \
usageManager, ConnectionContext context, ConsumerInfo info) throws \
InvalidSelectorException {
- super(broker, context, info);
+ super(broker,usageManager, context, info);
}
protected boolean canDispatch(MessageReference node) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/QueueRegion.java?rev=599129&r1=599128&r2=599129&view=diff \
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java \
Wed Nov 28 12:19:27 2007 @@ -41,15 +41,15 @@
public String toString() {
return "QueueRegion: destinations=" + destinations.size() + ", \
subscriptions=" + subscriptions.size()
- + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + \
"%"; + + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() \
+ "%"; }
protected Subscription createSubscription(ConnectionContext context, \
ConsumerInfo info) throws InvalidSelectorException {
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker, context, info);
+ return new QueueBrowserSubscription(broker,usageManager, context, info);
} else {
- return new QueueSubscription(broker, context, info);
+ return new QueueSubscription(broker, usageManager,context, info);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/QueueSubscription.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java \
Wed Nov 28 12:19:27 2007 @@ -28,6 +28,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,8 +36,8 @@
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
- public QueueSubscription(Broker broker, ConnectionContext context, ConsumerInfo \
info) throws InvalidSelectorException {
- super(broker, context, info);
+ public QueueSubscription(Broker broker, SystemUsage usageManager, \
ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { + \
super(broker,usageManager, context, info); }
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/TempQueueRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java \
Wed Nov 28 12:19:27 2007 @@ -41,7 +41,7 @@
protected Destination createDestination(ConnectionContext context, \
ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = \
(ActiveMQTempDestination)destination;
- return new Queue(broker.getRoot(), destination, memoryManager, null, \
destinationStatistics, taskRunnerFactory, null) { + return new \
Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, \
taskRunnerFactory, null) {
public void addSubscription(ConnectionContext context, Subscription sub) \
throws Exception {
@@ -58,14 +58,14 @@
protected Subscription createSubscription(ConnectionContext context, \
ConsumerInfo info) throws InvalidSelectorException { if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker, context, info);
+ return new QueueBrowserSubscription(broker,usageManager,context, info);
} else {
- return new QueueSubscription(broker, context, info);
+ return new QueueSubscription(broker, usageManager,context, info);
}
}
public String toString() {
- return "TempQueueRegion: destinations=" + destinations.size() + ", \
subscriptions=" + subscriptions.size() + ", memory=" + \
memoryManager.getMemoryUsage().getPercentUsage() + "%"; + return \
"TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + \
subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() \
+ "%"; }
public void removeDestination(ConnectionContext context, ActiveMQDestination \
destination, long timeout) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/TempTopicRegion.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java \
Wed Nov 28 12:19:27 2007 @@ -47,13 +47,13 @@
throw new JMSException("A durable subscription cannot be created for a \
temporary topic."); }
try {
- TopicSubscription answer = new TopicSubscription(broker, context, info, \
memoryManager); + TopicSubscription answer = new TopicSubscription(broker, \
context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = \
broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) {
- entry.configure(broker, memoryManager, answer);
+ entry.configure(broker, usageManager, answer);
}
}
answer.init();
@@ -67,7 +67,7 @@
}
public String toString() {
- return "TempTopicRegion: destinations=" + destinations.size() + ", \
subscriptions=" + subscriptions.size() + ", memory=" + \
memoryManager.getMemoryUsage().getPercentUsage() + "%"; + return \
"TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + \
subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() \
+ "%"; }
public void removeDestination(ConnectionContext context, ActiveMQDestination \
destination, long timeout) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java \
Wed Nov 28 12:19:27 2007 @@ -50,6 +50,8 @@
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
@@ -65,7 +67,7 @@
*
* @version $Revision: 1.21 $
*/
-public class Topic extends BaseDestination {
+public class Topic extends BaseDestination implements Task{
private static final Log LOG = LogFactory.getLog(Topic.class);
protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList<Subscription> consumers = new \
CopyOnWriteArrayList<Subscription>(); @@ -81,28 +83,20 @@
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> \
durableSubcribers = new ConcurrentHashMap<SubscriptionKey, \
DurableTopicSubscription>();
-
+ private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new \
LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
-
- // We may need to do this in async thread since this is run for
- // within a synchronization
- // that the UsageManager is holding.
-
- synchronized (messagesWaitingForSpace) {
- while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) \
{
- Runnable op = messagesWaitingForSpace.removeFirst();
- op.run();
+ try {
+ Topic.this.taskRunner.wakeup();
+ } catch (InterruptedException e) {
}
- }
-
};
};
private final Broker broker;
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore \
store, SystemUsage systemUsage, DestinationStatistics parentStats,
- TaskRunnerFactory taskFactory) {
+ TaskRunnerFactory taskFactory) throws Exception {
this.broker = broker;
this.destination = destination;
this.store = store; // this could be NULL! (If an advisory)
@@ -115,7 +109,8 @@
}else{
//set the default
subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
- }
+ }
+ this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + \
destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
@@ -463,6 +458,9 @@
}
public void stop() throws Exception {
+ if (taskRunner != null) {
+ taskRunner.shutdown();
+ }
this.subscriptionRecoveryPolicy.stop();
if (memoryUsage != null) {
memoryUsage.stop();
@@ -499,6 +497,15 @@
}
return result.toArray(new Message[result.size()]);
}
+
+ public boolean iterate() {
+ while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
+ Runnable op = messagesWaitingForSpace.removeFirst();
+ op.run();
+ }
+ return false;
+ }
+
// Properties
// -------------------------------------------------------------------------
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/TopicRegion.java?rev=599129&r1=599128&r2=599129&view=diff \
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java \
Wed Nov 28 12:19:27 2007 @@ -100,7 +100,7 @@
+ " subscriberName: " + \
key.getSubscriptionName()); }
}
- sub.activate(memoryManager, context, info);
+ sub.activate(usageManager, context, info);
return sub;
} else {
return super.addConsumer(context, info);
@@ -140,7 +140,7 @@
}
public String toString() {
- return "TopicRegion: destinations=" + destinations.size() + ", \
subscriptions=" + subscriptions.size() + ", memory=" + \
memoryManager.getMemoryUsage().getPercentUsage() + "%"; + return "TopicRegion: \
destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", \
memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; }
@Override
@@ -230,12 +230,12 @@
SubscriptionKey key = new SubscriptionKey(context.getClientId(), \
info.getSubscriptionName()); DurableTopicSubscription sub = \
durableSubscriptions.get(key); if (sub == null) {
- sub = new DurableTopicSubscription(broker, memoryManager, context, \
info, keepDurableSubsActive); + sub = new \
DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); \
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = \
broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) {
- entry.configure(broker, memoryManager, sub);
+ entry.configure(broker, usageManager, sub);
}
}
durableSubscriptions.put(key, sub);
@@ -245,13 +245,13 @@
return sub;
}
try {
- TopicSubscription answer = new TopicSubscription(broker, context, info, \
memoryManager); + TopicSubscription answer = new TopicSubscription(broker, \
context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = \
broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) {
- entry.configure(broker, memoryManager, answer);
+ entry.configure(broker, usageManager, answer);
}
}
answer.init();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java \
Wed Nov 28 12:19:27 2007 @@ -50,6 +50,7 @@
public synchronized void stop() throws Exception {
started=false;
+ audit=null;
gc();
}
@@ -238,6 +239,13 @@
public boolean isTransient() {
return false;
}
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public void dispatched(MessageReference message) {
+ }
protected synchronized boolean isDuplicate(MessageId messageId) {
@@ -246,7 +254,12 @@
}
return this.audit.isDuplicate(messageId);
}
-
-
+
+ protected synchronized void rollback(MessageId id) {
+ if (this.audit != null) {
+ audit.rollback(id);
+ }
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java \
Wed Nov 28 12:19:27 2007 @@ -142,6 +142,7 @@
for (Iterator<MessageReference> i = getDiskList().iterator(); \
i.hasNext() && count < maxItems;) { Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++;
@@ -210,6 +211,7 @@
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
return message;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/cursors/PendingMessageCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java \
Wed Nov 28 12:19:27 2007 @@ -247,6 +247,12 @@
* disappears when the broker shuts down
*/
public boolean isTransient();
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public void dispatched(MessageReference message);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java \
Wed Nov 28 12:19:27 2007 @@ -119,6 +119,7 @@
Message result = batchList.removeFirst();
result.decrementReferenceCount();
result.setRegionDestination(regionDestination);
+ result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
return result;
}
@@ -133,6 +134,7 @@
throws Exception {
if (!isDuplicate(message.getMessageId())) {
message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
batchList.addLast(message);
} else {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java \
Wed Nov 28 12:19:27 2007 @@ -288,6 +288,20 @@
nonPersistent.setEnableAudit(enableAudit);
}
}
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public void dispatched(MessageReference message) {
+ super.dispatched(message);
+ for (PendingMessageCursor cursor : storePrefetches) {
+ cursor.dispatched(message);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.dispatched(message);
+ }
+ }
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java \
Wed Nov 28 12:19:27 2007 @@ -18,7 +18,9 @@
import java.io.IOException;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@@ -28,6 +30,9 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,21 +42,19 @@
*
* @version $Revision$
*/
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements \
MessageRecoveryListener { +class TopicStorePrefetch extends \
AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
- private final LinkedList<Message> batchList = new LinkedList<Message>();
+ private final LinkedHashMap<MessageId,Message> batchList = new \
LinkedHashMap<MessageId,Message> (); private String clientId;
private String subscriberName;
private Destination regionDestination;
- private MessageId firstMessageId;
- private MessageId lastMessageId;
private boolean batchResetNeeded = true;
private boolean storeMayHaveMoreMessages = true;
private boolean started;
private final Subscription subscription;
-
+
/**
* @param topic
* @param clientId
@@ -63,12 +66,15 @@
this.store = (TopicMessageStore)topic.getMessageStore();
this.clientId = clientId;
this.subscriberName = subscriberName;
+ this.maxProducersToAudit=32;
+ this.maxAuditDepth=10000;
}
public synchronized void start() throws Exception {
if (!started) {
started = true;
super.start();
+ getSystemUsage().getMemoryUsage().addUsageListener(this);
safeFillBatch();
}
}
@@ -76,6 +82,7 @@
public synchronized void stop() throws Exception {
if (started) {
started = false;
+ getSystemUsage().getMemoryUsage().removeUsageListener(this);
super.stop();
store.resetBatching(clientId, subscriberName);
gc();
@@ -97,22 +104,16 @@
public synchronized void addMessageLast(MessageReference node) throws Exception \
{ if (node != null) {
- if (isEmpty() && started) {
- firstMessageId = node.getMessageId();
- }
- lastMessageId = node.getMessageId();
- node.decrementReferenceCount();
storeMayHaveMoreMessages=true;
+ node.decrementReferenceCount();
}
}
public synchronized void addMessageFirst(MessageReference node) throws Exception \
{ if (node != null) {
- if (started) {
- firstMessageId = node.getMessageId();
- }
- node.decrementReferenceCount();
storeMayHaveMoreMessages=true;
+ node.decrementReferenceCount();
+ rollback(node.getMessageId());
}
}
@@ -127,7 +128,8 @@
}
public synchronized boolean hasNext() {
- return !isEmpty();
+ boolean result = !isEmpty();
+ return result;
}
public synchronized MessageReference next() {
@@ -136,13 +138,11 @@
if (batchList.isEmpty()) {
return null;
} else {
- result = batchList.removeFirst();
- if (lastMessageId != null) {
- if (result.getMessageId().equals(lastMessageId)) {
- // pendingCount=0;
- }
- }
+ Iterator i = batchList.entrySet().iterator();
+ result = (Message) ((Map.Entry)i.next()).getValue();
+ i.remove();
result.setRegionDestination(regionDestination);
+ result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
}
return result;
}
@@ -154,16 +154,23 @@
public void finished() {
}
- public synchronized boolean recoverMessage(Message message) throws Exception {
+ public synchronized boolean recoverMessage(Message message)
+ throws Exception {
MessageEvaluationContext messageEvaluationContext = new \
MessageEvaluationContext(); messageEvaluationContext.setMessageReference(message);
- if( subscription.matches(message, messageEvaluationContext) ) {
+ if (subscription.matches(message, messageEvaluationContext)) {
message.setRegionDestination(regionDestination);
- // only increment if count is zero (could have been cached)
- if (message.getReferenceCount() == 0) {
- message.incrementReferenceCount();
+ if (!isDuplicate(message.getMessageId())) {
+ // only increment if count is zero (could have been cached)
+ if (message.getReferenceCount() == 0) {
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ message.incrementReferenceCount();
+
+ }
+ batchList.put(message.getMessageId(), message);
+ }else {
+ this.storeMayHaveMoreMessages=true;
}
- batchList.addLast(message);
}
return true;
}
@@ -172,9 +179,23 @@
// shouldn't get called
throw new RuntimeException("Not supported");
}
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public synchronized void dispatched(MessageReference message) {
+ if (this.audit != null) {
+ isDuplicate(message.getMessageId());
+ Message removed = this.batchList.remove(message.getMessageId());
+ if (removed != null) {
+ removed.decrementReferenceCount();
+ }
+ }
+ }
// implementation
- protected void safeFillBatch() {
+ protected synchronized void safeFillBatch() {
try {
fillBatch();
} catch (Exception e) {
@@ -184,29 +205,17 @@
}
protected synchronized void fillBatch() throws Exception {
- if( batchResetNeeded ) {
- store.resetBatching(clientId, subscriberName);
- batchResetNeeded=false;
- storeMayHaveMoreMessages=true;
- }
-
- while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
- store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
- if( batchList.isEmpty() ) {
- storeMayHaveMoreMessages = false;
- } else {
- if (firstMessageId != null) {
- int pos = 0;
- for (Iterator<Message> iter = batchList.iterator(); \
iter.hasNext();) {
- Message msg = iter.next();
- if (msg.getMessageId().equals(firstMessageId)) {
- firstMessageId = null;
- break;
- } else {
- iter.remove();
- }
- }
- }
+ if (batchResetNeeded) {
+ this.store.resetBatching(clientId, subscriberName);
+ this.batchResetNeeded = false;
+ this.storeMayHaveMoreMessages = true;
+ }
+ while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
+ this.storeMayHaveMoreMessages = false;
+ this.store.recoverNextMessages(clientId, subscriberName,
+ maxBatchSize, this);
+ if (!this.batchList.isEmpty()) {
+ this.storeMayHaveMoreMessages=true;
}
}
}
@@ -221,11 +230,22 @@
}
public synchronized void gc() {
- for (Message msg : batchList) {
+ for (Message msg : batchList.values()) {
msg.decrementReferenceCount();
}
batchList.clear();
batchResetNeeded = true;
+ }
+
+ public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) \
{ + if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
+ storeMayHaveMoreMessages = true;
+ try {
+ fillBatch();
+ } catch (Exception e) {
+ LOG.error("Failed to fill batch ", e);
+ }
+ }
}
public String toString() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/broker/region/policy/PolicyEntry.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java \
Wed Nov 28 12:19:27 2007 @@ -51,11 +51,13 @@
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
- private int maxProducersToAudit=1024;
- private int maxAuditDepth=1;
+ private int maxProducersToAudit=32;
+ private int maxAuditDepth=1024;
+ private int maxQueueAuditDepth=1;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
-
+ private boolean optimizedDispatch=false;
+
public void configure(Queue queue, Store tmpStore) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
@@ -73,7 +75,7 @@
}
queue.setProducerFlowControl(isProducerFlowControl());
queue.setEnableAudit(isEnableAudit());
- queue.setMaxAuditDepth(getMaxAuditDepth());
+ queue.setMaxAuditDepth(getMaxQueueAuditDepth());
queue.setMaxProducersToAudit(getMaxProducersToAudit());
}
@@ -132,6 +134,8 @@
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
+ sub.setMaxAuditDepth(getMaxAuditDepth());
+ sub.setMaxProducersToAudit(getMaxProducersToAudit());
}
// Properties
@@ -329,6 +333,22 @@
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
+ }
+
+ public int getMaxQueueAuditDepth() {
+ return maxQueueAuditDepth;
+ }
+
+ public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
+ this.maxQueueAuditDepth = maxQueueAuditDepth;
+ }
+
+ public boolean isOptimizedDispatch() {
+ return optimizedDispatch;
+ }
+
+ public void setOptimizedDispatch(boolean optimizedDispatch) {
+ this.optimizedDispatch = optimizedDispatch;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java \
Wed Nov 28 12:19:27 2007 @@ -26,6 +26,7 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -81,6 +82,7 @@
private transient short referenceCount;
private transient ActiveMQConnection connection;
private transient org.apache.activemq.broker.region.Destination \
regionDestination; + private transient MemoryUsage memoryUsage;
private BrokerId[] brokerPath;
private BrokerId[] cluster;
@@ -127,6 +129,7 @@
copy.regionDestination = regionDestination;
copy.brokerInTime = brokerInTime;
copy.brokerOutTime = brokerOutTime;
+ copy.memoryUsage=this.memoryUsage;
// copying the broker path breaks networks - if a consumer re-uses a
// consumed
// message and forwards it on
@@ -567,6 +570,17 @@
public void setRegionDestination(org.apache.activemq.broker.region.Destination \
destination) { this.regionDestination = destination;
+ if(this.memoryUsage==null) {
+ this.memoryUsage=regionDestination.getBrokerMemoryUsage();
+ }
+ }
+
+ public MemoryUsage getMemoryUsage() {
+ return this.memoryUsage;
+ }
+
+ public void setMemoryUsage(MemoryUsage usage) {
+ this.memoryUsage=usage;
}
public boolean isMarshallAware() {
@@ -581,16 +595,15 @@
size = getSize();
}
- if (rc == 1 && regionDestination != null) {
- regionDestination.getBrokerMemoryUsage().increaseUsage(size);
+ if (rc == 1 && getMemoryUsage() != null) {
+ getMemoryUsage().increaseUsage(size);
}
- // System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
- // "+rc);
+ //System.out.println(" + "+getMemoryUsage().getName()+" :::: \
"+getMessageId()+"rc="+rc); return rc;
}
- public synchronized int decrementReferenceCount() {
+ public int decrementReferenceCount() {
int rc;
int size;
synchronized (this) {
@@ -598,11 +611,10 @@
size = getSize();
}
- if (rc == 0 && regionDestination != null) {
- regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
+ if (rc == 0 && getMemoryUsage() != null) {
+ getMemoryUsage().decreaseUsage(size);
}
- // System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
- // "+rc);
+ //System.out.println(" - "+getMemoryUsage().getName()+" :::: \
"+getMessageId()+"rc="+rc);
return rc;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/store/amq/AMQTopicMessageStore.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java \
Wed Nov 28 12:19:27 2007 @@ -59,13 +59,15 @@
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new \
RecoveryListenerAdapter(this, listener)); }
- public void recoverNextMessages(String clientId, String subscriptionName, int \
maxReturned, final MessageRecoveryListener listener) throws Exception { + public \
void recoverNextMessages(String clientId, String subscriptionName, + int \
maxReturned, final MessageRecoveryListener listener) + throws Exception {
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, \
listener);
- topicReferenceStore.recoverNextMessages(clientId, subscriptionName, \
maxReturned, recoveryListener);
- if (recoveryListener.size() == 0) {
- flush();
- topicReferenceStore.recoverNextMessages(clientId, subscriptionName, \
maxReturned, recoveryListener);
- }
+ topicReferenceStore.recoverNextMessages(clientId, \
subscriptionName,maxReturned, recoveryListener); + if \
(recoveryListener.size() == 0) { + flush();
+ topicReferenceStore.recoverNextMessages(clientId,subscriptionName, \
maxReturned, recoveryListener); + }
}
public SubscriptionInfo lookupSubscription(String clientId, String \
subscriptionName) throws IOException { @@ -145,14 +147,18 @@
* @param key
* @throws IOException
*/
- protected void acknowledge(ConnectionContext context,MessageId messageId, \
Location location, String clientId,String subscriptionName) throws IOException { + \
protected void acknowledge(ConnectionContext context, MessageId messageId, + \
Location location, String clientId, String subscriptionName) + throws \
IOException { synchronized (this) {
lastLocation = location;
- if (topicReferenceStore.acknowledgeReference(context, clientId, \
subscriptionName, messageId)){
- MessageAck ack = new MessageAck();
- ack.setLastMessageId(messageId);
- removeMessage(context, ack);
- }
+ }
+ if (topicReferenceStore.acknowledgeReference(context, clientId,
+ subscriptionName, messageId)) {
+ MessageAck ack = new MessageAck();
+ ack.setLastMessageId(messageId);
+ removeMessage(context, ack);
+
}
try {
asyncWriteTask.wakeup();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/store/kahadaptor/KahaReferenceStore.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java \
Wed Nov 28 12:19:27 2007 @@ -63,10 +63,14 @@
throw new RuntimeException("Use addMessageReference instead");
}
- protected final boolean recoverReference(MessageRecoveryListener listener, \
ReferenceRecord record)
- throws Exception {
- listener.recoverMessageReference(new MessageId(record.getMessageId()));
- return listener.hasSpace();
+ protected final boolean recoverReference(MessageRecoveryListener listener,
+ ReferenceRecord record) throws Exception {
+ MessageId id = new MessageId(record.getMessageId());
+ if (listener.hasSpace()) {
+ listener.recoverMessageReference(id);
+ return true;
+ }
+ return false;
}
public synchronized void recover(MessageRecoveryListener listener) throws \
Exception { @@ -90,14 +94,15 @@
entry = messageContainer.getNext(entry);
}
}
- if (entry != null) {
+ if (entry != null) {
int count = 0;
do {
ReferenceRecord msg = messageContainer.getValue(entry);
- if (msg != null) {
- recoverReference(listener, msg);
- count++;
- lastBatchId = msg.getMessageId();
+ if (msg != null ) {
+ if ( recoverReference(listener, msg)) {
+ count++;
+ lastBatchId = msg.getMessageId();
+ }
} else {
lastBatchId = null;
}
@@ -134,7 +139,7 @@
removeMessage(ack.getLastMessageId());
}
- public synchronized void removeMessage(MessageId msgId) throws IOException {
+ public synchronized void removeMessage(MessageId msgId) throws IOException {
StoreEntry entry = messageContainer.getEntry(msgId);
if (entry != null) {
ReferenceRecord rr = messageContainer.remove(msgId);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java \
Wed Nov 28 12:19:27 2007 @@ -245,15 +245,19 @@
entry = container.getNextEntry(entry);
}
}
-
+
if (entry != null) {
do {
ConsumerMessageRef consumerRef = container.get(entry);
- ReferenceRecord msg = \
messageContainer.getValue(consumerRef.getMessageEntry()); + \
ReferenceRecord msg = messageContainer.getValue(consumerRef + \
.getMessageEntry()); if (msg != null) {
- recoverReference(listener, msg);
- count++;
- container.setBatchEntry(msg.getMessageId(), entry);
+ if (recoverReference(listener, msg)) {
+ count++;
+ container.setBatchEntry(msg.getMessageId(), entry);
+ } else {
+ break;
+ }
} else {
container.reset();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apach \
e/activemq/store/kahadaptor/TopicSubContainer.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java \
Wed Nov 28 12:19:27 2007 @@ -67,17 +67,20 @@
if (!listContainer.isEmpty()) {
StoreEntry entry = listContainer.getFirst();
while (entry != null) {
- ConsumerMessageRef ref = \
(ConsumerMessageRef)listContainer.get(entry);
- listContainer.remove(entry);
- if (listContainer != null && batchEntry != null && \
(listContainer.isEmpty() || batchEntry.equals(entry))) {
- reset();
- }
+ ConsumerMessageRef ref = \
(ConsumerMessageRef)listContainer.get(entry); if (ref != null && \
ref.getMessageId().equals(id)) { result = ref;
+ listContainer.remove(entry);
+ if (batchEntry != null && batchEntry.equals(entry)) {
+ reset();
+ }
break;
}
- entry = listContainer.getFirst();
+ entry = listContainer.getNext(entry);
}
+ }
+ if (listContainer != null && (listContainer.isEmpty() )) {
+ reset();
}
return result;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java \
Wed Nov 28 12:19:27 2007 @@ -118,15 +118,15 @@
if (value == 0) {
return;
}
- if (parent != null) {
- ((MemoryUsage)parent).increaseUsage(value);
- }
int percentUsage;
synchronized (usageMutex) {
usage += value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
+ if (parent != null) {
+ ((MemoryUsage)parent).increaseUsage(value);
+ }
}
/**
@@ -138,15 +138,15 @@
if (value == 0) {
return;
}
- if (parent != null) {
- parent.decreaseUsage(value);
- }
int percentUsage;
synchronized (usageMutex) {
usage -= value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
+ if (parent != null) {
+ parent.decreaseUsage(value);
+ }
}
protected long retrieveUsage() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=599129&r1=599128&r2=599129&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java \
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java \
Wed Nov 28 12:19:27 2007 @@ -30,6 +30,7 @@
private int firstIndex = -1;
private int firstBin = -1;
private long lastBitSet=-1;
+ private long lastInOrderBit=-1;
/**
* Create a BitArrayBin to a certain window size (number of messages to
@@ -76,10 +77,15 @@
* @return true if next message is in order
*/
public boolean isInOrder(long index) {
- if (lastBitSet== -1) {
- return true;
+ boolean result = false;
+ if (lastInOrderBit == -1) {
+ result = true;
+ } else {
+ result = lastInOrderBit + 1 == index;
}
- return lastBitSet+1==index;
+ lastInOrderBit = index;
+ return result;
+
}
/**
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apach \
e/activemq/ActiveMQMessageAuditTest.java?rev=599129&r1=599128&r2=599129&view=diff \
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java \
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java \
Wed Nov 28 12:19:27 2007 @@ -105,6 +105,7 @@
String id = idGen.generateId();
if (i==0) {
assertFalse(audit.isDuplicate(id));
+ assertTrue(audit.isInOrder(id));
}
if (i > 1 && i%2 != 0) {
list.add(id);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apach \
e/activemq/bugs/DurableConsumerTest.java?rev=599129&r1=599128&r2=599129&view=diff \
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java \
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java \
Wed Nov 28 12:19:27 2007 @@ -63,6 +63,8 @@
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, \
CONSUMER_NAME); consumerConnection.start();
consumerConnection.close();
+ broker.stop();
+ broker =createBroker();
Connection producerConnection = factory.createConnection();
@@ -79,7 +81,8 @@
}
}
producerConnection.close();
-
+ broker.stop();
+ broker =createBroker();
consumerConnection = factory.createConnection();
consumerConnection.setClientID(CONSUMER_NAME);
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic