[prev in list] [next in list] [prev in thread] [next in thread]
List: james-dev
Subject: [18/31] james-project git commit: MAILBOX-307 Prepare Cassandra ACL statements
From: matthieu () apache ! org
Date: 2017-09-29 7:21:56
Message-ID: 1adeb7aaa796461ba5591a211aa354d5 () git ! apache ! org
[Download RAW message or body]
MAILBOX-307 Prepare Cassandra ACL statements
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/91d303b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/91d303b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/91d303b1
Branch: refs/heads/master
Commit: 91d303b1dc72bd2a69a3b1ed8e7b3b86e7d85ca9
Parents: ee68d17
Author: benwa <btellier@linagora.com>
Authored: Mon Sep 25 17:17:47 2017 +0700
Committer: Matthieu Baechler <matthieu@apache.org>
Committed: Fri Sep 29 09:20:40 2017 +0200
----------------------------------------------------------------------
.../cassandra/mail/CassandraACLMapper.java | 118 ++++++++++++-------
1 file changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/91d303b1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 5695ffc..b2603a2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -45,15 +46,18 @@ import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Insert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
public class CassandraACLMapper {
- private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
public static final int INITIAL_VALUE = 0;
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
+ private static final String OLD_VERSION = "oldVersion";
@FunctionalInterface
public interface CodeInjector {
@@ -61,43 +65,75 @@ public class CassandraACLMapper {
}
private final CassandraAsyncExecutor executor;
- private final Session session;
private final int maxRetry;
private final CodeInjector codeInjector;
+ private final PreparedStatement insertStatement;
+ private final PreparedStatement conditionalInsertStatement;
+ private final PreparedStatement conditionalUpdateStatement;
+ private final PreparedStatement readStatement;
public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration) {
this(session, cassandraConfiguration, () -> {});
}
public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
- this.session = session;
this.executor = new CassandraAsyncExecutor(session);
this.maxRetry = cassandraConfiguration.getAclMaxRetry();
this.codeInjector = codeInjector;
+ this.insertStatement = session.prepare(insertCqlBase());
+ this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
+ this.conditionalUpdateStatement = prepareConditionalUpdate(session);
+ this.readStatement = prepareReadStatement(session);
+ }
+
+ private PreparedStatement prepareConditionalUpdate(Session session) {
+ return session.prepare(
+ update(CassandraACLTable.TABLE_NAME)
+ .where(eq(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)))
+ .with(set(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)))
+ .and(set(CassandraACLTable.VERSION, bindMarker(CassandraACLTable.VERSION)))
+ .onlyIf(eq(CassandraACLTable.VERSION, bindMarker(OLD_VERSION))));
+ }
+
+ private PreparedStatement prepareReadStatement(Session session) {
+ return session.prepare(
+ select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
+ .from(CassandraACLTable.TABLE_NAME)
+ .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
+ }
+
+ private Insert insertCqlBase() {
+ return insertInto(CassandraACLTable.TABLE_NAME)
+ .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
+ .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
+ .value(CassandraACLTable.VERSION, INITIAL_VALUE);
}
public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
- return getStoredACLRow(cassandraId).thenApply(resultSet -> {
- if (resultSet.isExhausted()) {
- return SimpleMailboxACL.EMPTY;
- }
- String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
- return deserializeACL(cassandraId, serializedACL);
- });
+ return getStoredACLRow(cassandraId)
+ .thenApply(resultSet -> getAcl(cassandraId, resultSet));
+ }
+
+ private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
+ if (resultSet.isExhausted()) {
+ return SimpleMailboxACL.EMPTY;
+ }
+ String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
+ return deserializeACL(cassandraId, serializedACL);
}
public void updateACL(CassandraId cassandraId, MailboxACL.MailboxACLCommand command) throws MailboxException {
try {
- new FunctionRunnerWithRetry(maxRetry).execute(
- () -> {
- codeInjector.inject();
- ResultSet resultSet = getAclWithVersion(cassandraId)
- .map(aclWithVersion -> aclWithVersion.apply(command))
- .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
- .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
- return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
- }
- );
+ new FunctionRunnerWithRetry(maxRetry)
+ .execute(
+ () -> {
+ codeInjector.inject();
+ ResultSet resultSet = getAclWithVersion(cassandraId)
+ .map(aclWithVersion -> aclWithVersion.apply(command))
+ .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
+ .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
+ return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+ });
} catch (LightweightTransactionException e) {
throw new MailboxException("Exception during lightweight transaction", e);
}
@@ -105,11 +141,11 @@ public class CassandraACLMapper {
public void resetACL(CassandraId cassandraId, MailboxACL mailboxACL) {
try {
- session.execute(
- insertInto(CassandraACLTable.TABLE_NAME)
- .value(CassandraACLTable.ID, cassandraId.asUuid())
- .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL))
- .value(CassandraACLTable.VERSION, INITIAL_VALUE));
+ executor.executeVoid(
+ insertStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL)))
+ .join();
} catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
@@ -124,20 +160,20 @@ public class CassandraACLMapper {
}
private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
- return executor.execute(select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
- .from(CassandraACLTable.TABLE_NAME)
- .where(eq(CassandraMailboxTable.ID, cassandraId.asUuid())));
+ return executor.execute(
+ readStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
}
private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
try {
- return session.execute(
- update(CassandraACLTable.TABLE_NAME)
- .with(set(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)))
- .and(set(CassandraACLTable.VERSION, aclWithVersion.version + 1))
- .where(eq(CassandraACLTable.ID, cassandraId.asUuid()))
- .onlyIf(eq(CassandraACLTable.VERSION, aclWithVersion.version))
- );
+ return executor.execute(
+ conditionalUpdateStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
+ .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
+ .setLong(OLD_VERSION, aclWithVersion.version))
+ .join();
} catch (JsonProcessingException exception) {
throw Throwables.propagate(exception);
}
@@ -145,13 +181,11 @@ public class CassandraACLMapper {
private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) {
try {
- return session.execute(
- insertInto(CassandraACLTable.TABLE_NAME)
- .value(CassandraACLTable.ID, cassandraId.asUuid())
- .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl))
- .value(CassandraACLTable.VERSION, 0)
- .ifNotExists()
- );
+ return executor.execute(
+ conditionalInsertStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl)))
+ .join();
} catch (JsonProcessingException exception) {
throw Throwables.propagate(exception);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic