
List:       activemq-dev
Subject:    [jira] Commented: (AMQ-1918) AbstractStoreCursor.size gets out of
From:       "Nicusor Tanase (JIRA)" <jira () apache ! org>
Date:       2008-09-26 11:06:52
Message-ID: 2015970146.1222427212754.JavaMail.jira () brutus


    [ https://issues.apache.org/activemq/browse/AMQ-1918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=46016#action_46016 ]

Nicusor Tanase commented on AMQ-1918:
-------------------------------------

I found a workaround for this issue: changing the way messages are loaded from
the database. I ran tests with several queues, producers and consumers and no
longer saw any undelivered messages.

DefaultJDBCAdapter.doRecoverNextMessages() recovers the messages with an ID higher
than that of the last recovered message. The SQL statement is:
{code:title=org.apache.activemq.store.jdbc.Statements.java|borderStyle=solid}
findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                        + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
{code}
However, messages with lower IDs can be inserted into the DB after messages with
higher IDs. Because the query only looks past the last recovered ID, such messages
are never recovered from the DB.
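
To make the gap concrete, here is a minimal in-memory sketch of a forward-only cursor. It is not ActiveMQ code: the class {{ForwardOnlyCursorDemo}}, the {{TreeSet}} standing in for the message table, and the helper {{findNext}} are all hypothetical names used only to mimic the "ID > ? ORDER BY ID" query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the message table and cursor; not ActiveMQ code.
class ForwardOnlyCursorDemo {

    // Mimics "WHERE ID > ? ORDER BY ID", limited to maxReturned rows.
    static List<Long> findNext(TreeSet<Long> table, long lastId, int maxReturned) {
        List<Long> batch = new ArrayList<>();
        for (Long id : table.tailSet(lastId, false)) {
            if (batch.size() == maxReturned) {
                break;
            }
            batch.add(id);
        }
        return batch;
    }

    // Returns every ID the cursor delivers over two recovery passes.
    static List<Long> run() {
        TreeSet<Long> table = new TreeSet<>();
        List<Long> delivered = new ArrayList<>();
        long lastId = 0;

        // IDs 1 and 3 are visible; ID 2 was assigned but is not yet inserted.
        table.add(1L);
        table.add(3L);
        for (long id : findNext(table, lastId, 10)) {
            delivered.add(id);
            lastId = id;
        }

        // ID 2 arrives after the cursor has already moved past 3.
        table.add(2L);
        for (long id : findNext(table, lastId, 10)) {
            delivered.add(id);
            lastId = id;
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("delivered = " + run());
    }
}
```

In this sketch the second pass asks for IDs greater than 3, so the late row with ID 2 stays in the table and is never delivered.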

In my local copy I have changed DefaultJDBCAdapter to act retroactively, looking
back {{maxReturned}} rows for any missed messages. I am not familiar with the
ActiveMQ code, so you might want to have a look at the modified
DefaultJDBCAdapter.doRecoverNextMessages() below:

{code:title=org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.java|borderStyle=solid}
  public class DefaultJDBCAdapter implements JDBCAdapter {

   private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
   -------------------------------------------------------

    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
                                      int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
        PreparedStatement s = null;
        ResultSet rs = null;
        long id = 0;
        List<Long> cleanupIds = new ArrayList<Long>();
        int index = 0;
        try {
            s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
            s.setMaxRows(maxReturned*2);
            s.setString(1, destination.getQualifiedName());
            s.setLong(2, nextSeq - maxReturned);
            rs = s.executeQuery();
            int count = 0;
            if (statements.isUseExternalMessageReferences()) {
                while (rs.next() && count < maxReturned) {
                    id = rs.getLong(1);
                    if (lastRecoveredMessagesIds.contains(id)) {
                        // this message was already recovered
                        cleanupIds.add(id);
                        continue;
                    }
                    if (listener.recoverMessageReference(rs.getString(1))) {
                        count++;
                        lastRecoveredMessagesIds.add(id);
                    } else {
                        LOG.debug("Stopped recover next messages");
                    }
                }
            } else {
                while (rs.next() && count < maxReturned) {
                    id = rs.getLong(1);
                    if (lastRecoveredMessagesIds.contains(id)) {
                        // this message was already recovered
                        cleanupIds.add(id);
                        continue;
                    }
                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                        count++;
                        lastRecoveredMessagesIds.add(id);
                    } else {
                        LOG.debug("Stopped recover next messages");
                    }
                }
            }
            
            // now clean up the list of recovered messages
            index = 0;
            Iterator<Long> it = cleanupIds.iterator();
            while (it.hasNext() && index < count) {
                lastRecoveredMessagesIds.remove(it.next());
            }
            
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs);
            close(s);
        }
    }

}
{code}
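
The look-back idea can also be tried in isolation with a small in-memory sketch. This is an illustration under assumptions, not the adapter itself: {{LookBackCursorDemo}}, {{recoverNext}} and the {{TreeSet}} table are hypothetical, the {{setMaxRows}} limit is not modelled, and the pruning rule (forget IDs at or below the window start) is a simplification that assumes {{nextSeq}} never decreases.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a look-back cursor with de-duplication; not ActiveMQ code.
class LookBackCursorDemo {

    // IDs already handed to the listener that may still be re-scanned.
    private final Set<Long> recentlyRecovered = new HashSet<>();

    // Scans IDs greater than (nextSeq - maxReturned) and skips those already delivered.
    List<Long> recoverNext(TreeSet<Long> table, long nextSeq, int maxReturned) {
        List<Long> batch = new ArrayList<>();
        long from = Math.max(0, nextSeq - maxReturned);
        for (Long id : table.tailSet(from, false)) {
            if (batch.size() == maxReturned) {
                break;
            }
            // Set.add returns false when the ID was delivered by an earlier pass.
            if (recentlyRecovered.add(id)) {
                batch.add(id);
            }
        }
        // IDs at or below the window start are not scanned again (assuming
        // nextSeq only grows), so they no longer need to be remembered.
        recentlyRecovered.removeIf(id -> id <= from);
        return batch;
    }

    // Same scenario as a late insert: ID 2 shows up after 1 and 3 were recovered.
    static List<Long> run() {
        LookBackCursorDemo cursor = new LookBackCursorDemo();
        TreeSet<Long> table = new TreeSet<>();
        List<Long> delivered = new ArrayList<>();
        long nextSeq = 0;

        table.add(1L);
        table.add(3L);
        for (long id : cursor.recoverNext(table, nextSeq, 10)) {
            delivered.add(id);
            nextSeq = Math.max(nextSeq, id);
        }

        table.add(2L);
        for (long id : cursor.recoverNext(table, nextSeq, 10)) {
            delivered.add(id);
            nextSeq = Math.max(nextSeq, id);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("delivered = " + run());
    }
}
```

Two trade-offs show up in the sketch: late messages are delivered out of ID order, and a message that arrives more than {{maxReturned}} IDs behind the cursor would still be missed.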




> AbstractStoreCursor.size gets out of synch with Store size and blocks consumers
> -------------------------------------------------------------------------------
> 
> Key: AMQ-1918
> URL: https://issues.apache.org/activemq/browse/AMQ-1918
> Project: ActiveMQ
> Issue Type: Bug
> Components: Message Store
> Affects Versions: 5.1.0
> Reporter: Richard Yarger
> Assignee: Rob Davies
> Priority: Critical
> Fix For: 5.3.0
> 
> Attachments: activemq.xml, testAMQMessageStore.zip, testdata.zip
> 
> 
> In version 5.1.0, we are seeing our queue consumers stop consuming for no reason.
> We have a staged queue environment and we occasionally see one queue display
> negative pending message counts that hang around -x, rise to -x+n gradually and
> then fall back to -x abruptly. The messages are building up and being processed in
> bunches, but it's not easy to see because the counts are negative. We see this
> behavior in the messages coming out of the system. Outbound messages come out in
> bunches and are synchronized with the queue pending count dropping to -x.
> 
> This issue does not happen ALL of the time. It happens about once a week and the
> only way to fix it is to bounce the broker. It doesn't happen to the same queue
> every time, so it is not our consuming code.
> 
> Although we don't have a reproducible scenario, we have been able to debug the
> issue in our test environment. We traced the problem to the cached store size in
> the AbstractStoreCursor. This value becomes 0 or negative and prevents the
> AbstractStoreCursor from retrieving more messages from the store (see
> AbstractStoreCursor.fillBatch()). We have seen the size value go lower than -1000.
> We have also forced it to fix itself by sending in n+1 messages. Once the size
> goes above zero, the cached value is refreshed and things work ok again.
> Unfortunately, during low volume times, it could be hours before n+1 messages are
> received, so our message latency can rise during low volume times. :(
> 
> I have attached our broker config.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

