[prev in list] [next in list] [prev in thread] [next in thread] 

List:       flume-commits
Subject:    git commit: FLUME-2155. Index the Flume Event Queue during replay to improve replay time.
From:       hshreedharan () apache ! org
Date:       2013-12-13 20:37:03
Message-ID: 74e097ded15f4b31b21bc48e10cfbd16 () git ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/trunk d76118d72 -> 6373032a6


FLUME-2155. Index the Flume Event Queue during replay to improve replay time.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6373032a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6373032a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6373032a

Branch: refs/heads/trunk
Commit: 6373032a620bdc687b6d03b12726713d08c71a10
Parents: d76118d
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Dec 13 12:35:43 2013 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Dec 13 12:35:43 2013 -0800

----------------------------------------------------------------------
 flume-ng-channels/flume-file-channel/pom.xml    |   5 +
 .../flume/channel/file/CheckpointRebuilder.java |   4 +-
 .../file/EventQueueBackingStoreFile.java        |   6 +-
 .../apache/flume/channel/file/FileChannel.java  |   1 -
 .../flume/channel/file/FlumeEventQueue.java     | 106 +++++++++++++++++--
 .../java/org/apache/flume/channel/file/Log.java |   9 +-
 .../org/apache/flume/channel/file/LogFile.java  |   1 -
 .../flume/channel/file/ReplayHandler.java       |  18 ++--
 .../flume/channel/file/Serialization.java       |   2 +-
 .../flume/channel/file/TestCheckpoint.java      |   8 +-
 .../channel/file/TestCheckpointRebuilder.java   |   3 +-
 .../file/TestEventQueueBackingStoreFactory.java |   4 +-
 .../flume/channel/file/TestFlumeEventQueue.java | 103 ++++++++++++++----
 pom.xml                                         |   6 ++
 14 files changed, 221 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/pom.xml
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml
index e055d8a..eacd329 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -102,6 +102,11 @@
       <scope>compile</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mapdb</groupId>
+      <artifactId>mapdb</artifactId>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
 index 7883d0e..4388181 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
 @@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.commons.cli.CommandLine;
@@ -251,7 +250,8 @@ public class CheckpointRebuilder {
               capacity, "channel");
       FlumeEventQueue queue = new FlumeEventQueue(backingStore,
               new File(checkpointDir, "inflighttakes"),
-              new File(checkpointDir, "inflightputs"));
+              new File(checkpointDir, "inflightputs"),
+              new File(checkpointDir, Log.QUEUE_SET));
       CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue);
       if(rebuilder.rebuild()) {
         rebuilder.writeCheckpoint();

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
 index 2366cbc..8a9fdae 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
 @@ -166,7 +166,7 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore {  "from the checkpoint directory. Cannot complete backup of \
the " +  "checkpoint.");
     for (File origFile : checkpointFiles) {
-      if(origFile.getName().equals(Log.FILE_LOCK)) {
+      if(Log.EXCLUDES.contains(origFile.getName())) {
         continue;
       }
       Serialization.copyFile(origFile, new File(backupDirectory,
@@ -399,6 +399,7 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore {  File file = new File(args[0]);
     File inflightTakesFile = new File(args[1]);
     File inflightPutsFile = new File(args[2]);
+    File queueSetDir = new File(args[3]);
     if (!file.exists()) {
       throw new IOException("File " + file + " does not exist");
     }
@@ -421,7 +422,8 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore {  + fileID + ", offset = " + offset);
     }
     FlumeEventQueue queue =
-        new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile);
+        new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile,
+            queueSetDir);
     SetMultimap<Long, Long> putMap = queue.deserializeInflightPuts();
     System.out.println("Inflight Puts:");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
 index 36f150b..2cd7f03 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
 @@ -100,7 +100,6 @@ public class FileChannel extends BasicChannelSemantics {
   private String encryptionActiveKey;
   private String encryptionCipherProvider;
   private boolean useDualCheckpoints;
-  private boolean isTest = false;
 
   @Override
   public synchronized void setName(String name) {

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
 index ac03fb4..7888b41 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
 @@ -30,11 +30,15 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
@@ -56,16 +60,26 @@ final class FlumeEventQueue {
   private final String channelNameDescriptor;
   private final InflightEventWrapper inflightTakes;
   private final InflightEventWrapper inflightPuts;
+  private long searchTime = 0;
+  private long searchCount = 0;
+  private long copyTime = 0;
+  private long copyCount = 0;
+  private DB db;
+  private Set<Long> queueSet;
 
   /**
    * @param capacity max event capacity of queue
    * @throws IOException
    */
   FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile,
-          File inflightPutsFile) throws Exception {
+          File inflightPutsFile, File queueSetDBDir) throws Exception {
     Preconditions.checkArgument(backingStore.getCapacity() > 0,
         "Capacity must be greater than zero");
+    Preconditions.checkNotNull(backingStore, "backingStore");
     this.channelNameDescriptor = "[channel=" + backingStore.getName() + "]";
+    Preconditions.checkNotNull(inflightTakesFile, "inflightTakesFile");
+    Preconditions.checkNotNull(inflightPutsFile, "inflightPutsFile");
+    Preconditions.checkNotNull(queueSetDBDir, "queueSetDBDir");
     this.backingStore = backingStore;
     try {
       inflightPuts = new InflightEventWrapper(inflightPutsFile);
@@ -74,6 +88,32 @@ final class FlumeEventQueue {
       LOG.error("Could not read checkpoint.", e);
       throw e;
     }
+    if(queueSetDBDir.isDirectory()) {
+      FileUtils.deleteDirectory(queueSetDBDir);
+    } else if(queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
+      throw new IOException("QueueSetDir " + queueSetDBDir + " is a file and"
+          + " could not be deleted");
+    }
+    if(!queueSetDBDir.mkdirs()) {
+      throw new IllegalStateException("Could not create QueueSet Dir "
+          + queueSetDBDir);
+    }
+    File dbFile = new File(queueSetDBDir, "db");
+    db = DBMaker.newFileDB(dbFile)
+        .closeOnJvmShutdown()
+        .transactionDisable()
+        .syncOnCommitDisable()
+        .deleteFilesAfterClose()
+        .cacheDisable()
+        .randomAccessFileEnableIfNeeded()
+        .make();
+    queueSet = db.createTreeSet("QueueSet").make();
+    long start = System.currentTimeMillis();
+    for (int i = 0; i < backingStore.getSize(); i++) {
+      queueSet.add(get(i));
+    }
+    LOG.info("QueueSet population inserting " + backingStore.getSize()
+        + " took " + (System.currentTimeMillis() - start));
   }
 
   SetMultimap<Long, Long> deserializeInflightPuts()
@@ -182,8 +222,10 @@ final class FlumeEventQueue {
   }
 
   /**
-   * Remove FlumeEventPointer from queue, will normally
-   * only be used when recovering from a crash
+   * Remove FlumeEventPointer from queue, will
+   * only be used when recovering from a crash. It is not
+   * legal to call this method after replayComplete has been
+   * called.
    * @param FlumeEventPointer to be removed
    * @return true if the FlumeEventPointer was found
    * and removed
@@ -191,14 +233,25 @@ final class FlumeEventQueue {
   synchronized boolean remove(FlumeEventPointer e) {
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
+    if (queueSet == null) {
+     throw new IllegalStateException("QueueSet is null, thus replayComplete"
+         + " has been called which is illegal");
+    }
+    if (!queueSet.contains(value)) {
+      return false;
+    }
+    searchCount++;
+    long start = System.currentTimeMillis();
     for (int i = 0; i < backingStore.getSize(); i++) {
       if(get(i) == value) {
         remove(i, 0);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
         backingStore.decrementFileID(ptr.getFileID());
+        searchTime += System.currentTimeMillis() - start;
         return true;
       }
     }
+    searchTime += System.currentTimeMillis() - start;
     return false;
   }
   /**
@@ -261,6 +314,9 @@ final class FlumeEventQueue {
       }
     }
     set(index, value);
+    if (queueSet != null) {
+      queueSet.add(value);
+    }
     return true;
   }
 
@@ -279,7 +335,12 @@ final class FlumeEventQueue {
       throw new IndexOutOfBoundsException("index = " + index
           + ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor);
     }
+    copyCount++;
+    long start = System.currentTimeMillis();
     long value = get(index);
+    if (queueSet != null) {
+      queueSet.remove(value);
+    }
     //if txn id = 0, we are recovering from a crash.
     if(transactionID != 0) {
       inflightTakes.addEvent(transactionID, value);
@@ -304,10 +365,10 @@ final class FlumeEventQueue {
       }
     }
     backingStore.setSize(backingStore.getSize() - 1);
+    copyTime += System.currentTimeMillis() - start;
     return value;
   }
 
-
   protected synchronized int getSize() {
     return backingStore.getSize() + inflightTakes.getSize();
   }
@@ -321,6 +382,13 @@ final class FlumeEventQueue {
 
   synchronized void close() throws IOException {
     try {
+      if (db != null) {
+        db.close();
+      }
+    } catch(Exception ex) {
+      LOG.warn("Error closing db", ex);
+    }
+    try {
       backingStore.close();
       inflightPuts.close();
       inflightTakes.close();
@@ -328,6 +396,33 @@ final class FlumeEventQueue {
       LOG.warn("Error closing backing store", e);
     }
   }
+
+  /**
+   * Called when ReplayHandler has completed and thus remove(FlumeEventPointer)
+   * will no longer be called.
+   */
+  synchronized void replayComplete() {
+    String msg = "Search Count = " + searchCount + ", Search Time = " +
+        searchTime + ", Copy Count = " + copyCount + ", Copy Time = " +
+        copyTime;
+    LOG.info(msg);
+    if(db != null) {
+      db.close();
+    }
+    queueSet = null;
+    db = null;
+  }
+
+  @VisibleForTesting
+  long getSearchCount() {
+    return searchCount;
+  }
+
+  @VisibleForTesting
+  long getCopyCount() {
+    return copyCount;
+  }
+
   /**
    * A representation of in flight events which have not yet been committed.
    * None of the methods are thread safe, and should be called from thread
@@ -340,7 +435,6 @@ final class FlumeEventQueue {
     private volatile RandomAccessFile file;
     private volatile java.nio.channels.FileChannel fileChannel;
     private final MessageDigest digest;
-    private volatile Future<?> future;
     private final File inflightEventsFile;
     private volatile boolean syncRequired = false;
     private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
 index 8a8cb7f..70106cb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
 @@ -78,6 +78,7 @@ public class Log {
   private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
   private static final int MIN_NUM_LOGS = 2;
   public static final String FILE_LOCK = "in_use.lock";
+  public static final String QUEUE_SET = "queueset";
   // for reader
   private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
       .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
@@ -103,7 +104,8 @@ public class Log {
   /**
    * Set of files that should be excluded from backup and restores.
    */
-  public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK);
+  public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK,
+      QUEUE_SET);
   /**
    * Shared lock
    */
@@ -405,6 +407,7 @@ public class Log {
       }
       File inflightTakesFile = new File(checkpointDir, "inflighttakes");
       File inflightPutsFile = new File(checkpointDir, "inflightputs");
+      File queueSetDir = new File(checkpointDir, QUEUE_SET);
       EventQueueBackingStore backingStore = null;
 
 
@@ -414,7 +417,7 @@ public class Log {
                 backupCheckpointDir, queueCapacity, channelNameDescriptor,
                 true, this.useDualCheckpoints);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-                inflightPutsFile);
+                inflightPutsFile, queueSetDir);
         LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
                 + ", queue depth = " + queue.getSize());
 
@@ -450,7 +453,7 @@ public class Log {
             backupCheckpointDir,
             queueCapacity, channelNameDescriptor, true, useDualCheckpoints);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-                inflightPutsFile);
+                inflightPutsFile, queueSetDir);
         // If the checkpoint was deleted due to BadCheckpointException, then
         // trigger fast replay if the channel is configured to.
         shouldFastReplay = this.useFastReplay;

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
 index 62f68c6..26a24b1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
 @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.flume.ChannelException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.CipherProvider;

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
 index c8f5fdd..e668c2e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
 @@ -26,7 +26,6 @@ import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.MultiMap;
 import org.apache.commons.collections.map.MultiValueMap;
-import org.apache.flume.ChannelException;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -290,7 +289,9 @@ class ReplayHandler {
             record.getLogWriteOrderID());
         readCount++;
         if(readCount % 10000 == 0 && readCount > 0) {
-          LOG.info("Read " + readCount + " records");
+          LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
+              + takeCount + ", rollback: " + rollbackCount + ", commit: "
+              + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
         }
         if (record.getLogWriteOrderID() > lastCheckpoint) {
           if (type == TransactionEventRecord.Type.PUT.get()) {
@@ -339,6 +340,7 @@ class ReplayHandler {
       LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
           + takeCount + ", rollback: " + rollbackCount + ", commit: "
           + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
+      queue.replayComplete();
     } finally {
       TransactionIDOracle.setSeed(transactionIDSeed);
       WriteOrderOracle.setSeed(writeOrderIDSeed);
@@ -363,15 +365,9 @@ class ReplayHandler {
     count += uncommittedTakes;
     int pendingTakesSize = pendingTakes.size();
     if (pendingTakesSize > 0) {
-      String msg = "Pending takes " + pendingTakesSize
-          + " exist after the end of replay";
-      if (LOG.isDebugEnabled()) {
-        for (Long pointer : pendingTakes) {
-          LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
-        }
-      } else {
-        LOG.error(msg + ". Duplicate messages will exist in destination.");
-      }
+      LOG.info("Pending takes " + pendingTakesSize + " exist after the" +
+          " end of replay. Duplicate messages will exist in" +
+          " destination.");
     }
   }
   private LogRecord next() throws IOException, CorruptEventException {

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
 index f8160d9..d55660d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
                
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
 @@ -98,7 +98,7 @@ public class Serialization {
       builder = new StringBuilder("Deleted the following files: ");
     }
     if(excludes == null) {
-      excludes = Collections.EMPTY_SET;
+      excludes = Collections.emptySet();
     }
     for (File file : files) {
       if(excludes.contains(file.getName())) {

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
 index 1e0230d..c1de12e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
                
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
 @@ -32,11 +32,13 @@ public class TestCheckpoint {
   File file;
   File inflightPuts;
   File inflightTakes;
+  File queueSet;
   @Before
   public void setup() throws IOException {
     file = File.createTempFile("Checkpoint", "");
     inflightPuts = File.createTempFile("inflightPuts", "");
     inflightTakes = File.createTempFile("inflightTakes", "");
+    queueSet = File.createTempFile("queueset", "");
     Assert.assertTrue(file.isFile());
     Assert.assertTrue(file.canWrite());
   }
@@ -50,14 +52,14 @@ public class TestCheckpoint {
         new EventQueueBackingStoreFileV2(file, 1, "test");
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
     FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
-        inflightTakes, inflightPuts);
+        inflightTakes, inflightPuts, queueSet);
     queueIn.addHead(ptrIn);
     FlumeEventQueue queueOut = new FlumeEventQueue(backingStore,
-        inflightTakes, inflightPuts);
+        inflightTakes, inflightPuts, queueSet);
     Assert.assertEquals(0, queueOut.getLogWriteOrderID());
     queueIn.checkpoint(false);
     FlumeEventQueue queueOut2 = new FlumeEventQueue(backingStore,
-        inflightTakes, inflightPuts);
+        inflightTakes, inflightPuts, queueSet);
     FlumeEventPointer ptrOut = queueOut2.removeHead(0L);
     Assert.assertEquals(ptrIn, ptrOut);
     Assert.assertTrue(queueOut2.getLogWriteOrderID() > 0);

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
 index 536af54..621d445 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
                
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
 @@ -63,6 +63,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     File inflightTakesFile = new File(checkpointDir, "inflighttakes");
     File inflightPutsFile = new File(checkpointDir, "inflightputs");
+    File queueSetDir = new File(checkpointDir, "queueset");
     Assert.assertTrue(checkpointFile.delete());
     Assert.assertTrue(metaDataFile.delete());
     Assert.assertTrue(inflightTakesFile.delete());
@@ -71,7 +72,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
         EventQueueBackingStoreFactory.get(checkpointFile, 50,
             "test");
     FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-          inflightPutsFile);
+          inflightPutsFile, queueSetDir);
     CheckpointRebuilder checkpointRebuilder =
         new CheckpointRebuilder(getAllLogs(dataDirs), queue);
     Assert.assertTrue(checkpointRebuilder.rebuild());

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
 index dfb3bf9..52c706d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
                
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
 @@ -58,12 +58,14 @@ public class TestEventQueueBackingStoreFactory {
   File checkpoint;
   File inflightTakes;
   File inflightPuts;
+  File queueSetDir;
   @Before
   public void setup() throws IOException {
     baseDir = Files.createTempDir();
     checkpoint = new File(baseDir, "checkpoint");
     inflightTakes = new File(baseDir, "takes");
     inflightPuts = new File(baseDir, "puts");
+    queueSetDir = new File(baseDir, "queueset");
     TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", checkpoint);
 
   }
@@ -275,7 +277,7 @@ public class TestEventQueueBackingStoreFactory {
       List<Long> expectedPointers)
       throws Exception {
     FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakes,
-        inflightPuts);
+        inflightPuts, queueSetDir);
     List<Long> actualPointers = Lists.newArrayList();
     FlumeEventPointer ptr;
     while((ptr = queue.removeHead(0L)) != null) {

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
                
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
 index 203cbf2..1adb21a 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
                
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
 @@ -44,6 +44,7 @@ import java.io.RandomAccessFile;
 public class TestFlumeEventQueue {
   FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
   FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
+  FlumeEventPointer pointer3 = new FlumeEventPointer(3, 3);
   FlumeEventQueue queue;
   EventQueueBackingStoreSupplier backingStoreSupplier;
   EventQueueBackingStore backingStore;
@@ -53,11 +54,13 @@ public class TestFlumeEventQueue {
     File checkpoint;
     File inflightTakes;
     File inflightPuts;
+    File queueSetDir;
     EventQueueBackingStoreSupplier() {
       baseDir = Files.createTempDir();
       checkpoint = new File(baseDir, "checkpoint");
       inflightTakes = new File(baseDir, "inflightputs");
       inflightPuts =  new File(baseDir, "inflighttakes");
+      queueSetDir =  new File(baseDir, "queueset");
     }
     File getCheckpoint() {
       return checkpoint;
@@ -68,6 +71,9 @@ public class TestFlumeEventQueue {
     File getInflightTakes() {
       return inflightTakes;
     }
+    File getQueueSetDir() {
+      return queueSetDir;
+    }
     void delete() {
       FileUtils.deleteQuietly(baseDir);
     }
@@ -120,7 +126,8 @@ public class TestFlumeEventQueue {
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
@@ -132,7 +139,8 @@ public class TestFlumeEventQueue {
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
@@ -142,20 +150,23 @@ public class TestFlumeEventQueue {
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
   }
   @Test
   public void testQueueIsEmptyAfterCreation() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertNull(queue.removeHead(0L));
   }
   @Test
   public void addTail1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
@@ -164,7 +175,8 @@ public class TestFlumeEventQueue {
   public void addTail2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -175,7 +187,8 @@ public class TestFlumeEventQueue {
   public void addTailLarge() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -194,7 +207,8 @@ public class TestFlumeEventQueue {
   public void addHead1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead(0));
@@ -204,7 +218,9 @@ public class TestFlumeEventQueue {
   public void addHead2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    queue.replayComplete();
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -215,7 +231,9 @@ public class TestFlumeEventQueue {
   public void addHeadLarge() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    queue.replayComplete();
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -234,10 +252,12 @@ public class TestFlumeEventQueue {
   public void addTailRemove1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
+    queue.replayComplete();
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
     Assert.assertNull(queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
@@ -247,10 +267,12 @@ public class TestFlumeEventQueue {
   public void addTailRemove2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
+    queue.replayComplete();
     Assert.assertEquals(pointer2, queue.removeHead(0));
   }
 
@@ -258,7 +280,8 @@ public class TestFlumeEventQueue {
   public void addHeadRemove1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertNull(queue.removeHead(0));
@@ -267,17 +290,43 @@ public class TestFlumeEventQueue {
   public void addHeadRemove2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
+    queue.replayComplete();
     Assert.assertEquals(pointer2, queue.removeHead(0));
   }
   @Test
+  public void testUnknownPointerDoesNotCauseSearch() throws Exception {
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    Assert.assertTrue(queue.addHead(pointer1));
+    Assert.assertTrue(queue.addHead(pointer2));
+    Assert.assertFalse(queue.remove(pointer3)); // unknown: does not search
+    Assert.assertTrue(queue.remove(pointer1));
+    Assert.assertTrue(queue.remove(pointer2));
+    queue.replayComplete();
+    Assert.assertEquals(2, queue.getSearchCount());
+  }
+  @Test(expected=IllegalStateException.class)
+  public void testRemoveAfterReplayComplete() throws Exception {
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    queue.replayComplete();
+    queue.remove(pointer1);
+  }
+  @Test
   public void testWrappingCorrectly() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {
@@ -299,7 +348,8 @@ public class TestFlumeEventQueue {
   public void testInflightPuts() throws Exception{
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -309,7 +359,8 @@ public class TestFlumeEventQueue {
     TimeUnit.SECONDS.sleep(3L);
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
     Assert.assertTrue(deserializedMap.get(
             txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -323,7 +374,8 @@ public class TestFlumeEventQueue {
   public void testInflightTakes() throws Exception {
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addTail(new FlumeEventPointer(1, 1));
@@ -336,7 +388,8 @@ public class TestFlumeEventQueue {
     TimeUnit.SECONDS.sleep(3L);
     queue = new FlumeEventQueue(backingStore,
         backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
     Assert.assertTrue(deserializedMap.get(
             txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -353,7 +406,8 @@ public class TestFlumeEventQueue {
     try {
       queue = new FlumeEventQueue(backingStore,
               backingStoreSupplier.getInflightTakes(),
-              backingStoreSupplier.getInflightPuts());
+              backingStoreSupplier.getInflightPuts(),
+              backingStoreSupplier.getQueueSetDir());
       long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
       long txnID2 = txnID1 + 1;
       queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -367,7 +421,8 @@ public class TestFlumeEventQueue {
       inflight.writeInt(new Random().nextInt());
       queue = new FlumeEventQueue(backingStore,
               backingStoreSupplier.getInflightTakes(),
-              backingStoreSupplier.getInflightPuts());
+              backingStoreSupplier.getInflightPuts(),
+              backingStoreSupplier.getQueueSetDir());
       SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
       Assert.assertTrue(deserializedMap.get(
               txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -386,7 +441,8 @@ public class TestFlumeEventQueue {
     try {
       queue = new FlumeEventQueue(backingStore,
               backingStoreSupplier.getInflightTakes(),
-              backingStoreSupplier.getInflightPuts());
+              backingStoreSupplier.getInflightPuts(),
+              backingStoreSupplier.getQueueSetDir());
       long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
       long txnID2 = txnID1 + 1;
       queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -400,7 +456,8 @@ public class TestFlumeEventQueue {
       inflight.writeInt(new Random().nextInt());
       queue = new FlumeEventQueue(backingStore,
               backingStoreSupplier.getInflightTakes(),
-              backingStoreSupplier.getInflightPuts());
+              backingStoreSupplier.getInflightPuts(),
+              backingStoreSupplier.getQueueSetDir());
       SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
       Assert.assertTrue(deserializedMap.get(
               txnID1).contains(new FlumeEventPointer(1, 1).toLong()));

http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 267925f..77b7f22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -774,6 +774,12 @@ limitations under the License.
         <version>4.2.1</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.mapdb</groupId>
+        <artifactId>mapdb</artifactId>
+        <version>0.9.7</version>
+      </dependency>
+
       <!--  Gson: Java to Json conversion -->
       <dependency>
         <groupId>com.google.code.gson</groupId>


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic