[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    [jira] Commented: (AMQ-2233) After rollback received messages not
From:       "Gary Tully (JIRA)" <jira () apache ! org>
Date:       2009-04-28 16:31:38
Message-ID: 1459169791.1240936298344.JavaMail.jira () brutus
[Download RAW message or body]


    [ https://issues.apache.org/activemq/browse/AMQ-2233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51426#action_51426 \
] 

Gary Tully commented on AMQ-2233:
---------------------------------

can you attach your est case (see the attach file link) and make a license grant so \
that your test can be added to the activemq suite?

This is an interesting test case. The change of behavior is around consumer.close() \
and transactions. When in a transaction, the close is deferred till the transaction \
completes, so with a prefetch > 0, messages dispatched to that consumer will not be \
available till the transaction commits.

I think your test case will work with a prefetch of 0, \
connectionFactory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0");


> After rollback received messages not re-presented
> -------------------------------------------------
> 
> Key: AMQ-2233
> URL: https://issues.apache.org/activemq/browse/AMQ-2233
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.2.0
> Reporter: Dave Syer
> Assignee: Gary Tully
> 
> After rollback received messages not re-presented.  If I receive in a transaction \
> and then roll back the messages should be re-presented in the next transaction.  \
> This used to work in 5.1.0, but is broken in 5.2.0. You can browse the Queue in JMX \
> after the rollback and see that the messages are still there, but they are not \
> received by a consumer in the same process. Here's a test case (fails on the \
> checkPostConditions()): {code}
> public class RawRollbackTests {
> 	
> 	private static ConnectionFactory connectionFactory;
> 	private static Destination queue;
> 	private static BrokerService broker;
> 	@BeforeClass
> 	public static void clean() throws Exception {
> 		FileUtils.deleteDirectory(new File("activemq-data"));
> 		broker = new BrokerService();
> 		broker.setUseJmx(true);
> 		broker.start();
> 		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
> 		connectionFactory.setBrokerURL("vm://localhost?async=false");
> 		RawRollbackTests.connectionFactory = connectionFactory;
> 		queue = new ActiveMQQueue("queue");
> 	}
> 	@AfterClass
> 	public static void close() throws Exception {
> 		broker.stop();
> 	}
> 	@Before
> 	public void clearData() throws Exception {
> 		getMessages(false); // drain queue
> 		convertAndSend("foo");
> 		convertAndSend("bar");
> 	}
> 	@After
> 	public void checkPostConditions() throws Exception {
> 		Thread.sleep(1000L);
> 		List<String> list = getMessages(false);
> 		assertEquals(2, list.size());
> 	}
> 	@Test
> 	public void testReceiveMessages() throws Exception {
> 		List<String> list = getMessages(true);
> 		assertEquals(2, list.size());
> 		assertTrue(list.contains("foo"));
> 	}
> 	
> 	private void convertAndSend(String msg) throws Exception {
> 		Connection connection = connectionFactory.createConnection();
> 		connection.start();
> 		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
> 		MessageProducer producer = session.createProducer(queue);
> 		producer.send(session.createTextMessage(msg));
> 		producer.close();
> 		session.commit();
> 		session.close();
> 		connection.close();
> 	}
> 	private List<String> getMessages(boolean rollback) throws Exception {
> 		Connection connection = connectionFactory.createConnection();
> 		connection.start();
> 		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
> 		String next = "";
> 		List<String> msgs = new ArrayList<String>();
> 		while (next != null) {
> 			next = (String) receiveAndConvert(session);
> 			if (next != null)
> 				msgs.add(next);
> 		}
> 		if (rollback) {
> 			session.rollback();
> 		} else {
> 			session.commit();
> 		}
> 		session.close();
> 		connection.close();
> 		return msgs;
> 	}
> 	private String receiveAndConvert(Session session) throws Exception {
> 		MessageConsumer consumer = session.createConsumer(queue);
> 		Message message = consumer.receive(100L);
> 		consumer.close();
> 		if (message==null) {
> 			return null;
> 		}
> 		return ((TextMessage)message).getText();
> 	}
> }
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic