[prev in list] [next in list] [prev in thread] [next in thread]
List: james-dev
Subject: [1/6] james-project git commit: MAILBOX-297 Staged flags updates
From: btellier () apache ! org
Date: 2017-05-29 10:14:18
Message-ID: 8e49b519ca1b4f7ba6c73fce741c9e2f () git ! apache ! org
[Download RAW message or body]
Repository: james-project
Updated Branches:
refs/heads/master d496bb23a -> 09c9d34c6
MAILBOX-297 Staged flags updates
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6f2d4c79
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6f2d4c79
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6f2d4c79
Branch: refs/heads/master
Commit: 6f2d4c79f2aaa4e0acb2b8b59e644eef7e11d37a
Parents: 0535d4b
Author: benwa <btellier@linagora.com>
Authored: Mon May 22 11:53:22 2017 +0700
Committer: benwa <btellier@linagora.com>
Committed: Mon May 29 16:58:49 2017 +0700
----------------------------------------------------------------------
.../cassandra/mail/CassandraMessageMapper.java | 176 +++++++++++--------
1 file changed, 107 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/6f2d4c79/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java \
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 349cd08..7b711bd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -25,25 +25,21 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
import org.apache.james.mailbox.FlagsBuilder;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.CassandraId;
import org.apache.james.mailbox.cassandra.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUpdateException;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MailboxCounters;
-import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.UpdatedFlags;
@@ -60,16 +56,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.steveash.guavate.Guavate;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
public class CassandraMessageMapper implements MessageMapper {
- private static final Logger LOGGER = \
LoggerFactory.getLogger(CassandraMessageMapper.class);
public static final MailboxCounters INITIAL_COUNTERS = \
MailboxCounters.builder()
.count(0L)
.unseen(0L)
.build();
public static final int EXPUNGE_BATCH_SIZE = 100;
+ public static final Logger LOGGER = \
LoggerFactory.getLogger(CassandraMessageMapper.class);
private final CassandraModSeqProvider modSeqProvider;
private final MailboxSession mailboxSession;
@@ -280,14 +275,51 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return messageIdDAO.retrieveMessages(mailboxId, set)
- .join()
- .flatMap(message -> updateFlagsOnMessage(mailbox, \
flagUpdateCalculator, message))
- .map((UpdatedFlags updatedFlags) -> \
indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
- .thenApply(voidValue -> updatedFlags))
- .map(CompletableFuture::join)
- .collect(Collectors.toList()) // This collect is here as we need to \
consume all the stream before returning result
- .iterator();
+
+ return runUpdate(mailboxId, set, flagUpdateCalculator).iterator();
+ }
+
+ private List<UpdatedFlags> runUpdate(CassandraId mailboxId, MessageRange set, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException {
+ Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, set).join();
+
+ FlagsUpdateStageResult globalResult = runUpdateStage(mailboxId, toBeUpdated, flagsUpdateCalculator);
+
+ int retryCount = 0;
+
+ while (retryCount < maxRetries && !globalResult.getFailed().isEmpty()) {
+ retryCount++;
+ FlagsUpdateStageResult stageResult = runUpdateStage(mailboxId,
+ FluentFutureStream.of(
+ globalResult.getFailed().stream()
+ .map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
+ .flatMap(OptionalConverter::toStream)
+ .completableFuture().join(),
+ flagsUpdateCalculator);
+
+ globalResult = globalResult.keepSuccess().merge(stageResult);
+ }
+
+ LOGGER.error("Can not update following UIDs {} for mailbox {}", globalResult.getFailed(), mailboxId.asUuid());
+
+ return globalResult.getSucceeded();
+ }
+
+ private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
+ Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed"));
+
+ FlagsUpdateStageResult result = toBeUpdated
+ .map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator,
+ newModSeq,
+ oldMetadata))
+ .reduce(FlagsUpdateStageResult::merge)
+ .orElse(none());
+
+ result.getSucceeded().stream()
+ .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
+ .thenApply(voidValue -> updatedFlags))
+ .forEach(CompletableFuture::join);
+
+ return result;
}
@Override
@@ -329,41 +361,30 @@ public class CassandraMessageMapper implements MessageMapper {
imapUidDAO.insert(composedMessageIdWithMetaData));
}
- private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, \
FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData \
composedMessageIdWithMetaData) {
- return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, \
composedMessageIdWithMetaData)
- .map(Stream::of)
- .orElse(handleRetries(mailbox, flagUpdateCalculator, \
composedMessageIdWithMetaData));
- }
- private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator \
flagUpdateCalculator, Mailbox mailbox, ComposedMessageIdWithMetaData \
oldMetaData) {
- try {
- long oldModSeq = oldMetaData.getModSeq();
- Flags oldFlags = oldMetaData.getFlags();
- Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
+ private FlagsUpdateStageResult tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
+ Flags oldFlags = oldMetaData.getFlags();
+ Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
- boolean involveFlagsChanges = !identicalFlags(oldFlags, newFlags);
- long newModSeq = generateNewModSeqIfNeeded(mailbox, oldModSeq, \
involveFlagsChanges);
-
- if (updateFlags(oldMetaData, newFlags, newModSeq)) {
- return Optional.of(UpdatedFlags.builder()
- .uid(oldMetaData.getComposedMessageId().getUid())
- .modSeq(newModSeq)
- .oldFlags(oldFlags)
- .newFlags(newFlags)
- .build());
- } else {
- return Optional.empty();
- }
- } catch (MailboxException e) {
- throw Throwables.propagate(e);
+ if (identicalFlags(oldFlags, newFlags)) {
+ return success(UpdatedFlags.builder()
+ .uid(oldMetaData.getComposedMessageId().getUid())
+ .modSeq(oldMetaData.getModSeq())
+ .oldFlags(oldFlags)
+ .newFlags(newFlags)
+ .build());
}
- }
- private long generateNewModSeqIfNeeded(Mailbox mailbox, long oldModSeq, boolean \
involveFlagsChanges) throws MailboxException {
- if (involveFlagsChanges) {
- return modSeqProvider.nextModSeq(mailboxSession, mailbox);
+ if (updateFlags(oldMetaData, newFlags, newModSeq)) {
+ return success(UpdatedFlags.builder()
+ .uid(oldMetaData.getComposedMessageId().getUid())
+ .modSeq(newModSeq)
+ .oldFlags(oldFlags)
+ .newFlags(newFlags)
+ .build());
+ } else {
+ return fail(oldMetaData.getComposedMessageId().getUid());
}
- return oldModSeq;
}
private boolean identicalFlags(Flags oldFlags, Flags newFlags) {
@@ -385,32 +406,49 @@ public class CassandraMessageMapper implements MessageMapper {
.join();
}
- private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, \
FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData \
oldMetaData) {
- try {
- return Stream.of(
- new FunctionRunnerWithRetry(maxRetries)
- .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox,
- oldMetaData.getComposedMessageId().getMessageId(),
- flagUpdateCalculator)));
- } catch (MessageDeletedDuringFlagsUpdateException e) {
- mailboxSession.getLog().warn(e.getMessage());
- return Stream.of();
- } catch (MailboxDeleteDuringUpdateException e) {
- LOGGER.info("Mailbox {} was deleted during flag update", \
mailbox.getMailboxId());
- return Stream.of();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
+ private static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) {
+ return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags));
}
- private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, \
MessageId messageId, FlagsUpdateCalculator flagUpdateCalculator) {
- CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
- ComposedMessageIdWithMetaData composedMessageIdWithMetaData = \
imapUidDAO.retrieve((CassandraMessageId) messageId, \
Optional.of(cassandraId))
- .join()
- .findFirst()
- .orElseThrow(MailboxDeleteDuringUpdateException::new);
- return tryMessageFlagsUpdate(flagUpdateCalculator,
- mailbox,
- composedMessageIdWithMetaData);
+ private static FlagsUpdateStageResult fail(MessageUid uid) {
+ return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of());
+ }
+
+ private static FlagsUpdateStageResult none() {
+ return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of());
+ }
+
+ private static class FlagsUpdateStageResult {
+ private final List<MessageUid> failed;
+ private final List<UpdatedFlags> succeeded;
+
+ public FlagsUpdateStageResult(List<MessageUid> failed, List<UpdatedFlags> succeeded) {
+ this.failed = failed;
+ this.succeeded = succeeded;
+ }
+
+ public List<MessageUid> getFailed() {
+ return failed;
+ }
+
+ public List<UpdatedFlags> getSucceeded() {
+ return succeeded;
+ }
+
+ public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) {
+ return new FlagsUpdateStageResult(
+ ImmutableList.<MessageUid>builder()
+ .addAll(this.failed)
+ .addAll(other.failed)
+ .build(),
+ ImmutableList.<UpdatedFlags>builder()
+ .addAll(this.succeeded)
+ .addAll(other.succeeded)
+ .build());
+ }
+
+ public FlagsUpdateStageResult keepSuccess() {
+ return new FlagsUpdateStageResult(ImmutableList.of(), succeeded);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic