[prev in list] [next in list] [prev in thread] [next in thread] 

List:       james-dev
Subject:    [1/6] james-project git commit: MAILBOX-297 Staged flags updates
From:       btellier () apache ! org
Date:       2017-05-29 10:14:18
Message-ID: 8e49b519ca1b4f7ba6c73fce741c9e2f () git ! apache ! org
[Download RAW message or body]

Repository: james-project
Updated Branches:
  refs/heads/master d496bb23a -> 09c9d34c6


MAILBOX-297 Staged flags updates


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6f2d4c79
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6f2d4c79
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6f2d4c79

Branch: refs/heads/master
Commit: 6f2d4c79f2aaa4e0acb2b8b59e644eef7e11d37a
Parents: 0535d4b
Author: benwa <btellier@linagora.com>
Authored: Mon May 22 11:53:22 2017 +0700
Committer: benwa <btellier@linagora.com>
Committed: Mon May 29 16:58:49 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 176 +++++++++++--------
 1 file changed, 107 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/6f2d4c79/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
                
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 349cd08..7b711bd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -25,25 +25,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.FlagsBuilder;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.mailbox.cassandra.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUpdateException;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MailboxCounters;
-import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.UpdatedFlags;
@@ -60,16 +56,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.steveash.guavate.Guavate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
 public class CassandraMessageMapper implements MessageMapper {
-    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class);
     public static final MailboxCounters INITIAL_COUNTERS = MailboxCounters.builder()
         .count(0L)
         .unseen(0L)
         .build();
     public static final int EXPUNGE_BATCH_SIZE = 100;
+    public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class);
 
     private final CassandraModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
@@ -280,14 +275,51 @@ public class CassandraMessageMapper implements MessageMapper {
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return messageIdDAO.retrieveMessages(mailboxId, set)
-                .join()
-                .flatMap(message -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, message))
-                .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
-                    .thenApply(voidValue -> updatedFlags))
-                .map(CompletableFuture::join)
-                .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result
-                .iterator();
+
+        return runUpdate(mailboxId, set, flagUpdateCalculator).iterator();
+    }
+
+    private List<UpdatedFlags> runUpdate(CassandraId mailboxId, MessageRange set, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException {
+        Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, set).join();
+
+        FlagsUpdateStageResult globalResult = runUpdateStage(mailboxId, toBeUpdated, flagsUpdateCalculator);
+
+        int retryCount = 0;
+
+        while (retryCount < maxRetries && !globalResult.getFailed().isEmpty()) {
+            retryCount++;
+            FlagsUpdateStageResult stageResult = runUpdateStage(mailboxId,
+                FluentFutureStream.of(
+                    globalResult.getFailed().stream()
+                        .map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
+                    .flatMap(OptionalConverter::toStream)
+                    .completableFuture().join(),
+                flagsUpdateCalculator);
+
+            globalResult = globalResult.keepSuccess().merge(stageResult);
+        }
+
+        LOGGER.error("Can not update following UIDs {} for mailbox {}", globalResult.getFailed(), mailboxId.asUuid());
+
+        return globalResult.getSucceeded();
+    }
+
+    private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
+        Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed"));
+
+        FlagsUpdateStageResult result = toBeUpdated
+            .map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator,
+                newModSeq,
+                oldMetadata))
+            .reduce(FlagsUpdateStageResult::merge)
+            .orElse(none());
+
+        result.getSucceeded().stream()
+            .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
+                .thenApply(voidValue -> updatedFlags))
+            .forEach(CompletableFuture::join);
+
+        return result;
     }
 
     @Override
@@ -329,41 +361,30 @@ public class CassandraMessageMapper implements MessageMapper {
                 imapUidDAO.insert(composedMessageIdWithMetaData));
     }
 
-    private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
-        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, composedMessageIdWithMetaData)
-            .map(Stream::of)
-            .orElse(handleRetries(mailbox, flagUpdateCalculator, composedMessageIdWithMetaData));
-    }
 
-    private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox mailbox, ComposedMessageIdWithMetaData oldMetaData) {
-        try {
-            long oldModSeq = oldMetaData.getModSeq();
-            Flags oldFlags = oldMetaData.getFlags();
-            Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
+    private FlagsUpdateStageResult tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
+        Flags oldFlags = oldMetaData.getFlags();
+        Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
 
-            boolean involveFlagsChanges = !identicalFlags(oldFlags, newFlags);
-            long newModSeq = generateNewModSeqIfNeeded(mailbox, oldModSeq, involveFlagsChanges);
-
-            if (updateFlags(oldMetaData, newFlags, newModSeq)) {
-                return Optional.of(UpdatedFlags.builder()
-                    .uid(oldMetaData.getComposedMessageId().getUid())
-                    .modSeq(newModSeq)
-                    .oldFlags(oldFlags)
-                    .newFlags(newFlags)
-                    .build());
-            } else {
-                return Optional.empty();
-            }
-        } catch (MailboxException e) {
-            throw Throwables.propagate(e);
+        if (identicalFlags(oldFlags, newFlags)) {
+            return success(UpdatedFlags.builder()
+                .uid(oldMetaData.getComposedMessageId().getUid())
+                .modSeq(oldMetaData.getModSeq())
+                .oldFlags(oldFlags)
+                .newFlags(newFlags)
+                .build());
         }
-    }
 
-    private long generateNewModSeqIfNeeded(Mailbox mailbox, long oldModSeq, boolean involveFlagsChanges) throws MailboxException {
-        if (involveFlagsChanges) {
-            return modSeqProvider.nextModSeq(mailboxSession, mailbox);
+        if (updateFlags(oldMetaData, newFlags, newModSeq)) {
+            return success(UpdatedFlags.builder()
+                .uid(oldMetaData.getComposedMessageId().getUid())
+                .modSeq(newModSeq)
+                .oldFlags(oldFlags)
+                .newFlags(newFlags)
+                .build());
+        } else {
+            return fail(oldMetaData.getComposedMessageId().getUid());
         }
-        return oldModSeq;
     }
 
     private boolean identicalFlags(Flags oldFlags, Flags newFlags) {
@@ -385,32 +406,49 @@ public class CassandraMessageMapper implements MessageMapper {
             .join();
     }
 
-    private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData oldMetaData) {
-        try {
-            return Stream.of(
-                new FunctionRunnerWithRetry(maxRetries)
-                    .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox,
-                            oldMetaData.getComposedMessageId().getMessageId(),
-                            flagUpdateCalculator)));
-        } catch (MessageDeletedDuringFlagsUpdateException e) {
-            mailboxSession.getLog().warn(e.getMessage());
-            return Stream.of();
-        } catch (MailboxDeleteDuringUpdateException e) {
-            LOGGER.info("Mailbox {} was deleted during flag update", mailbox.getMailboxId());
-            return Stream.of();
-        } catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
+    private static FlagsUpdateStageResult success(UpdatedFlags updatedFlags) {
+        return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags));
     }
 
-    private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, MessageId messageId, FlagsUpdateCalculator flagUpdateCalculator) {
-        CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
-            .join()
-            .findFirst()
-            .orElseThrow(MailboxDeleteDuringUpdateException::new);
-        return tryMessageFlagsUpdate(flagUpdateCalculator,
-                mailbox,
-                composedMessageIdWithMetaData);
+    private static FlagsUpdateStageResult fail(MessageUid uid) {
+        return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of());
+    }
+
+    private static FlagsUpdateStageResult none() {
+        return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of());
+    }
+
+    private static class FlagsUpdateStageResult {
+        private final List<MessageUid> failed;
+        private final List<UpdatedFlags> succeeded;
+
+        public FlagsUpdateStageResult(List<MessageUid> failed, List<UpdatedFlags> succeeded) {
+            this.failed = failed;
+            this.succeeded = succeeded;
+        }
+
+        public List<MessageUid> getFailed() {
+            return failed;
+        }
+
+        public List<UpdatedFlags> getSucceeded() {
+            return succeeded;
+        }
+
+        public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) {
+            return new FlagsUpdateStageResult(
+                ImmutableList.<MessageUid>builder()
+                    .addAll(this.failed)
+                    .addAll(other.failed)
+                    .build(),
+                ImmutableList.<UpdatedFlags>builder()
+                    .addAll(this.succeeded)
+                    .addAll(other.succeeded)
+                    .build());
+        }
+
+        public FlagsUpdateStageResult keepSuccess() {
+            return new FlagsUpdateStageResult(ImmutableList.of(), succeeded);
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic