[prev in list] [next in list] [prev in thread] [next in thread]
List: activemq-commits
Subject: svn commit: r500862 - in /incubator/activemq/trunk/activemq-core/src:
From: rajdavies () apache ! org
Date: 2007-01-28 19:39:04
Message-ID: 20070128193905.286A71A981A () eris ! apache ! org
[Download RAW message or body]
Author: rajdavies
Date: Sun Jan 28 11:39:02 2007
New Revision: 500862
URL: http://svn.apache.org/viewvc?view=rev&rev=500862
Log:
Updated support for configurable Cursor types from the Destination Policy Map
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java \
(with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java \
(with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java \
(with props) incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java \
Sun Jan 28 11:39:02 2007 @@ -254,20 +254,6 @@
* @param adminConnectionContext
*/
public abstract void setAdminConnectionContext(ConnectionContext \
adminConnectionContext);
-
- /**
- * @return the pendingDurableSubscriberPolicy
- */
- public abstract PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy();
-
- /**
- * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to \
set
- */
- public abstract void \
setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy);
- /**
- * @return the broker's temp data store
- * @throws Exception
- */
-
+
public Store getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java \
Sun Jan 28 11:39:02 2007 @@ -234,15 +234,6 @@
next.setAdminConnectionContext(adminConnectionContext);
}
- public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy() {
- return next.getPendingDurableSubscriberPolicy();
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy) {
- next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
- }
-
-
public Store getTempDataStore() {
return next.getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/BrokerService.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java \
Sun Jan 28 11:39:02 2007 @@ -55,7 +55,6 @@
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
@@ -149,8 +148,6 @@
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName = false;
- //private PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy = new \
VMPendingDurableSubscriberMessageStoragePolicy();
- private PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy = new \
StorePendingDurableSubscriberMessageStoragePolicy();
@@ -1008,24 +1005,7 @@
public void setPersistenceThreadPriority(int persistenceThreadPriority){
this.persistenceThreadPriority=persistenceThreadPriority;
}
-
- /**
- * @return the pendingDurableSubscriberPolicy
- */
- public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy(){
- return this.pendingDurableSubscriberPolicy;
- }
-
- /**
- * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to \
set
- */
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy){
- this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
- if (broker != null) {
- broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
- }
- }
-
+
/**
* @return the useLocalHostBrokerName
*/
@@ -1296,7 +1276,6 @@
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
- regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
return regionBroker;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=500862&r1=500861&r2=500862 \
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java \
Sun Jan 28 11:39:02 2007 @@ -232,12 +232,6 @@
return null;
}
- public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy() {
- return null;
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy) {
- }
public Store getTempDataStore() {
return null;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=500862&r1=500861&r2=500862 \
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java \
Sun Jan 28 11:39:02 2007 @@ -231,15 +231,7 @@
public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
-
- public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy() {
- throw new BrokerStoppedException(this.message);
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy) {
- throw new BrokerStoppedException(this.message);
- }
-
+
public Store getTempDataStore() {
throw new BrokerStoppedException(this.message);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java \
Sun Jan 28 11:39:02 2007 @@ -246,14 +246,6 @@
return getNext().messagePull(context, pull);
}
- public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy() {
- return getNext().getPendingDurableSubscriberPolicy();
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy) {
- getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
- }
-
public Store getTempDataStore() {
return getNext().getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java \
Sun Jan 28 11:39:02 2007 @@ -24,10 +24,12 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,8 +42,8 @@
private final boolean keepDurableSubsActive;
private boolean active=false;
- public DurableTopicSubscription(Broker broker,ConnectionContext context, \
ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws \
InvalidSelectorException {
- super(broker,context,info,cursor);
+ public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
+ super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), \
info.getSubscriptionName()); }
@@ -70,7 +72,7 @@
dispatchMatched();
}
- public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
+ public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Deactivating " + this);
if( !active ) {
this.active = true;
@@ -83,6 +85,7 @@
}
}
synchronized(pending) {
+ pending.setUsageManager(memoryManager);
pending.start();
}
//If nothing was in the persistent store, then try to use the recovery \
policy.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java \
Sun Jan 28 11:39:02 2007 @@ -49,7 +49,7 @@
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
- final protected PendingMessageCursor pending;
+ protected PendingMessageCursor pending;
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
protected long enqueueCounter;
@@ -342,6 +342,17 @@
public boolean isRecoveryRequired(){
return pending.isRecoveryRequired();
}
+
+
+ public PendingMessageCursor getPending(){
+ return this.pending;
+ }
+
+ public void setPending(PendingMessageCursor pending){
+ this.pending=pending;
+ }
+
+
/**
* optimize message consumer prefetch if the consumer supports it
@@ -506,4 +517,7 @@
protected void acknowledge(ConnectionContext context,final MessageAck ack,final \
MessageReference node) throws IOException{
}
+
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/Queue.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java \
Sun Jan 28 11:39:02 2007 @@ -22,10 +22,9 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
@@ -58,8 +57,6 @@
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java \
Sun Jan 28 11:39:02 2007 @@ -95,7 +95,7 @@
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
- private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
+
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
@@ -587,16 +587,5 @@
public Store getTempDataStore() {
return brokerService.getTempDataStore();
- }
-
- /**
- * @return the pendingDurableSubscriberPolicy
- */
- public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy(){
- return this.pendingDurableSubscriberPolicy;
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy \
durableSubscriberCursor){
- this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/TempTopicRegion.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java \
Sun Jan 28 11:39:02 2007 @@ -1,51 +1,70 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
*
* @version $Revision: 1.7 $
*/
-public class TempTopicRegion extends AbstractRegion {
+public class TempTopicRegion extends AbstractRegion{
- public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
- super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+ private static final Log log=LogFactory.getLog(TempTopicRegion.class);
+
+ public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager,
+ TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){
+ super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
setAutoCreateDestinations(false);
}
- protected Subscription createSubscription(ConnectionContext context, \
ConsumerInfo info) throws JMSException {
- if( info.isDurable() ) {
+ protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
+ if(info.isDurable()){
throw new JMSException("A durable subscription cannot be created for a \
temporary topic.");
- } else {
- return new TopicSubscription(broker,context, info, this.memoryManager);
}
- }
-
- public String toString() {
- return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
+ try{
+ TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
+ // lets configure the subscription depending on the destination
+ ActiveMQDestination destination=info.getDestination();
+ if(destination!=null&&broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
+ entry.configure(broker,memoryManager,answer);
+ }
+ }
+ answer.init();
+ return answer;
+ }catch(Exception e){
+ log.error("Failed to create TopicSubscription ",e);
+ JMSException jmsEx=new JMSException("Couldn't create \
TopicSubscription"); + jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
}
-
+ public String toString(){
+ return "TempTopicRegion: destinations="+destinations.size()+", \
subscriptions="+subscriptions.size()+", memory=" + \
+memoryManager.getPercentUsage()+"%"; + }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java \
Sun Jan 28 11:39:02 2007 @@ -104,7 +104,7 @@
+" subscriberName: "+key.getSubscriptionName());
}
}
- sub.activate(context,info);
+ sub.activate(memoryManager,context,info);
return sub;
}else{
return super.addConsumer(context,info);
@@ -208,38 +208,45 @@
}
}
- protected Subscription createSubscription(ConnectionContext context, \
ConsumerInfo info) throws JMSException {
- if (info.isDurable()) {
- if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
+ protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
+ if(info.isDurable()){
+ if(AdvisorySupport.isAdvisoryTopic(info.getDestination())){
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
}
- SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
- DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+ SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
+ DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
if(sub==null){
- PendingMessageCursor \
cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
- \
context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),
- info.getPrefetchSize());
- cursor.setUsageManager(memoryManager);
- sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
+ sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive);
+ ActiveMQDestination destination=info.getDestination();
+ if(destination!=null&&broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
+ entry.configure(broker,memoryManager,sub);
+ }
+ }
durableSubscriptions.put(key,sub);
- }
- else {
+ }else{
throw new JMSException("That durable subscription is already \
active."); }
return sub;
}
- else {
- TopicSubscription answer = new TopicSubscription(broker,context, info, \
memoryManager);
-
+ try{
+ TopicSubscription answer=new \
TopicSubscription(broker,context,info,memoryManager);
// lets configure the subscription depending on the destination
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && broker.getDestinationPolicy() != null) {
- PolicyEntry entry = \
broker.getDestinationPolicy().getEntryFor(destination);
- if (entry != null) {
- entry.configure(answer);
+ ActiveMQDestination destination=info.getDestination();
+ if(destination!=null&&broker.getDestinationPolicy()!=null){
+ PolicyEntry \
entry=broker.getDestinationPolicy().getEntryFor(destination); + \
if(entry!=null){ + entry.configure(broker,memoryManager,answer);
}
}
+ answer.init();
return answer;
+ }catch(Exception e){
+ log.error("Failed to create TopicSubscription ",e);
+ JMSException jmsEx=new JMSException("Couldn't create \
TopicSubscription"); + jmsEx.setLinkedException(e);
+ throw jmsEx;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java \
Sun Jan 28 11:39:02 2007 @@ -22,6 +22,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ConsumerControl;
@@ -41,7 +42,7 @@
private static final Log log=LogFactory.getLog(TopicSubscription.class);
private static final AtomicLong cursorNameCounter=new AtomicLong(0);
- final protected FilePendingMessageCursor matched;
+ protected PendingMessageCursor matched;
final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong();
protected AtomicLong delivered=new AtomicLong();
@@ -56,17 +57,21 @@
private int memoryUsageHighWaterMark=95;
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo \
info,UsageManager usageManager)
- throws InvalidSelectorException{
+ throws Exception{
super(broker,context,info);
this.usageManager=usageManager;
String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
+"]";
this.matched=new \
FilePendingMessageCursor(matchedName,broker.getTempDataStore()); +
+ }
+
+ public void init() throws Exception {
this.matched.setUsageManager(usageManager);
this.matched.start();
}
-
- public void add(MessageReference node) throws InterruptedException,IOException{
+
+ public void add(MessageReference node) throws Exception{
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
@@ -309,6 +314,20 @@
public UsageManager getUsageManager(){
return this.usageManager;
}
+
+ /**
+ * @return the matched
+ */
+ public PendingMessageCursor getMatched(){
+ return this.matched;
+ }
+
+ /**
+ * @param matched the matched to set
+ */
+ public void setMatched(PendingMessageCursor matched){
+ this.matched=matched;
+ }
/**
* inform the MessageConsumer on the client to change it's prefetch
@@ -402,7 +421,14 @@
public void destroy(){
synchronized(matchedListMutex){
- matched.destroy();
+ try{
+ matched.destroy();
+ }catch(Exception e){
+ log.warn("Failed to destroy cursor",e);
+ }
}
}
+
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java \
Sun Jan 28 11:39:02 2007 @@ -14,6 +14,7 @@
package org.apache.activemq.broker.region.cursors;
+import java.util.LinkedList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@@ -149,5 +150,22 @@
*/
public UsageManager getUsageManager(){
return this.usageManager;
+ }
+
+ /**
+ * destroy the cursor
+ * @throws Exception
+ */
+ public void destroy() throws Exception {
+ stop();
+ }
+
+ /**
+ * Page in a restricted number of messages
+ * @param maxItems
+ * @return a list of paged in messages
+ */
+ public LinkedList pageInList(int maxItems) {
+ throw new RuntimeException("Not supported");
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java \
Sun Jan 28 11:39:02 2007 @@ -18,6 +18,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@@ -39,6 +40,7 @@
public class FilePendingMessageCursor extends AbstractPendingMessageCursor \
implements UsageListener{
static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
+ static private final AtomicLong nameCount = new AtomicLong();
private Store store;
private String name;
private LinkedList memoryList=new LinkedList();
@@ -54,7 +56,7 @@
* @param store
*/
public FilePendingMessageCursor(String name,Store store){
- this.name=name;
+ this.name=nameCount.incrementAndGet() + "_"+name;
this.store=store;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java \
Sun Jan 28 11:39:02 2007 @@ -14,6 +14,7 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
+import java.util.LinkedList;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@@ -188,6 +189,19 @@
* @return true if the cursor has buffered messages ready to deliver
*/
public boolean hasMessagesBufferedToDeliver();
+
+ /**
+ * destroy the cursor
+ * @throws Exception
+ */
+ public void destroy() throws Exception;
+
+ /**
+ * Page in a restricted number of messages
+ * @param maxItems
+ * @return a list of paged in messages
+ */
+ public LinkedList pageInList(int maxItems);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java \
Sun Jan 28 11:39:02 2007 @@ -103,4 +103,13 @@
}
}
}
+
+ /**
+ * Page in a restricted number of messages
+ * @param maxItems
+ * @return a list of paged in messages
+ */
+ public LinkedList pageInList(int maxItems) {
+ return list;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java \
Sun Jan 28 11:39:02 2007 @@ -23,7 +23,7 @@
/**
* Creates a FilePendingMessageCursor
* *
- * @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged \
in from file" + * @org.apache.xbean.XBean element="fileQueueCursor" \
description="Pending messages paged in from file"
*
* @version $Revision$
*/
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java \
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java \
Sun Jan 28 11:39:02 2007 @@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor \
license agreements. See the NOTICE + * file distributed with this work for additional \
information regarding copyright ownership. The ASF licenses this file + * to You \
under the Apache License, Version 2.0 (the "License"); you may not use this file \
except in compliance with the + * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed \
under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR \
CONDITIONS OF ANY KIND, either express or implied. See the License for the + * \
specific language governing permissions and limitations under the License. + */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a PendingMessageCursor for Durable subscribers
+ * *
+ * @org.apache.xbean.XBean element="fileCursor" description="Pending messages for \
durable subscribers + * held in temporary files"
+ *
+ * @version $Revision$
+ */
+public class FilePendingSubscriberMessageStoragePolicy implements \
PendingSubscriberMessageStoragePolicy{ +
+ /**
+ * @param name
+ * @param tmpStorage
+ * @param maxBatchSize
+ * @return a Cursor
+ * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, \
org.apache.activemq.kaha.Store, int) + */
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store \
tmpStorage,int maxBatchSize){ + return new \
FilePendingMessageCursor("PendingCursor:" + name,tmpStorage); + }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java \
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java \
Sun Jan 28 11:39:02 2007 @@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor \
license agreements. See the NOTICE + * file distributed with this work for additional \
information regarding copyright ownership. The ASF licenses this file + * to You \
under the Apache License, Version 2.0 (the "License"); you may not use this file \
except in compliance with the + * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed \
under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR \
CONDITIONS OF ANY KIND, either express or implied. See the License for the + * \
specific language governing permissions and limitations under the License. + */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+* Abstraction to allow different policies for holding messages awaiting dispatch to \
active clients +*
+* @version $Revision$
+*/
+public interface PendingSubscriberMessageStoragePolicy{
+
+ /**
+ * Retrieve the configured pending message storage cursor;
+ *
+ * @param name
+ * @param tmpStorage
+ * @param maxBatchSize
+ * @return the Pending Message cursor
+ */
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store \
tmpStorage, + int maxBatchSize);
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java \
Sun Jan 28 11:39:02 2007 @@ -1,22 +1,21 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor \
license agreements. See the NOTICE + * file distributed with this work for additional \
information regarding copyright ownership. The ASF licenses this file + * to You \
under the Apache License, Version 2.0 (the "License"); you may not use this file \
except in compliance with the + * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed \
under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR \
CONDITIONS OF ANY KIND, either express or implied. See the License for the + * \
specific language governing permissions and limitations under the \
License.
*/
+
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
@@ -25,21 +24,21 @@
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
- * Represents an entry in a {@link PolicyMap} for assigning policies to a
- * specific destination or a hierarchical wildcard area of destinations.
+ * Represents an entry in a {@link PolicyMap} for assigning policies to a specific \
destination or a hierarchical + * wildcard area of destinations.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.1 $
*/
-public class PolicyEntry extends DestinationMapEntry {
+public class PolicyEntry extends DestinationMapEntry{
- private static final Log log = LogFactory.getLog(PolicyEntry.class);
-
+ private static final Log log=LogFactory.getLog(PolicyEntry.class);
private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
@@ -48,135 +47,149 @@
private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit;
private MessageGroupMapFactory messageGroupMapFactory;
- private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
-
- public void configure(Queue queue, Store tmpStore) {
- if (dispatchPolicy != null) {
+ private PendingQueueMessageStoragePolicy pendingQueuePolicy;
+ private PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy; + private PendingSubscriberMessageStoragePolicy \
pendingSubscriberPolicy; + public void configure(Queue queue,Store tmpStore){
+ if(dispatchPolicy!=null){
queue.setDispatchPolicy(dispatchPolicy);
}
- if (deadLetterStrategy != null) {
+ if(deadLetterStrategy!=null){
queue.setDeadLetterStrategy(deadLetterStrategy);
}
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
- if( memoryLimit>0 ) {
+ if(memoryLimit>0){
queue.getUsageManager().setLimit(memoryLimit);
}
- if (pendingQueueMessageStoragePolicy != null) {
- PendingMessageCursor messages = \
pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore); + \
if(pendingQueuePolicy!=null){ + PendingMessageCursor \
messages=pendingQueuePolicy.getQueuePendingMessageCursor(queue,tmpStore); \
queue.setMessages(messages); }
}
- public void configure(Topic topic) {
- if (dispatchPolicy != null) {
+ public void configure(Topic topic){
+ if(dispatchPolicy!=null){
topic.setDispatchPolicy(dispatchPolicy);
}
- if (deadLetterStrategy != null) {
+ if(deadLetterStrategy!=null){
topic.setDeadLetterStrategy(deadLetterStrategy);
}
- if (subscriptionRecoveryPolicy != null) {
+ if(subscriptionRecoveryPolicy!=null){
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
- if( memoryLimit>0 ) {
+ if(memoryLimit>0){
topic.getUsageManager().setLimit(memoryLimit);
}
-
}
- public void configure(TopicSubscription subscription) {
- if (pendingMessageLimitStrategy != null) {
- int value = \
pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
- int consumerLimit = \
subscription.getInfo().getMaximumPendingMessageLimit();
- if (consumerLimit > 0) {
- if (value < 0 || consumerLimit < value) {
- value = consumerLimit;
+ public void configure(Broker broker,UsageManager memoryManager,TopicSubscription \
subscription){ + if(pendingMessageLimitStrategy!=null){
+ int value=pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
+ int consumerLimit=subscription.getInfo().getMaximumPendingMessageLimit();
+ if(consumerLimit>0){
+ if(value<0||consumerLimit<value){
+ value=consumerLimit;
}
}
- if (value >= 0) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the maximumPendingMessages size to: " + value \
+ " for consumer: " + subscription.getInfo().getConsumerId()); + \
if(value>=0){ + if(log.isDebugEnabled()){
+ log.debug("Setting the maximumPendingMessages size to: "+value+" \
for consumer: " + \
+subscription.getInfo().getConsumerId()); }
subscription.setMaximumPendingMessages(value);
}
}
- if (messageEvictionStrategy != null) {
+ if(messageEvictionStrategy!=null){
subscription.setMessageEvictionStrategy(messageEvictionStrategy);
}
+ if (pendingSubscriberPolicy!=null) {
+ String name = subscription.getContext().getClientId() + "_" + \
subscription.getConsumerInfo().getConsumerId(); + int maxBatchSize = \
subscription.getConsumerInfo().getPrefetchSize(); + \
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name,broker.getTempDataStore(),maxBatchSize));
+ }
+ }
+
+ public void configure(Broker broker,UsageManager \
memoryManager,DurableTopicSubscription sub){ + String \
clientId=sub.getClientId(); + String subName=sub.getSubscriptionName();
+ int prefetch=sub.getPrefetchSize();
+ if(pendingDurableSubscriberPolicy!=null){
+ PendingMessageCursor \
cursor=pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, + \
subName,broker.getTempDataStore(),prefetch); + \
cursor.setUsageManager(memoryManager); + sub.setPending(cursor);
+ }
}
// Properties
// -------------------------------------------------------------------------
- public DispatchPolicy getDispatchPolicy() {
+ public DispatchPolicy getDispatchPolicy(){
return dispatchPolicy;
}
- public void setDispatchPolicy(DispatchPolicy policy) {
- this.dispatchPolicy = policy;
+ public void setDispatchPolicy(DispatchPolicy policy){
+ this.dispatchPolicy=policy;
}
- public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
+ public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy(){
return subscriptionRecoveryPolicy;
}
- public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy \
subscriptionRecoveryPolicy) {
- this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+ public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy \
subscriptionRecoveryPolicy){ + \
this.subscriptionRecoveryPolicy=subscriptionRecoveryPolicy; }
- public boolean isSendAdvisoryIfNoConsumers() {
+ public boolean isSendAdvisoryIfNoConsumers(){
return sendAdvisoryIfNoConsumers;
}
/**
- * Sends an advisory message if a non-persistent message is sent and there
- * are no active consumers
+ * Sends an advisory message if a non-persistent message is sent and there are \
no active consumers
*/
- public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
- this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+ public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers){
+ this.sendAdvisoryIfNoConsumers=sendAdvisoryIfNoConsumers;
}
- public DeadLetterStrategy getDeadLetterStrategy() {
+ public DeadLetterStrategy getDeadLetterStrategy(){
return deadLetterStrategy;
}
/**
- * Sets the policy used to determine which dead letter queue destination
- * should be used
+ * Sets the policy used to determine which dead letter queue destination should \
be used
*/
- public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
- this.deadLetterStrategy = deadLetterStrategy;
+ public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy){
+ this.deadLetterStrategy=deadLetterStrategy;
}
- public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
+ public PendingMessageLimitStrategy getPendingMessageLimitStrategy(){
return pendingMessageLimitStrategy;
}
/**
- * Sets the strategy to calculate the maximum number of messages that are
- * allowed to be pending on consumers (in addition to their prefetch sizes).
+ * Sets the strategy to calculate the maximum number of messages that are \
allowed to be pending on consumers (in + * addition to their prefetch sizes).
*
- * Once the limit is reached, non-durable topics can then start discarding
- * old messages. This allows us to keep dispatching messages to slow
- * consumers while not blocking fast consumers and discarding the messages
- * oldest first.
+ * Once the limit is reached, non-durable topics can then start discarding old \
messages. This allows us to keep + * dispatching messages to slow consumers while \
not blocking fast consumers and discarding the messages oldest + * first.
*/
- public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy \
pendingMessageLimitStrategy) {
- this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
+ public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy \
pendingMessageLimitStrategy){ + \
this.pendingMessageLimitStrategy=pendingMessageLimitStrategy; }
- public MessageEvictionStrategy getMessageEvictionStrategy() {
+ public MessageEvictionStrategy getMessageEvictionStrategy(){
return messageEvictionStrategy;
}
/**
- * Sets the eviction strategy used to decide which message to evict when the
- * slow consumer needs to discard messages
+ * Sets the eviction strategy used to decide which message to evict when the \
slow consumer needs to discard messages
*/
- public void setMessageEvictionStrategy(MessageEvictionStrategy \
messageEvictionStrategy) {
- this.messageEvictionStrategy = messageEvictionStrategy;
+ public void setMessageEvictionStrategy(MessageEvictionStrategy \
messageEvictionStrategy){ + \
this.messageEvictionStrategy=messageEvictionStrategy; }
- public long getMemoryLimit() {
+ public long getMemoryLimit(){
return memoryLimit;
}
@@ -184,40 +197,72 @@
*
* @org.apache.xbean.Property \
propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
- public void setMemoryLimit(long memoryLimit) {
- this.memoryLimit = memoryLimit;
+ public void setMemoryLimit(long memoryLimit){
+ this.memoryLimit=memoryLimit;
}
- public MessageGroupMapFactory getMessageGroupMapFactory() {
- if (messageGroupMapFactory == null) {
- messageGroupMapFactory = new MessageGroupHashBucketFactory();
+ public MessageGroupMapFactory getMessageGroupMapFactory(){
+ if(messageGroupMapFactory==null){
+ messageGroupMapFactory=new MessageGroupHashBucketFactory();
}
return messageGroupMapFactory;
}
/**
- * Sets the factory used to create new instances of {MessageGroupMap} used to \
implement the
- * <a href="http://incubator.apache.org/activemq/message-groups.html">Message \
Groups</a> functionality. + * Sets the factory used to create new instances of \
{MessageGroupMap} used to implement the <a + * \
href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> \
functionality. + */
+ public void setMessageGroupMapFactory(MessageGroupMapFactory \
messageGroupMapFactory){ + this.messageGroupMapFactory=messageGroupMapFactory;
+ }
+
+
+ /**
+ * @return the pendingDurableSubscriberPolicy
*/
- public void setMessageGroupMapFactory(MessageGroupMapFactory \
messageGroupMapFactory) {
- this.messageGroupMapFactory = messageGroupMapFactory;
+ public PendingDurableSubscriberMessageStoragePolicy \
getPendingDurableSubscriberPolicy(){ + return \
this.pendingDurableSubscriberPolicy; }
/**
- * @return the pendingQueueMessageStoragePolicy
+ * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to \
set
*/
- public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
- return this.pendingQueueMessageStoragePolicy;
+ public void setPendingDurableSubscriberPolicy(
+ PendingDurableSubscriberMessageStoragePolicy \
pendingDurableSubscriberPolicy){ + \
this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy; }
/**
- * @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy \
to set + * @return the pendingQueuePolicy
*/
- public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy \
pendingQueueMessageStoragePolicy){
- this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
+ public PendingQueueMessageStoragePolicy getPendingQueuePolicy(){
+ return this.pendingQueuePolicy;
}
+ /**
+ * @param pendingQueuePolicy the pendingQueuePolicy to set
+ */
+ public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy \
pendingQueuePolicy){ + this.pendingQueuePolicy=pendingQueuePolicy;
+ }
+
+
+ /**
+ * @return the pendingSubscriberPolicy
+ */
+ public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy(){
+ return this.pendingSubscriberPolicy;
+ }
+
+
+ /**
+ * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
+ */
+ public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy \
pendingSubscriberPolicy){ + \
this.pendingSubscriberPolicy=pendingSubscriberPolicy; + }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java \
Sun Jan 28 11:39:02 2007 @@ -21,7 +21,7 @@
/**
* Creates a VMPendingMessageCursor
* *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in \
the JVM" + * @org.apache.xbean.XBean element="vmDurableCursor" description="Pending \
messages held in the JVM"
*
* @version $Revision$
*/
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java \
Sun Jan 28 11:39:02 2007 @@ -22,7 +22,7 @@
/**
* Creates a VMPendingMessageCursor
* *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in \
the JVM" + * @org.apache.xbean.XBean element="vmQueueCursor" description="Pending \
messages held in the JVM"
*
* @version $Revision$
*/
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java \
/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java \
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java \
Sun Jan 28 11:39:02 2007 @@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor \
license agreements. See the NOTICE + * file distributed with this work for additional \
information regarding copyright ownership. The ASF licenses this file + * to You \
under the Apache License, Version 2.0 (the "License"); you may not use this file \
except in compliance with the + * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed \
under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR \
CONDITIONS OF ANY KIND, either express or implied. See the License for the + * \
specific language governing permissions and limitations under the License. + */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a VMPendingMessageCursor
+ * *
+ * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in \
the JVM" + *
+ * @version $Revision$
+ */
+public class VMPendingSubscriberMessageStoragePolicy implements \
PendingSubscriberMessageStoragePolicy{ +
+ /**
+ * @param name
+ * @param tmpStorage
+ * @param maxBatchSize
+ * @return a Cursor
+ * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, \
org.apache.activemq.kaha.Store, int) + */
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store \
tmpStorage,int maxBatchSize){ + return new VMPendingMessageCursor();
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java \
/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java \
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java \
Sun Jan 28 11:39:02 2007 @@ -22,7 +22,6 @@
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
/**
* @version $Revision: 1.3 $
@@ -50,7 +49,6 @@
protected void configureBroker(BrokerService answer) throws Exception{
answer.setDeleteAllMessagesOnStartup(true);
- answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Sun Jan 28 11:39:02 2007
@@ -53,7 +53,7 @@
protected void configureBroker(BrokerService answer) throws Exception{
PolicyEntry policy = new PolicyEntry();
- policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java Sun Jan 28 11:39:02 2007
@@ -38,7 +38,7 @@
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
answer.setPersistenceAdapter(adaptor);
PolicyEntry policy = new PolicyEntry();
- policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);
Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml Sun Jan 28 11:39:02 2007
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2005-2006 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans>
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker persistent="false" xmlns="http://activemq.org/config/1.0">
+
+ <!-- lets define the dispatch policy -->
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry topic="org.apache.>">
+ <dispatchPolicy>
+ <strictOrderDispatchPolicy />
+ </dispatchPolicy>
+ <deadLetterStrategy>
+ <individualDeadLetterStrategy topicPrefix="Test.DLQ." />
+ </deadLetterStrategy>
+ <pendingSubscriberPolicy>
+ <vmCursor />
+ </pendingSubscriberPolicy>
+ </policyEntry>
+
+ <policyEntry queue="org.apache.>">
+ <dispatchPolicy>
+ <strictOrderDispatchPolicy />
+ </dispatchPolicy>
+ <deadLetterStrategy>
+ <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
+ </deadLetterStrategy>
+ <pendingQueuePolicy>
+ <vmQueueCursor />
+ </pendingQueuePolicy>
+ </policyEntry>
+
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->
Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml Sun Jan 28 11:39:02 2007
@@ -18,14 +18,14 @@
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <broker brokerName="slowConsumerBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
+ <broker brokerName="slowConsumerBroker" useJmx="false" persistent="false" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
- <policyEntry topic=">">
+ <policyEntry topic="blob">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic