[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-dev
Subject:    Re: git commit: Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions so that
From:       Gary Tully <gary.tully () gmail ! com>
Date:       2013-11-29 12:44:02
Message-ID: CAH+vQmNEY8+6L3BduPQthb6tCgEc2cuGdt_+JUtNyA6kADeVug () mail ! gmail ! com
[Download RAW message or body]

Hiram, there is a regression in
org.apache.activemq.store.LevelDBStorePerDestinationTest
seems nothing is going to terminate the connection in this case.
Skipped the test as it was hanging the ci builds.
see: https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a

On 25 November 2013 18:22,  <chirino@apache.org> wrote:
> Updated Branches:
> refs/heads/trunk 00cb9a566 -> b0e91d47f
> 
> 
> Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions so \
> that the clients retry try the operations instead of giving up.  Also retry the \
> problemantic getMessage() call which seems to fail at times. 
> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47
> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47
> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47
> 
> Branch: refs/heads/trunk
> Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b
> Parents: 00cb9a5
> Author: Hiram Chirino <hiram@hiramchirino.com>
> Authored: Mon Nov 25 13:17:58 2013 -0500
> Committer: Hiram Chirino <hiram@hiramchirino.com>
> Committed: Mon Nov 25 13:17:58 2013 -0500
> 
> ----------------------------------------------------------------------
> .../activemq/broker/SuppressReplyException.java |  8 +++++++
> .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
> .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
> .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
> 4 files changed, 32 insertions(+), 7 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>                 
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java \
> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>  index eb54a12..f2c6502 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>                 
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>  @@ -26,6 +26,14 @@ import java.io.IOException;
> *
> */
> public class SuppressReplyException extends RuntimeException {
> +    public SuppressReplyException(Throwable cause) {
> +        super(cause);
> +    }
> +
> +    public SuppressReplyException(String reason) {
> +        super(reason);
> +    }
> +
> public SuppressReplyException(String reason, IOException cause) {
> super(reason, cause);
> }
> 
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>                 
> ----------------------------------------------------------------------
> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala \
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala \
>                 index e467379..00260d9 100644
> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>                 
> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>  @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
> def getMessage(x: MessageId):Message = {
> val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
> val locator = id.getDataLocator()
> -    val msg = client.getMessage(locator)
> +    val msg = client.getMessageWithRetry(locator)
> msg.setMessageId(id)
> msg
> }
> 
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>                 
> ----------------------------------------------------------------------
> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala \
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>  index 15f7bb0..c0cedce 100755
> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>                 
> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>  @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
> import org.apache.activemq.leveldb.EntryLocator
> import org.apache.activemq.leveldb.DataLocator
> import org.fusesource.hawtbuf.ByteArrayOutputStream
> +import org.apache.activemq.broker.SuppressReplyException
> 
> /**
> * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
> @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
> Thread.sleep(100);
> }
> }
> -      throw failure;
> +      throw new SuppressReplyException(failure);
> }
> try {
> func
> @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
> collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
> val seq = decodeLong(key)
> var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
> -      val msg = getMessage(locator)
> +      val msg = getMessageWithRetry(locator)
> msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
> msg.getMessageId().setDataLocator(locator)
> msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
> @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
> func(XaAckRecord(collectionKey, seq, ack, sub))
> } else {
> var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
> -        val msg = getMessage(locator)
> +        val msg = getMessageWithRetry(locator)
> msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
> msg.getMessageId().setDataLocator(locator)
> func(msg)
> @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
> }
> }
> 
> +  def getMessageWithRetry(locator:AnyRef):Message = {
> +    var retry = 0
> +    var rc = getMessage(locator);
> +    while( rc == null ) {
> +      if( retry > 10 )
> +        return null;
> +      Thread.sleep(retry*10)
> +      rc = getMessage(locator);
> +      retry+=1
> +    }
> +    if( retry > 0 ) {
> +      info("Recovered from 'failed getMessage' on retry: "+retry)
> +    }
> +    rc
> +  }
> +
> def getMessage(locator:AnyRef):Message = {
> assert(locator!=null)
> val buffer = locator match {
> 
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>                 
> ----------------------------------------------------------------------
> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala \
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>  index 322656f..e4c7a02 100644
> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>                 
> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>  @@ -17,7 +17,7 @@
> 
> package org.apache.activemq.leveldb
> 
> -import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, \
> ConnectionContext} +import org.apache.activemq.broker.{SuppressReplyException, \
> LockableServiceSupport, BrokerServiceAware, ConnectionContext} import \
> org.apache.activemq.command._ import org.apache.activemq.openwire.OpenWireFormat
> import org.apache.activemq.usage.SystemUsage
> @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with \
> BrokerServiceAware with P 
> def check_running = {
> if( this.isStopped ) {
> -      throw new IOException("Store has been stopped")
> +      throw new SuppressReplyException("Store has been stopped")
> }
> }
> 
> @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with \
> BrokerServiceAware with P def verify_running = {
> if( isStopping || isStopped ) {
> try {
> -        throw new IOException("Not running")
> +        throw new SuppressReplyException("Not running")
> } catch {
> case e:IOException =>
> if( broker_service!=null ) {
> 



-- 
http://redhat.com
http://blog.garytully.com


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic