[prev in list] [next in list] [prev in thread] [next in thread] 

List:       activemq-commits
Subject:    [activemq-artemis] 02/03: ARTEMIS-3400 add audit logging for message ack
From:       clebertsuconic () apache ! org
Date:       2021-07-27 3:27:53
Message-ID: 20210727032752.BEED281F25 () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f554806ec315cb8b4d0844b12674a825e08d44a3
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Fri Jul 23 12:54:01 2021 -0500

    ARTEMIS-3400 add audit logging for message ack
    
    Aside from adding audit logging for message acknowledgement, this commit
    also consolidates the two nearly identical acknowledge method
    implementations in org.apache.activemq.artemis.core.server.impl.QueueImpl.
    This avoids duplicating code for audit logging, plugin invocation, etc.
    The consolidation itself introduces no semantic change.
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |  9 +++
 .../artemis/core/server/impl/QueueImpl.java        | 73 +++++++++++-----------
 .../logging/AuditLoggerAMQPMutualSSLTest.java      |  1 +
 .../tests/smoke/logging/AuditLoggerTest.java       |  3 +-
 4 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 9a41027..94a879c 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2437,6 +2437,15 @@ public interface AuditLogger extends BasicLogger {
    @Message(id = 601501, value = "User {0} is consuming a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT)
    void consumeMessage(String user, String address, String message);
 
+   //hot path log using a different logger
+   static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message) {
+      MESSAGE_LOGGER.acknowledgeMessage(getCaller(user, remoteAddress), queue, message);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601502, value = "User {0} is acknowledging a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT)
+   void acknowledgeMessage(String user, String queue, String message);
+
    /*
     * This logger is focused on user interaction from the console or thru resource specific functions in the management layer/JMX
     * */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 37af7e4..300bed6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -101,6 +101,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
 import org.apache.activemq.artemis.utils.BooleanUtil;
@@ -1825,32 +1826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
-      if (nonDestructive && reason == AckReason.NORMAL) {
-         decDelivering(ref);
-         if (logger.isDebugEnabled()) {
-            logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL");
-         }
-      } else {
-         if (ref.isPaged()) {
-            pageSubscription.ack((PagedReference) ref);
-            postAcknowledge(ref, reason);
-         } else {
-            Message message = ref.getMessage();
-
-            boolean durableRef = message.isDurable() && isDurable();
-
-            if (durableRef) {
-               storageManager.storeAcknowledge(id, message.getMessageID());
-            }
-            postAcknowledge(ref, reason);
-         }
-
-         ackAttempts.incrementAndGet();
-
-         if (server != null && server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
-         }
-      }
+      acknowledge(null, ref, reason, consumer);
    }
 
    @Override
@@ -1860,34 +1836,59 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
-      RefsOperation refsOperation = getRefsOperation(tx, reason);
+      boolean transactional = tx != null;
+      RefsOperation refsOperation = null;
+      if (transactional) {
+         refsOperation = getRefsOperation(tx, reason);
+      }
 
       if (nonDestructive && reason == AckReason.NORMAL) {
-         refsOperation.addOnlyRefAck(ref);
+         if (transactional) {
+            refsOperation.addOnlyRefAck(ref);
+         } else {
+            decDelivering(ref);
+         }
          if (logger.isDebugEnabled()) {
             logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
          }
       } else {
          if (ref.isPaged()) {
-            pageSubscription.ackTx(tx, (PagedReference) ref);
-
-            refsOperation.addAck(ref);
+            if (transactional) {
+               pageSubscription.ackTx(tx, (PagedReference) ref);
+               refsOperation.addAck(ref);
+            } else {
+               pageSubscription.ack((PagedReference) ref);
+               postAcknowledge(ref, reason);
+            }
          } else {
             Message message = ref.getMessage();
 
             boolean durableRef = message.isDurable() && isDurable();
 
             if (durableRef) {
-               storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-
-               tx.setContainsPersistent();
+               if (transactional) {
+                  storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
+                  tx.setContainsPersistent();
+               } else {
+                  storageManager.storeAcknowledge(id, message.getMessageID());
+               }
+            }
+            if (transactional) {
+               ackAttempts.incrementAndGet();
+               refsOperation.addAck(ref);
+            } else {
+               postAcknowledge(ref, reason);
             }
+         }
 
+         if (!transactional) {
             ackAttempts.incrementAndGet();
-
-            refsOperation.addAck(ref);
          }
 
+         if (AuditLogger.isMessageLoggingEnabled()) {
+            ServerSession session = server.getSessionByID(consumer.getSessionID());
+            AuditLogger.coreAcknowledgeMessage(session.getRemotingConnection().getAuditSubject(), session.getRemotingConnection().getRemoteAddress(), getName().toString(), ref.getMessage().toString());
+         }
          if (server != null && server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
          }
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java
index 64f9d19..f21680a 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java
@@ -85,5 +85,6 @@ public class AuditLoggerAMQPMutualSSLTest extends AuditLoggerTestBase {
       checkAuditLogRecord(true, "AMQ601500: User myUser(producers)@", "is sending a message AMQPStandardMessage");
       checkAuditLogRecord(true, "AMQ601265: User myUser(producers)@", "is creating a core consumer");
       checkAuditLogRecord(true, "AMQ601501: User myUser(producers)@", "is consuming a message from exampleQueue");
+      checkAuditLogRecord(true, "AMQ601502: User myUser(producers)@", "is acknowledging a message from exampleQueue: AMQPStandardMessage");
    }
 }
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
index 4cd33b9..8e94d86 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
@@ -171,9 +171,10 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
          Assert.assertNotNull(clientMessage);
          clientMessage = consumer.receive(5000);
          Assert.assertNotNull(clientMessage);
-         checkAuditLogRecord(true, "is consuming a message from");
       } finally {
          connection.close();
       }
+      checkAuditLogRecord(true, "is consuming a message from");
+      checkAuditLogRecord(true, "is acknowledging a message from");
    }
 }


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic