[prev in list] [next in list] [prev in thread] [next in thread] 

List:       james-dev
Subject:    [18/31] james-project git commit: MAILBOX-307 Prepare Cassandra ACL statements
From:       matthieu () apache ! org
Date:       2017-09-29 7:21:56
Message-ID: 1adeb7aaa796461ba5591a211aa354d5 () git ! apache ! org
[Download RAW message or body]

MAILBOX-307 Prepare Cassandra ACL statements


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/91d303b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/91d303b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/91d303b1

Branch: refs/heads/master
Commit: 91d303b1dc72bd2a69a3b1ed8e7b3b86e7d85ca9
Parents: ee68d17
Author: benwa <btellier@linagora.com>
Authored: Mon Sep 25 17:17:47 2017 +0700
Committer: Matthieu Baechler <matthieu@apache.org>
Committed: Fri Sep 29 09:20:40 2017 +0200

----------------------------------------------------------------------
 .../cassandra/mail/CassandraACLMapper.java      | 118 ++++++++++++-------
 1 file changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/91d303b1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
                
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 5695ffc..b2603a2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -45,15 +46,18 @@ import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Insert;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Throwables;
 
 public class CassandraACLMapper {
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
     public static final int INITIAL_VALUE = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
+    private static final String OLD_VERSION = "oldVersion";
 
     @FunctionalInterface
     public interface CodeInjector {
@@ -61,43 +65,75 @@ public class CassandraACLMapper {
     }
 
     private final CassandraAsyncExecutor executor;
-    private final Session session;
     private final int maxRetry;
     private final CodeInjector codeInjector;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement conditionalInsertStatement;
+    private final PreparedStatement conditionalUpdateStatement;
+    private final PreparedStatement readStatement;
 
     public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration) {
         this(session, cassandraConfiguration, () -> {});
     }
 
     public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
-        this.session = session;
         this.executor = new CassandraAsyncExecutor(session);
         this.maxRetry = cassandraConfiguration.getAclMaxRetry();
         this.codeInjector = codeInjector;
+        this.insertStatement = session.prepare(insertCqlBase());
+        this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
+        this.conditionalUpdateStatement = prepareConditionalUpdate(session);
+        this.readStatement = prepareReadStatement(session);
+    }
+
+    private PreparedStatement prepareConditionalUpdate(Session session) {
+        return session.prepare(
+            update(CassandraACLTable.TABLE_NAME)
+                .where(eq(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)))
+                .with(set(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)))
+                .and(set(CassandraACLTable.VERSION, bindMarker(CassandraACLTable.VERSION)))
+                .onlyIf(eq(CassandraACLTable.VERSION, bindMarker(OLD_VERSION))));
+    }
+
+    private PreparedStatement prepareReadStatement(Session session) {
+        return session.prepare(
+            select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
+                .from(CassandraACLTable.TABLE_NAME)
+                .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
+    }
+
+    private Insert insertCqlBase() {
+        return insertInto(CassandraACLTable.TABLE_NAME)
+            .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
+            .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
+            .value(CassandraACLTable.VERSION, INITIAL_VALUE);
     }
 
     public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
-        return  getStoredACLRow(cassandraId).thenApply(resultSet -> {
-            if (resultSet.isExhausted()) {
-                return SimpleMailboxACL.EMPTY;
-            }
-            String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
-            return deserializeACL(cassandraId, serializedACL);
-        });
+        return getStoredACLRow(cassandraId)
+            .thenApply(resultSet -> getAcl(cassandraId, resultSet));
+    }
+
+    private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
+        if (resultSet.isExhausted()) {
+            return SimpleMailboxACL.EMPTY;
+        }
+        String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
+        return deserializeACL(cassandraId, serializedACL);
     }
 
     public void updateACL(CassandraId cassandraId, MailboxACL.MailboxACLCommand command) throws MailboxException {
         try {
-            new FunctionRunnerWithRetry(maxRetry).execute(
-                () -> {
-                    codeInjector.inject();
-                    ResultSet resultSet = getAclWithVersion(cassandraId)
-                        .map(aclWithVersion -> aclWithVersion.apply(command))
-                        .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
-                        .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
-                    return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
-                }
-            );
+            new FunctionRunnerWithRetry(maxRetry)
+                .execute(
+                    () -> {
+                        codeInjector.inject();
+                        ResultSet resultSet = getAclWithVersion(cassandraId)
+                            .map(aclWithVersion -> aclWithVersion.apply(command))
+                            .map(aclWithVersion -> updateStoredACL(cassandraId, \
aclWithVersion)) +                            .orElseGet(() -> insertACL(cassandraId, \
applyCommandOnEmptyACL(command))); +                        return \
resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); +        \
});  } catch (LightweightTransactionException e) {
             throw new MailboxException("Exception during lightweight transaction", e);
         }
@@ -105,11 +141,11 @@ public class CassandraACLMapper {
 
     public void resetACL(CassandraId cassandraId, MailboxACL mailboxACL) {
         try {
-            session.execute(
-                insertInto(CassandraACLTable.TABLE_NAME)
-                    .value(CassandraACLTable.ID, cassandraId.asUuid())
-                    .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL))
-                    .value(CassandraACLTable.VERSION, INITIAL_VALUE));
+            executor.executeVoid(
+                insertStatement.bind()
+                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                    .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL)))
+                .join();
         } catch (JsonProcessingException e) {
             throw Throwables.propagate(e);
         }
@@ -124,20 +160,20 @@ public class CassandraACLMapper {
     }
 
     private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
-        return executor.execute(select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
-            .from(CassandraACLTable.TABLE_NAME)
-            .where(eq(CassandraMailboxTable.ID, cassandraId.asUuid())));
+        return executor.execute(
+            readStatement.bind()
+                .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
     }
 
     private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
         try {
-            return session.execute(
-                update(CassandraACLTable.TABLE_NAME)
-                    .with(set(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)))
-                    .and(set(CassandraACLTable.VERSION, aclWithVersion.version + 1))
-                    .where(eq(CassandraACLTable.ID, cassandraId.asUuid()))
-                    .onlyIf(eq(CassandraACLTable.VERSION, aclWithVersion.version))
-            );
+            return executor.execute(
+                conditionalUpdateStatement.bind()
+                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                    .setString(CassandraACLTable.ACL,  SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
+                    .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
+                    .setLong(OLD_VERSION, aclWithVersion.version))
+                .join();
         } catch (JsonProcessingException exception) {
             throw Throwables.propagate(exception);
         }
@@ -145,13 +181,11 @@ public class CassandraACLMapper {
 
     private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) {
         try {
-            return session.execute(
-                insertInto(CassandraACLTable.TABLE_NAME)
-                    .value(CassandraACLTable.ID, cassandraId.asUuid())
-                    .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl))
-                    .value(CassandraACLTable.VERSION, 0)
-                    .ifNotExists()
-            );
+            return executor.execute(
+                conditionalInsertStatement.bind()
+                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                    .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl)))
+                .join();
         } catch (JsonProcessingException exception) {
             throw Throwables.propagate(exception);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic