List: activemq-dev
Subject: [jira] Commented: (AMQ-2233) After rollback received messages not
From: "Gary Tully (JIRA)" <jira () apache ! org>
Date: 2009-04-28 16:31:38
Message-ID: 1459169791.1240936298344.JavaMail.jira () brutus
[ https://issues.apache.org/activemq/browse/AMQ-2233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51426#action_51426 ]
Gary Tully commented on AMQ-2233:
---------------------------------
Can you attach your test case (see the "Attach file" link) and make a license grant so
that your test can be added to the ActiveMQ suite?
This is an interesting test case. The change in behavior is around consumer.close()
and transactions. Within a transaction, the close is deferred until the transaction
completes, so with a prefetch > 0, messages already dispatched to that consumer will
not be available until the transaction commits.
I think your test case will work with a prefetch of 0, for example:
connectionFactory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0");
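To illustrate the reasoning above, here is a rough toy model in plain Java. It is not ActiveMQ code: the class PrefetchModel and the method receiveAll are hypothetical names, and the dispatch rules are a simplification based only on the description in this comment (with a prefetch > 0 the broker pushes messages to a consumer up front, with a prefetch of 0 the consumer pulls one message per receive, and a close inside a transaction keeps the consumer's undelivered messages held). It mimics the test's pattern of creating a new consumer for every receive within one transaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model only; an assumption-laden sketch, not how ActiveMQ is implemented. */
public class PrefetchModel {

    /**
     * Returns the messages that a series of short-lived consumers, all in the
     * same open transaction, manage to receive from a queue.
     */
    static List<String> receiveAll(List<String> queued, int prefetch) {
        Deque<String> broker = new ArrayDeque<>(queued);
        List<String> received = new ArrayList<>();
        // Buffers of consumers whose close is deferred until the transaction ends.
        List<Deque<String>> heldBuffers = new ArrayList<>();

        while (true) {
            Deque<String> buffer = new ArrayDeque<>();
            // prefetch > 0: up to `prefetch` messages are pushed to the new consumer.
            // prefetch == 0: the consumer pulls exactly one message per receive.
            int toDispatch = prefetch > 0 ? prefetch : 1;
            while (toDispatch-- > 0 && !broker.isEmpty()) {
                buffer.add(broker.poll());
            }
            String message = buffer.poll(); // stands in for consumer.receive(...)
            heldBuffers.add(buffer);        // consumer.close() is deferred, leftovers stay held
            if (message == null) {
                break;                      // nothing visible to this consumer
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> queued = Arrays.asList("foo", "bar");
        System.out.println(receiveAll(queued, 1000)); // prints [foo]
        System.out.println(receiveAll(queued, 0));    // prints [foo, bar]
    }
}
```

In this model a large prefetch leaves "bar" stuck in the first consumer's buffer, while a prefetch of 0 lets each new consumer pull the next message, which is why the jms.prefetchPolicy.all=0 setting may help the test.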
> After rollback received messages not re-presented
> -------------------------------------------------
>
> Key: AMQ-2233
> URL: https://issues.apache.org/activemq/browse/AMQ-2233
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.2.0
> Reporter: Dave Syer
> Assignee: Gary Tully
>
> After rollback received messages not re-presented. If I receive in a transaction
> and then roll back the messages should be re-presented in the next transaction.
> This used to work in 5.1.0, but is broken in 5.2.0. You can browse the Queue in JMX
> after the rollback and see that the messages are still there, but they are not
> received by a consumer in the same process. Here's a test case (fails on the
> checkPostConditions()):
> {code}
> public class RawRollbackTests {
>
>     private static ConnectionFactory connectionFactory;
>     private static Destination queue;
>     private static BrokerService broker;
>
>     @BeforeClass
>     public static void clean() throws Exception {
>         FileUtils.deleteDirectory(new File("activemq-data"));
>         broker = new BrokerService();
>         broker.setUseJmx(true);
>         broker.start();
>         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
>         connectionFactory.setBrokerURL("vm://localhost?async=false");
>         RawRollbackTests.connectionFactory = connectionFactory;
>         queue = new ActiveMQQueue("queue");
>     }
>
>     @AfterClass
>     public static void close() throws Exception {
>         broker.stop();
>     }
>
>     @Before
>     public void clearData() throws Exception {
>         getMessages(false); // drain queue
>         convertAndSend("foo");
>         convertAndSend("bar");
>     }
>
>     @After
>     public void checkPostConditions() throws Exception {
>         Thread.sleep(1000L);
>         List<String> list = getMessages(false);
>         assertEquals(2, list.size());
>     }
>
>     @Test
>     public void testReceiveMessages() throws Exception {
>         List<String> list = getMessages(true);
>         assertEquals(2, list.size());
>         assertTrue(list.contains("foo"));
>     }
>
>     private void convertAndSend(String msg) throws Exception {
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
>         MessageProducer producer = session.createProducer(queue);
>         producer.send(session.createTextMessage(msg));
>         producer.close();
>         session.commit();
>         session.close();
>         connection.close();
>     }
>
>     private List<String> getMessages(boolean rollback) throws Exception {
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
>         String next = "";
>         List<String> msgs = new ArrayList<String>();
>         while (next != null) {
>             next = (String) receiveAndConvert(session);
>             if (next != null)
>                 msgs.add(next);
>         }
>         if (rollback) {
>             session.rollback();
>         } else {
>             session.commit();
>         }
>         session.close();
>         connection.close();
>         return msgs;
>     }
>
>     private String receiveAndConvert(Session session) throws Exception {
>         MessageConsumer consumer = session.createConsumer(queue);
>         Message message = consumer.receive(100L);
>         consumer.close();
>         if (message == null) {
>             return null;
>         }
>         return ((TextMessage) message).getText();
>     }
> }
> {code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.