[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    [jira] Commented: (AMQ-1585) Problems with pure master/slave
From:       "Hans Bausewein (JIRA)" <jira () apache ! org>
Date:       2008-08-29 14:52:53
Message-ID: 477223353.1220021573073.JavaMail.jira () brutus
[Download RAW message or body]


    [ https://issues.apache.org/activemq/browse/AMQ-1585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=45247#action_45247 \
] 

Hans Bausewein commented on AMQ-1585:
-------------------------------------

Checked out and compiled revision 675990 again.

Still worked fine (as far as this issue is concerned) with the same configuration \
files as the failing 690257.


> Problems with pure master/slave configuration
> ---------------------------------------------
> 
> Key: AMQ-1585
> URL: https://issues.apache.org/activemq/browse/AMQ-1585
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 4.1.1, 5.0.0, 5.1.0
> Environment: Ubuntu 6.04, JDK 1.5.0_011, Spring 2.0.x
> Reporter: Thomas Buckel
> Assignee: Rob Davies
> Fix For: 5.2.0
> 
> Attachments: AMQ-1585.patch, AMQ-1585.transacted.patch
> 
> 
> As posted in the AMQ user forum:
> http://www.nabble.com/Problems-with-Pure-Master-Slave-in-AMQ-5.0.0-to15471491s2354.html#a15474769
>                 
> -------------------
> Hi all,
> I am having trouble setting up a *stable* ActiveMQ Pure Master/Slave topology.
> Initially I have tried v4.1.1 which failed with an exception. I found an AMQ JIRA \
> ticket which said that Pure/Master slave didn't work in v4.1.1. Ok, so I switched \
> to AMQ 5.0.0, created 2 configs (master/slave, see end of message) and ran two AMQ \
> instances (on the same box) and most of the times my test (see below) worked, but \
>                 more often I get various error messages like:
> - On the slave:
> ERROR Service                        - Async error occurred: \
> javax.jms.JMSException: Slave broker out of sync with master: Dispatched message \
>                 (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the \
>                 pending list
> javax.jms.JMSException: Slave broker out of sync with master: Dispatched message \
> (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the pending list at \
> org.apache.activemq.broker.region.PrefetchSubscription.processMessageDispatchNotification(PrefetchSubscription.java:160)
>  at org.apache.activemq.broker.region.AbstractRegion.processDispatchNotification(AbstractRegion.java:381)
>  at org.apache.activemq.broker.region.RegionBroker.processDispatchNotification(RegionBroker.java:550)
>  at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>  at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>  at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>  at org.apache.activemq.broker.MutableBrokerFilter.processDispatchNotification(MutableBrokerFilter.java:211)
>  at org.apache.activemq.broker.TransportConnection.processMessageDispatchNotification(TransportConnection.java:450)
>  at org.apache.activemq.command.MessageDispatchNotification.visit(MessageDispatchNotification.java:77)
>  at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
>  at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
>  at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
>  at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>  at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
> at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
>  at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
>                 
> - After having killed the master, stopped the slave, copied the slave's data into \
> the master's data directory various error message came up (as described in the \
> Master/Slave recovery section), e.g. (internal) ActiveMQ topics were not available, \
> the admin webApp showed exceptions and errors on the client. The test I've created \
> uses Spring 2.0.x and pumps 1000 MapMessages in a queue through Spring's \
> JmsTempate, each message is created within its own transaction, using \
> JmsTransactionManager and TransactionTemplate. The created messages are consumed by \
> an initially instantiated transactional DefaultMessageListenerContainer. The AMQ \
> JARs in the test's classpath are activemq-core-5.0.0.jar, \
> geronimo-jms_1.1_spec-1.0.jar, geronimo-jta_1.0.1B_spec-1.0.jar as I've noticed a \
> really bad performance when only using the activemq-all-5.0.0.jar (maybe this is \
> the problem?). The test code work's without problems with OpenMQ, but I'd prefer \
> using the nice Pure Master/Active ActiveMQ if I can get it running in a *stable* \
> config ;) I would highly appreciate any help or suggestions. Maybe my config is \
> wrong or I miss something essential. I've also tried a recent AMQ 5.1 SNAPSHOT \
> which wasn't better... See below for the small program i used to test (no unit \
> test, behaviour appeared to be non deterministic to me and it's not so nice as i've \
> changed it quite often) Thanks in advance,
> Thomas
> <!-- MASTER config -->
> <beans
> xmlns="http://www.springframework.org/schema/beans"
> xmlns:amq="http://activemq.org/config/1.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans \
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd \
> http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd \
> http://activemq.apache.org/camel/schema/spring \
> http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <bean \
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> \
> <broker xmlns="http://activemq.org/config/1.0" brokerName="master" \
> dataDirectory="${activemq.base}/data"> <destinationPolicy>
> <policyMap>
> <policyEntries>
> <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> </policyEntries>
> </policyMap> 
> </destinationPolicy>
> <transportConnectors>
> <transportConnector name="openwire" uri="tcp://tbuckel-desktop:7778" />
> </transportConnectors>
> <networkConnectors/>
> <managementContext>
> <managementContext connectorPort="1100" jmxDomainName="org.apache.activemq"/>
> </managementContext>
> </broker>
> <commandAgent xmlns="http://activemq.org/config/1.0"/>
> <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
> <connectors>
> <nioConnector port="8161" />
> </connectors>
> <handlers>
> <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" \
> logUrlOnStart="true" /> </handlers>
> </jetty>
> </beans>
> <!-- SLAVE config -->
> <beans
> xmlns="http://www.springframework.org/schema/beans"
> xmlns:amq="http://activemq.org/config/1.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans \
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd \
> http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd \
> http://activemq.apache.org/camel/schema/spring \
> http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <bean \
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> 
> <broker xmlns="http://activemq.org/config/1.0" brokerName="slave" \
> dataDirectory="${activemq.base}/data-slave" \
> masterConnectorURI="tcp://tbuckel-desktop:7778"> 
> <destinationPolicy>
> <policyMap>
> <policyEntries>
> <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> </policyEntries>
> </policyMap>
> </destinationPolicy>
> <transportConnectors>
> <transportConnector name="openwire" uri="tcp://localhost:7779"/>
> </transportConnectors>
> <networkConnectors/>
> <managementContext>
> <managementContext connectorPort="1101" jmxDomainName="org.apache.activemq"/>
> </managementContext>
> </broker>
> <commandAgent xmlns="http://activemq.org/config/1.0"/>
> <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
> <connectors>
> <nioConnector port="8162" />
> </connectors>
> <handlers>
> <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" \
> logUrlOnStart="true" /> </handlers>
> </jetty>
> </beans>
> ------------
> Test code:
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.springframework.jms.connection.JmsTransactionManager;
> import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import org.springframework.jms.listener.DefaultMessageListenerContainer;
> import org.springframework.transaction.TransactionStatus;
> import org.springframework.transaction.support.TransactionCallbackWithoutResult;
> import org.springframework.transaction.support.TransactionTemplate;
> import javax.jms.*;
> import java.math.BigInteger;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> public class AnotherFailoverTest {
> public static final int MESSAGES = 1000;
> private final static List<BigInteger> notConsumedMessages = new \
> ArrayList<BigInteger>(MESSAGES); private static ConnectionFactory createCF() throws \
> Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
> cf.setBrokerURL("failover://(tcp://localhost:7778,tcp://localhost:7779)?randomize=false");
>  return new TransactionAwareConnectionFactoryProxy(cf);
> }
> private static void send() throws Exception {
> JmsTransactionManager transactionManager = new JmsTransactionManager();
> transactionManager.setConnectionFactory(createCF());
> transactionManager.afterPropertiesSet();
> int i=0;
> do {
> i++;
> final int number = i;
> try {
> final BigInteger v = new BigInteger(Integer.toString(number));
> TransactionTemplate tt = new TransactionTemplate(transactionManager);
> tt.execute(new TransactionCallbackWithoutResult() {
> protected void doInTransactionWithoutResult(TransactionStatus status) {
> final JmsTemplate template = new JmsTemplate(pcf);
> template.setSessionTransacted(true);
> template.afterPropertiesSet();
> template.send("testqueue", new MessageCreator() {
> public Message createMessage(Session session) throws JMSException {
> ObjectMessage dummyMessage = session.createObjectMessage();
> dummyMessage.setObject(v);
> synchronized (notConsumedMessages) {
> notConsumedMessages.add(v);
> }
> //                                System.out.println("Created message " + number + \
> "(" + notConsumedMessages.size() + ")"); return dummyMessage;
> }
> });
> }
> });
> } catch (Exception e) {
> e.printStackTrace();
> System.out.println("Error creating message " + number);
> }
> } while (i < MESSAGES);
> }
> private static void setupReceiver() throws Exception {
> JmsTransactionManager transactionManager = new JmsTransactionManager();
> transactionManager.setConnectionFactory(createCF());
> transactionManager.afterPropertiesSet();
> final DefaultMessageListenerContainer container = new \
> DefaultMessageListenerContainer(); container.setConnectionFactory(pcf);
> container.setTransactionManager(transactionManager);
> container.setMessageListener(new MessageListener() {
> public void onMessage(Message message) {
> try {
> ObjectMessage msg = (ObjectMessage) message;
> BigInteger number = (BigInteger) msg.getObject();
> synchronized (notConsumedMessages) {
> if (!notConsumedMessages.remove(number)) {
> System.err.println("Message " + number + " not found in list!");
> } else {
> //                        System.out.println("Consumed message " + number);
> }
> }
> } catch (JMSException e) {
> //                    e.printStackTrace();
> System.out.println("Error consuming message!");
> }
> }
> });
> container.setSessionTransacted(true);
> container.setDestinationName("testqueue");
> container.setExceptionListener(new ExceptionListener() {
> public void onException(JMSException jmsException) {
> System.err.println(jmsException);
> }
> });
> container.afterPropertiesSet();
> container.initialize();
> TimeUnit.SECONDS.sleep(1);
> }
> public static void main(String[] args) throws Exception {
> long start = System.currentTimeMillis();
> setupReceiver();
> send();
> int remainingSize = 0;
> do {
> Thread.sleep(500);
> synchronized (notConsumedMessages) {
> remainingSize = notConsumedMessages.size();
> }
> System.out.println("Unconsumed " + remainingSize + ": " + sb);
> } while (remainingSize > 0);
> System.out.println("All messages consumed.");
> long end = System.currentTimeMillis();
> System.out.println((end-start));
> System.exit(0);
> }
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic