[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: git commit: FLUME-2155. Index the Flume Event Queue during replay to improve replay time.
From: hshreedharan () apache ! org
Date: 2013-12-13 20:37:03
Message-ID: 74e097ded15f4b31b21bc48e10cfbd16 () git ! apache ! org
[Download RAW message or body]
Updated Branches:
refs/heads/trunk d76118d72 -> 6373032a6
FLUME-2155. Index the Flume Event Queue during replay to improve replay time.
(Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6373032a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6373032a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6373032a
Branch: refs/heads/trunk
Commit: 6373032a620bdc687b6d03b12726713d08c71a10
Parents: d76118d
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Dec 13 12:35:43 2013 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Dec 13 12:35:43 2013 -0800
----------------------------------------------------------------------
flume-ng-channels/flume-file-channel/pom.xml | 5 +
.../flume/channel/file/CheckpointRebuilder.java | 4 +-
.../file/EventQueueBackingStoreFile.java | 6 +-
.../apache/flume/channel/file/FileChannel.java | 1 -
.../flume/channel/file/FlumeEventQueue.java | 106 +++++++++++++++++--
.../java/org/apache/flume/channel/file/Log.java | 9 +-
.../org/apache/flume/channel/file/LogFile.java | 1 -
.../flume/channel/file/ReplayHandler.java | 18 ++--
.../flume/channel/file/Serialization.java | 2 +-
.../flume/channel/file/TestCheckpoint.java | 8 +-
.../channel/file/TestCheckpointRebuilder.java | 3 +-
.../file/TestEventQueueBackingStoreFactory.java | 4 +-
.../flume/channel/file/TestFlumeEventQueue.java | 103 ++++++++++++++----
pom.xml | 6 ++
14 files changed, 221 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml \
b/flume-ng-channels/flume-file-channel/pom.xml index e055d8a..eacd329 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -102,6 +102,11 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ </dependency>
+
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 7883d0e..4388181 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
@@ -251,7 +250,8 @@ public class CheckpointRebuilder {
capacity, "channel");
FlumeEventQueue queue = new FlumeEventQueue(backingStore,
new File(checkpointDir, "inflighttakes"),
- new File(checkpointDir, "inflightputs"));
+ new File(checkpointDir, "inflightputs"),
+ new File(checkpointDir, Log.QUEUE_SET));
CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue);
if(rebuilder.rebuild()) {
rebuilder.writeCheckpoint();
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-fil \
e-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 2366cbc..8a9fdae 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -166,7 +166,7 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { "from the checkpoint directory. Cannot complete backup of \
the " + "checkpoint.");
for (File origFile : checkpointFiles) {
- if(origFile.getName().equals(Log.FILE_LOCK)) {
+ if(Log.EXCLUDES.contains(origFile.getName())) {
continue;
}
Serialization.copyFile(origFile, new File(backupDirectory,
@@ -399,6 +399,7 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { File file = new File(args[0]);
File inflightTakesFile = new File(args[1]);
File inflightPutsFile = new File(args[2]);
+ File queueSetDir = new File(args[3]);
if (!file.exists()) {
throw new IOException("File " + file + " does not exist");
}
@@ -421,7 +422,8 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { + fileID + ", offset = " + offset);
}
FlumeEventQueue queue =
- new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile);
+ new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile,
+ queueSetDir);
SetMultimap<Long, Long> putMap = queue.deserializeInflightPuts();
System.out.println("Inflight Puts:");
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 36f150b..2cd7f03 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -100,7 +100,6 @@ public class FileChannel extends BasicChannelSemantics {
private String encryptionActiveKey;
private String encryptionCipherProvider;
private boolean useDualCheckpoints;
- private boolean isTest = false;
@Override
public synchronized void setName(String name) {
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index ac03fb4..7888b41 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -30,11 +30,15 @@ import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ArrayUtils;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
@@ -56,16 +60,26 @@ final class FlumeEventQueue {
private final String channelNameDescriptor;
private final InflightEventWrapper inflightTakes;
private final InflightEventWrapper inflightPuts;
+ private long searchTime = 0;
+ private long searchCount = 0;
+ private long copyTime = 0;
+ private long copyCount = 0;
+ private DB db;
+ private Set<Long> queueSet;
/**
* @param capacity max event capacity of queue
* @throws IOException
*/
FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile,
- File inflightPutsFile) throws Exception {
+ File inflightPutsFile, File queueSetDBDir) throws Exception {
Preconditions.checkArgument(backingStore.getCapacity() > 0,
"Capacity must be greater than zero");
+ Preconditions.checkNotNull(backingStore, "backingStore");
this.channelNameDescriptor = "[channel=" + backingStore.getName() + "]";
+ Preconditions.checkNotNull(inflightTakesFile, "inflightTakesFile");
+ Preconditions.checkNotNull(inflightPutsFile, "inflightPutsFile");
+ Preconditions.checkNotNull(queueSetDBDir, "queueSetDBDir");
this.backingStore = backingStore;
try {
inflightPuts = new InflightEventWrapper(inflightPutsFile);
@@ -74,6 +88,32 @@ final class FlumeEventQueue {
LOG.error("Could not read checkpoint.", e);
throw e;
}
+ if(queueSetDBDir.isDirectory()) {
+ FileUtils.deleteDirectory(queueSetDBDir);
+ } else if(queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
+ throw new IOException("QueueSetDir " + queueSetDBDir + " is a file and"
+ + " could not be deleted");
+ }
+ if(!queueSetDBDir.mkdirs()) {
+ throw new IllegalStateException("Could not create QueueSet Dir "
+ + queueSetDBDir);
+ }
+ File dbFile = new File(queueSetDBDir, "db");
+ db = DBMaker.newFileDB(dbFile)
+ .closeOnJvmShutdown()
+ .transactionDisable()
+ .syncOnCommitDisable()
+ .deleteFilesAfterClose()
+ .cacheDisable()
+ .randomAccessFileEnableIfNeeded()
+ .make();
+ queueSet = db.createTreeSet("QueueSet").make();
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < backingStore.getSize(); i++) {
+ queueSet.add(get(i));
+ }
+ LOG.info("QueueSet population inserting " + backingStore.getSize()
+ + " took " + (System.currentTimeMillis() - start));
}
SetMultimap<Long, Long> deserializeInflightPuts()
@@ -182,8 +222,10 @@ final class FlumeEventQueue {
}
/**
- * Remove FlumeEventPointer from queue, will normally
- * only be used when recovering from a crash
+ * Remove FlumeEventPointer from queue, will
+ * only be used when recovering from a crash. It is not
+ * legal to call this method after replayComplete has been
+ * called.
* @param FlumeEventPointer to be removed
* @return true if the FlumeEventPointer was found
* and removed
@@ -191,14 +233,25 @@ final class FlumeEventQueue {
synchronized boolean remove(FlumeEventPointer e) {
long value = e.toLong();
Preconditions.checkArgument(value != EMPTY);
+ if (queueSet == null) {
+ throw new IllegalStateException("QueueSet is null, thus replayComplete"
+ + " has been called which is illegal");
+ }
+ if (!queueSet.contains(value)) {
+ return false;
+ }
+ searchCount++;
+ long start = System.currentTimeMillis();
for (int i = 0; i < backingStore.getSize(); i++) {
if(get(i) == value) {
remove(i, 0);
FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
backingStore.decrementFileID(ptr.getFileID());
+ searchTime += System.currentTimeMillis() - start;
return true;
}
}
+ searchTime += System.currentTimeMillis() - start;
return false;
}
/**
@@ -261,6 +314,9 @@ final class FlumeEventQueue {
}
}
set(index, value);
+ if (queueSet != null) {
+ queueSet.add(value);
+ }
return true;
}
@@ -279,7 +335,12 @@ final class FlumeEventQueue {
throw new IndexOutOfBoundsException("index = " + index
+ ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor);
}
+ copyCount++;
+ long start = System.currentTimeMillis();
long value = get(index);
+ if (queueSet != null) {
+ queueSet.remove(value);
+ }
//if txn id = 0, we are recovering from a crash.
if(transactionID != 0) {
inflightTakes.addEvent(transactionID, value);
@@ -304,10 +365,10 @@ final class FlumeEventQueue {
}
}
backingStore.setSize(backingStore.getSize() - 1);
+ copyTime += System.currentTimeMillis() - start;
return value;
}
-
protected synchronized int getSize() {
return backingStore.getSize() + inflightTakes.getSize();
}
@@ -321,6 +382,13 @@ final class FlumeEventQueue {
synchronized void close() throws IOException {
try {
+ if (db != null) {
+ db.close();
+ }
+ } catch(Exception ex) {
+ LOG.warn("Error closing db", ex);
+ }
+ try {
backingStore.close();
inflightPuts.close();
inflightTakes.close();
@@ -328,6 +396,33 @@ final class FlumeEventQueue {
LOG.warn("Error closing backing store", e);
}
}
+
+ /**
+ * Called when ReplayHandler has completed and thus remove(FlumeEventPointer)
+ * will no longer be called.
+ */
+ synchronized void replayComplete() {
+ String msg = "Search Count = " + searchCount + ", Search Time = " +
+ searchTime + ", Copy Count = " + copyCount + ", Copy Time = " +
+ copyTime;
+ LOG.info(msg);
+ if(db != null) {
+ db.close();
+ }
+ queueSet = null;
+ db = null;
+ }
+
+ @VisibleForTesting
+ long getSearchCount() {
+ return searchCount;
+ }
+
+ @VisibleForTesting
+ long getCopyCount() {
+ return copyCount;
+ }
+
/**
* A representation of in flight events which have not yet been committed.
* None of the methods are thread safe, and should be called from thread
@@ -340,7 +435,6 @@ final class FlumeEventQueue {
private volatile RandomAccessFile file;
private volatile java.nio.channels.FileChannel fileChannel;
private final MessageDigest digest;
- private volatile Future<?> future;
private final File inflightEventsFile;
private volatile boolean syncRequired = false;
private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 8a8cb7f..70106cb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -78,6 +78,7 @@ public class Log {
private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
private static final int MIN_NUM_LOGS = 2;
public static final String FILE_LOCK = "in_use.lock";
+ public static final String QUEUE_SET = "queueset";
// for reader
private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
.synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
@@ -103,7 +104,8 @@ public class Log {
/**
* Set of files that should be excluded from backup and restores.
*/
- public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK);
+ public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK,
+ QUEUE_SET);
/**
* Shared lock
*/
@@ -405,6 +407,7 @@ public class Log {
}
File inflightTakesFile = new File(checkpointDir, "inflighttakes");
File inflightPutsFile = new File(checkpointDir, "inflightputs");
+ File queueSetDir = new File(checkpointDir, QUEUE_SET);
EventQueueBackingStore backingStore = null;
@@ -414,7 +417,7 @@ public class Log {
backupCheckpointDir, queueCapacity, channelNameDescriptor,
true, this.useDualCheckpoints);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
- inflightPutsFile);
+ inflightPutsFile, queueSetDir);
LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
+ ", queue depth = " + queue.getSize());
@@ -450,7 +453,7 @@ public class Log {
backupCheckpointDir,
queueCapacity, channelNameDescriptor, true, useDualCheckpoints);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
- inflightPutsFile);
+ inflightPutsFile, queueSetDir);
// If the checkpoint was deleted due to BadCheckpointException, then
// trigger fast replay if the channel is configured to.
shouldFastReplay = this.useFastReplay;
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 62f68c6..26a24b1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.flume.ChannelException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.file.encryption.CipherProvider;
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index c8f5fdd..e668c2e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -26,7 +26,6 @@ import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MultiMap;
import org.apache.commons.collections.map.MultiValueMap;
-import org.apache.flume.ChannelException;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -290,7 +289,9 @@ class ReplayHandler {
record.getLogWriteOrderID());
readCount++;
if(readCount % 10000 == 0 && readCount > 0) {
- LOG.info("Read " + readCount + " records");
+ LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
+ + takeCount + ", rollback: " + rollbackCount + ", commit: "
+ + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
}
if (record.getLogWriteOrderID() > lastCheckpoint) {
if (type == TransactionEventRecord.Type.PUT.get()) {
@@ -339,6 +340,7 @@ class ReplayHandler {
LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
+ takeCount + ", rollback: " + rollbackCount + ", commit: "
+ commitCount + ", skip: " + skipCount + ", eventCount:" + count);
+ queue.replayComplete();
} finally {
TransactionIDOracle.setSeed(transactionIDSeed);
WriteOrderOracle.setSeed(writeOrderIDSeed);
@@ -363,15 +365,9 @@ class ReplayHandler {
count += uncommittedTakes;
int pendingTakesSize = pendingTakes.size();
if (pendingTakesSize > 0) {
- String msg = "Pending takes " + pendingTakesSize
- + " exist after the end of replay";
- if (LOG.isDebugEnabled()) {
- for (Long pointer : pendingTakes) {
- LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
- }
- } else {
- LOG.error(msg + ". Duplicate messages will exist in destination.");
- }
+ LOG.info("Pending takes " + pendingTakesSize + " exist after the" +
+ " end of replay. Duplicate messages will exist in" +
+ " destination.");
}
}
private LogRecord next() throws IOException, CorruptEventException {
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index f8160d9..d55660d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -98,7 +98,7 @@ public class Serialization {
builder = new StringBuilder("Deleted the following files: ");
}
if(excludes == null) {
- excludes = Collections.EMPTY_SET;
+ excludes = Collections.emptySet();
}
for (File file : files) {
if(excludes.contains(file.getName())) {
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index 1e0230d..c1de12e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -32,11 +32,13 @@ public class TestCheckpoint {
File file;
File inflightPuts;
File inflightTakes;
+ File queueSet;
@Before
public void setup() throws IOException {
file = File.createTempFile("Checkpoint", "");
inflightPuts = File.createTempFile("inflightPuts", "");
inflightTakes = File.createTempFile("inflightTakes", "");
+ queueSet = File.createTempFile("queueset", "");
Assert.assertTrue(file.isFile());
Assert.assertTrue(file.canWrite());
}
@@ -50,14 +52,14 @@ public class TestCheckpoint {
new EventQueueBackingStoreFileV2(file, 1, "test");
FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
- inflightTakes, inflightPuts);
+ inflightTakes, inflightPuts, queueSet);
queueIn.addHead(ptrIn);
FlumeEventQueue queueOut = new FlumeEventQueue(backingStore,
- inflightTakes, inflightPuts);
+ inflightTakes, inflightPuts, queueSet);
Assert.assertEquals(0, queueOut.getLogWriteOrderID());
queueIn.checkpoint(false);
FlumeEventQueue queueOut2 = new FlumeEventQueue(backingStore,
- inflightTakes, inflightPuts);
+ inflightTakes, inflightPuts, queueSet);
FlumeEventPointer ptrOut = queueOut2.removeHead(0L);
Assert.assertEquals(ptrIn, ptrOut);
Assert.assertTrue(queueOut2.getLogWriteOrderID() > 0);
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-fil \
e-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index 536af54..621d445 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -63,6 +63,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
File inflightTakesFile = new File(checkpointDir, "inflighttakes");
File inflightPutsFile = new File(checkpointDir, "inflightputs");
+ File queueSetDir = new File(checkpointDir, "queueset");
Assert.assertTrue(checkpointFile.delete());
Assert.assertTrue(metaDataFile.delete());
Assert.assertTrue(inflightTakesFile.delete());
@@ -71,7 +72,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
EventQueueBackingStoreFactory.get(checkpointFile, 50,
"test");
FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
- inflightPutsFile);
+ inflightPutsFile, queueSetDir);
CheckpointRebuilder checkpointRebuilder =
new CheckpointRebuilder(getAllLogs(dataDirs), queue);
Assert.assertTrue(checkpointRebuilder.rebuild());
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-fil \
e-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index dfb3bf9..52c706d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -58,12 +58,14 @@ public class TestEventQueueBackingStoreFactory {
File checkpoint;
File inflightTakes;
File inflightPuts;
+ File queueSetDir;
@Before
public void setup() throws IOException {
baseDir = Files.createTempDir();
checkpoint = new File(baseDir, "checkpoint");
inflightTakes = new File(baseDir, "takes");
inflightPuts = new File(baseDir, "puts");
+ queueSetDir = new File(baseDir, "queueset");
TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", checkpoint);
}
@@ -275,7 +277,7 @@ public class TestEventQueueBackingStoreFactory {
List<Long> expectedPointers)
throws Exception {
FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakes,
- inflightPuts);
+ inflightPuts, queueSetDir);
List<Long> actualPointers = Lists.newArrayList();
FlumeEventPointer ptr;
while((ptr = queue.removeHead(0L)) != null) {
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 203cbf2..1adb21a 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -44,6 +44,7 @@ import java.io.RandomAccessFile;
public class TestFlumeEventQueue {
FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
+ FlumeEventPointer pointer3 = new FlumeEventPointer(3, 3);
FlumeEventQueue queue;
EventQueueBackingStoreSupplier backingStoreSupplier;
EventQueueBackingStore backingStore;
@@ -53,11 +54,13 @@ public class TestFlumeEventQueue {
File checkpoint;
File inflightTakes;
File inflightPuts;
+ File queueSetDir;
EventQueueBackingStoreSupplier() {
baseDir = Files.createTempDir();
checkpoint = new File(baseDir, "checkpoint");
inflightTakes = new File(baseDir, "inflightputs");
inflightPuts = new File(baseDir, "inflighttakes");
+ queueSetDir = new File(baseDir, "queueset");
}
File getCheckpoint() {
return checkpoint;
@@ -68,6 +71,9 @@ public class TestFlumeEventQueue {
File getInflightTakes() {
return inflightTakes;
}
+ File getQueueSetDir() {
+ return queueSetDir;
+ }
void delete() {
FileUtils.deleteQuietly(baseDir);
}
@@ -120,7 +126,8 @@ public class TestFlumeEventQueue {
backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertFalse(queue.addTail(pointer2));
}
@@ -132,7 +139,8 @@ public class TestFlumeEventQueue {
backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
}
@Test(expected=IllegalArgumentException.class)
public void testInvalidCapacityNegative() throws Exception {
@@ -142,20 +150,23 @@ public class TestFlumeEventQueue {
backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
}
@Test
public void testQueueIsEmptyAfterCreation() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertNull(queue.removeHead(0L));
}
@Test
public void addTail1() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertEquals(pointer1, queue.removeHead(0));
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
@@ -164,7 +175,8 @@ public class TestFlumeEventQueue {
public void addTail2() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertTrue(queue.addTail(pointer2));
Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -175,7 +187,8 @@ public class TestFlumeEventQueue {
public void addTailLarge() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
int size = 500;
Set<Integer> fileIDs = Sets.newHashSet();
for (int i = 1; i <= size; i++) {
@@ -194,7 +207,8 @@ public class TestFlumeEventQueue {
public void addHead1() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
Assert.assertEquals(pointer1, queue.removeHead(0));
@@ -204,7 +218,9 @@ public class TestFlumeEventQueue {
public void addHead2() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
+ queue.replayComplete();
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertTrue(queue.addHead(pointer2));
Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -215,7 +231,9 @@ public class TestFlumeEventQueue {
public void addHeadLarge() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
+ queue.replayComplete();
int size = 500;
Set<Integer> fileIDs = Sets.newHashSet();
for (int i = 1; i <= size; i++) {
@@ -234,10 +252,12 @@ public class TestFlumeEventQueue {
public void addTailRemove1() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
Assert.assertTrue(queue.remove(pointer1));
+ queue.replayComplete();
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
Assert.assertNull(queue.removeHead(0));
Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
@@ -247,10 +267,12 @@ public class TestFlumeEventQueue {
public void addTailRemove2() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addTail(pointer1));
Assert.assertTrue(queue.addTail(pointer2));
Assert.assertTrue(queue.remove(pointer1));
+ queue.replayComplete();
Assert.assertEquals(pointer2, queue.removeHead(0));
}
@@ -258,7 +280,8 @@ public class TestFlumeEventQueue {
public void addHeadRemove1() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
queue.addHead(pointer1);
Assert.assertTrue(queue.remove(pointer1));
Assert.assertNull(queue.removeHead(0));
@@ -267,17 +290,43 @@ public class TestFlumeEventQueue {
public void addHeadRemove2() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
Assert.assertTrue(queue.addHead(pointer1));
Assert.assertTrue(queue.addHead(pointer2));
Assert.assertTrue(queue.remove(pointer1));
+ queue.replayComplete();
Assert.assertEquals(pointer2, queue.removeHead(0));
}
@Test
+ public void testUnknownPointerDoesNotCauseSearch() throws Exception {
+ queue = new FlumeEventQueue(backingStore,
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
+ Assert.assertTrue(queue.addHead(pointer1));
+ Assert.assertTrue(queue.addHead(pointer2));
+ Assert.assertFalse(queue.remove(pointer3)); // does search
+ Assert.assertTrue(queue.remove(pointer1));
+ Assert.assertTrue(queue.remove(pointer2));
+ queue.replayComplete();
+ Assert.assertEquals(2, queue.getSearchCount());
+ }
+ @Test(expected=IllegalStateException.class)
+ public void testRemoveAfterReplayComplete() throws Exception {
+ queue = new FlumeEventQueue(backingStore,
+ backingStoreSupplier.getInflightTakes(),
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
+ queue.replayComplete();
+ queue.remove(pointer1);
+ }
+ @Test
public void testWrappingCorrectly() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
int size = Integer.MAX_VALUE;
for (int i = 1; i <= size; i++) {
if(!queue.addHead(new FlumeEventPointer(i, i))) {
@@ -299,7 +348,8 @@ public class TestFlumeEventQueue {
public void testInflightPuts() throws Exception{
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
long txnID2 = txnID1 + 1;
queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -309,7 +359,8 @@ public class TestFlumeEventQueue {
TimeUnit.SECONDS.sleep(3L);
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
Assert.assertTrue(deserializedMap.get(
txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -323,7 +374,8 @@ public class TestFlumeEventQueue {
public void testInflightTakes() throws Exception {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
long txnID2 = txnID1 + 1;
queue.addTail(new FlumeEventPointer(1, 1));
@@ -336,7 +388,8 @@ public class TestFlumeEventQueue {
TimeUnit.SECONDS.sleep(3L);
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
Assert.assertTrue(deserializedMap.get(
txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -353,7 +406,8 @@ public class TestFlumeEventQueue {
try {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
long txnID2 = txnID1 + 1;
queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -367,7 +421,8 @@ public class TestFlumeEventQueue {
inflight.writeInt(new Random().nextInt());
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
Assert.assertTrue(deserializedMap.get(
txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -386,7 +441,8 @@ public class TestFlumeEventQueue {
try {
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
long txnID2 = txnID1 + 1;
queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -400,7 +456,8 @@ public class TestFlumeEventQueue {
inflight.writeInt(new Random().nextInt());
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
- backingStoreSupplier.getInflightPuts());
+ backingStoreSupplier.getInflightPuts(),
+ backingStoreSupplier.getQueueSetDir());
SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
Assert.assertTrue(deserializedMap.get(
txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 267925f..77b7f22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -774,6 +774,12 @@ limitations under the License.
<version>4.2.1</version>
</dependency>
+ <dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ <version>0.9.7</version>
+ </dependency>
+
<!-- Gson: Java to Json conversion -->
<dependency>
<groupId>com.google.code.gson</groupId>
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic