[prev in list] [next in list] [prev in thread] [next in thread]
List: flume-commits
Subject: flume git commit: FLUME-3152 Add Flume Metric for Backup Checkpoint Errors
From: denes () apache ! org
Date: 2017-08-23 16:27:58
Message-ID: 895f4f0ce31642638bcbb0731666f41c () git ! apache ! org
[Download RAW message or body]
Repository: flume
Updated Branches:
refs/heads/trunk 66327aa20 -> 4d79aa003
FLUME-3152 Add Flume Metric for Backup Checkpoint Errors
This change adds a new metric (channel.file.checkpoint.backup.write.error)
to the File Channel. It is incremented whenever an exception occurs
while writing a backup checkpoint.
This closes #156
Reviewers: Denes Arvay
(Ferenc Szabo via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d79aa00
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d79aa00
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d79aa00
Branch: refs/heads/trunk
Commit: 4d79aa003aa02e8d513a1ae1406795d758143397
Parents: 66327aa
Author: Ferenc Szabo <fszabo@cloudera.com>
Authored: Mon Aug 21 14:29:38 2017 +0200
Committer: Denes Arvay <denes@apache.org>
Committed: Wed Aug 23 18:18:39 2017 +0200
----------------------------------------------------------------------
.../flume/channel/file/CheckpointRebuilder.java | 3 +-
.../file/EventQueueBackingStoreFactory.java | 41 +++++----
.../file/EventQueueBackingStoreFile.java | 24 ++++--
.../file/EventQueueBackingStoreFileV2.java | 8 +-
.../file/EventQueueBackingStoreFileV3.java | 12 +--
.../java/org/apache/flume/channel/file/Log.java | 4 +-
.../instrumentation/FileChannelCounter.java | 16 +++-
.../FileChannelCounterMBean.java | 7 ++
.../flume/channel/file/TestCheckpoint.java | 3 +-
.../channel/file/TestCheckpointRebuilder.java | 3 +-
.../file/TestEventQueueBackingStoreFactory.java | 87 +++++++++++++-------
.../file/TestFileChannelErrorMetrics.java | 67 +++++++++++++++
.../flume/channel/file/TestFlumeEventQueue.java | 19 +++--
13 files changed, 220 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index a0ecdeb..8fbf3c8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,7 +241,7 @@ public class CheckpointRebuilder {
} else {
EventQueueBackingStore backingStore =
EventQueueBackingStoreFactory.get(checkpointFile,
- capacity, "channel");
+ capacity, "channel", new FileChannelCounter("Main"));
FlumeEventQueue queue = new FlumeEventQueue(backingStore,
new File(checkpointDir, "inflighttakes"),
new File(checkpointDir, "inflightputs"),
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index dcd6f98..7f8b3f6 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -19,6 +19,7 @@
package org.apache.flume.channel.file;
import com.google.common.io.Files;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,19 +32,22 @@ class EventQueueBackingStoreFactory {
private EventQueueBackingStoreFactory() {}
- static EventQueueBackingStore get(File checkpointFile, int capacity,
- String name) throws Exception {
- return get(checkpointFile, capacity, name, true);
+ static EventQueueBackingStore get(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter
+ ) throws Exception {
+ return get(checkpointFile, capacity, name, counter, true);
}
- static EventQueueBackingStore get(File checkpointFile, int capacity,
- String name, boolean upgrade) throws Exception {
- return get(checkpointFile, null, capacity, name, upgrade, false, false);
+ static EventQueueBackingStore get(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter, boolean upgrade
+ ) throws Exception {
+ return get(checkpointFile, null, capacity, name, counter, upgrade, false, false);
}
- static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir,
- int capacity, String name, boolean upgrade,
- boolean shouldBackup, boolean compressBackup) throws Exception {
+ static EventQueueBackingStore get(
+ File checkpointFile, File backupCheckpointDir, int capacity, String name,
+ FileChannelCounter counter, boolean upgrade, boolean shouldBackup, boolean compressBackup
+ ) throws Exception {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
RandomAccessFile checkpointFileHandle = null;
try {
@@ -69,21 +73,21 @@ class EventQueueBackingStoreFactory {
throw new IOException("Cannot create " + checkpointFile);
}
return new EventQueueBackingStoreFileV3(checkpointFile,
- capacity, name, backupCheckpointDir, shouldBackup, compressBackup);
+ capacity, name, counter, backupCheckpointDir, shouldBackup, \
compressBackup); }
// v3 due to meta file, version will be checked by backing store
if (metaDataExists) {
return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
- name, backupCheckpointDir, shouldBackup, compressBackup);
+ name, counter, backupCheckpointDir, shouldBackup, compressBackup);
}
checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
int version = (int) checkpointFileHandle.readLong();
if (Serialization.VERSION_2 == version) {
if (upgrade) {
return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
- shouldBackup, compressBackup);
+ shouldBackup, compressBackup, counter);
}
- return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, \
counter); }
LOG.error("Found version " + Integer.toHexString(version) + " in " +
checkpointFile);
@@ -100,12 +104,13 @@ class EventQueueBackingStoreFactory {
}
}
- private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, \
String name,
- File backupCheckpointDir, boolean \
shouldBackup,
- boolean compressBackup) throws \
Exception { + private static EventQueueBackingStore upgrade(
+ File checkpointFile, int capacity, String name, File backupCheckpointDir,
+ boolean shouldBackup, boolean compressBackup, FileChannelCounter counter
+ ) throws Exception {
LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
EventQueueBackingStoreFileV2 backingStoreV2 =
- new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+ new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter);
String backupName = checkpointFile.getName() + "-backup-"
+ System.currentTimeMillis();
Files.copy(checkpointFile,
@@ -113,7 +118,7 @@ class EventQueueBackingStoreFactory {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
metaDataFile);
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
+ return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, counter,
backupCheckpointDir, shouldBackup, compressBackup);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 73f1d4c..445d912 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +61,7 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { protected final Map<Integer, AtomicInteger> \
logFileIDReferenceCounts = Maps.newHashMap(); protected final MappedByteBuffer \
mappedBuffer; protected final RandomAccessFile checkpointFileHandle;
+ private final FileChannelCounter fileChannelCounter;
protected final File checkpointFile;
private final Semaphore backupCompletedSema = new Semaphore(1);
protected final boolean shouldBackup;
@@ -67,17 +69,18 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { private final File backupDir;
private final ExecutorService checkpointBackUpExecutor;
- protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile) throws IOException,
- BadCheckpointException {
- this(capacity, name, checkpointFile, null, false, false);
+ protected EventQueueBackingStoreFile(
+ int capacity, String name, FileChannelCounter fileChannelCounter, File \
checkpointFile + ) throws IOException, BadCheckpointException {
+ this(capacity, name, fileChannelCounter, checkpointFile, null, false, false);
}
- protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile, File \
checkpointBackupDir,
- boolean backupCheckpoint, boolean \
compressBackup)
- throws IOException, BadCheckpointException {
+ protected EventQueueBackingStoreFile(
+ int capacity, String name, FileChannelCounter fileChannelCounter, File \
checkpointFile, + File checkpointBackupDir, boolean backupCheckpoint, boolean \
compressBackup + ) throws IOException, BadCheckpointException {
super(capacity, name);
+ this.fileChannelCounter = fileChannelCounter;
this.checkpointFile = checkpointFile;
this.shouldBackup = backupCheckpoint;
this.compressBackup = compressBackup;
@@ -294,6 +297,7 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { try {
backupCheckpoint(backupDir);
} catch (Throwable throwable) {
+ fileChannelCounter.incrementCheckpointBackupWriteErrorCount();
error = true;
LOG.error("Backing up of checkpoint directory failed.", throwable);
} finally {
@@ -432,7 +436,9 @@ abstract class EventQueueBackingStoreFile extends \
EventQueueBackingStore { }
int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile)
- EventQueueBackingStoreFactory.get(file, capacity, "debug", false);
+ EventQueueBackingStoreFactory.get(
+ file, capacity, "debug", new FileChannelCounter("Main"), false
+ );
System.out.println("File Reference Counts"
+ backingStore.logFileIDReferenceCounts);
System.out.println("Queue Capacity " + backingStore.getCapacity());
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
index 71183aa..3711a78 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
@@ -33,9 +34,10 @@ final class EventQueueBackingStoreFileV2 extends \
EventQueueBackingStoreFile { private static final int INDEX_ACTIVE_LOG = 5;
private static final int MAX_ACTIVE_LOGS = 1024;
- EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name)
- throws IOException, BadCheckpointException {
- super(capacity, name, checkpointFile);
+ EventQueueBackingStoreFileV2(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter
+ ) throws IOException, BadCheckpointException {
+ super(capacity, name, counter, checkpointFile);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index f1a892a..da5a082 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel.file;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.apache.flume.channel.file.proto.ProtosFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +37,17 @@ final class EventQueueBackingStoreFileV3 extends \
EventQueueBackingStoreFile { private static final Logger LOG = \
LoggerFactory.getLogger(EventQueueBackingStoreFileV3.class); private final File \
metaDataFile;
- EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
- String name) throws IOException, \
BadCheckpointException {
- this(checkpointFile, capacity, name, null, false, false);
+ EventQueueBackingStoreFileV3(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter
+ ) throws IOException, BadCheckpointException {
+ this(checkpointFile, capacity, name, counter, null, false, false);
}
EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
- String name, File checkpointBackupDir,
+ String name, FileChannelCounter counter, File \
checkpointBackupDir,
boolean backupCheckpoint, boolean compressBackup)
throws IOException, BadCheckpointException {
- super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint,
+ super(capacity, name, counter, checkpointFile, checkpointBackupDir, \
backupCheckpoint, compressBackup);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1662a5b..efc8d14 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -450,7 +450,7 @@ public class Log {
backingStore =
EventQueueBackingStoreFactory.get(checkpointFile,
backupCheckpointDir, queueCapacity, channelNameDescriptor,
- true, this.useDualCheckpoints,
+ channelCounter, true, this.useDualCheckpoints,
this.compressBackupCheckpoint);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile, queueSetDir);
@@ -487,7 +487,7 @@ public class Log {
}
backingStore = EventQueueBackingStoreFactory.get(
checkpointFile, backupCheckpointDir, queueCapacity,
- channelNameDescriptor, true, useDualCheckpoints,
+ channelNameDescriptor, channelCounter, true, useDualCheckpoints,
compressBackupCheckpoint);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile, queueSetDir);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
index 40470a8..6cec3da 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
@@ -28,10 +28,15 @@ public class FileChannelCounter extends ChannelCounter \
implements FileChannelCou private static final String EVENT_PUT_ERROR_COUNT = \
"channel.file.event.put.error"; private static final String EVENT_TAKE_ERROR_COUNT = \
"channel.file.event.take.error"; private static final String \
CHECKPOINT_WRITE_ERROR_COUNT = "channel.file.checkpoint.write.error"; + private \
static final String CHECKPOINT_BACKUP_WRITE_ERROR_COUNT + = \
"channel.file.checkpoint.backup.write.error";
public FileChannelCounter(String name) {
super(name, new String[] {
- EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, CHECKPOINT_WRITE_ERROR_COUNT \
}); + EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT,
+ CHECKPOINT_WRITE_ERROR_COUNT, CHECKPOINT_BACKUP_WRITE_ERROR_COUNT
+ }
+ );
}
@Override
@@ -83,4 +88,13 @@ public class FileChannelCounter extends ChannelCounter implements \
FileChannelCou public void incrementCheckpointWriteErrorCount() {
increment(CHECKPOINT_WRITE_ERROR_COUNT);
}
+
+ @Override
+ public long getCheckpointBackupWriteErrorCount() {
+ return get(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT);
+ }
+
+ public void incrementCheckpointBackupWriteErrorCount() {
+ increment(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT);
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java \
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
index 175b1f4..9386094 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
@@ -62,4 +62,11 @@ public interface FileChannelCounterMBean extends \
ChannelCounterMBean {
* @see org.apache.flume.channel.file.Log.BackgroundWorker#run()
*/
long getCheckpointWriteErrorCount();
+
+ /**
+ * A count of the number of errors encountered while trying to write the backup checkpoints. This
+ * includes any Throwables.
+ * @see org.apache.flume.channel.file.EventQueueBackingStoreFile#startBackupThread()
+ */
+ long getCheckpointBackupWriteErrorCount();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index cd1dcd9..1e00ee2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import junit.framework.Assert;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +52,7 @@ public class TestCheckpoint {
@Test
public void testSerialization() throws Exception {
EventQueueBackingStore backingStore =
- new EventQueueBackingStoreFileV2(file, 1, "test");
+ new EventQueueBackingStoreFileV2(file, 1, "test", new \
FileChannelCounter("test")); FlumeEventPointer ptrIn = new FlumeEventPointer(10, \
20); FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
inflightTakes, inflightPuts, queueSet);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index c6c6ad3..6c91661 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.util.Map;
import java.util.Set;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -70,7 +71,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
Assert.assertTrue(inflightPutsFile.delete());
EventQueueBackingStore backingStore =
EventQueueBackingStoreFactory.get(checkpointFile, 50,
- "test");
+ "test", new FileChannelCounter("test"));
FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile, queueSetDir);
CheckpointRebuilder checkpointRebuilder =
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java \
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index 0939454..7aebb03 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -23,6 +23,7 @@ import com.google.common.io.Files;
import com.google.protobuf.InvalidProtocolBufferException;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.apache.flume.channel.file.proto.ProtosFactory;
import org.junit.After;
import org.junit.Before;
@@ -75,29 +76,39 @@ public class TestEventQueueBackingStoreFactory {
@Test
public void testWithNoFlag() throws Exception {
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"),
- Serialization.VERSION_3, pointersInTestCheckpoint);
+ verify(
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")), + Serialization.VERSION_3, \
pointersInTestCheckpoint + );
}
@Test
public void testWithFlag() throws Exception {
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true),
- Serialization.VERSION_3, pointersInTestCheckpoint);
+ verify(
+ EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test"), true
+ ),
+ Serialization.VERSION_3, pointersInTestCheckpoint
+ );
}
@Test
public void testNoUprade() throws Exception {
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
- Serialization.VERSION_2, pointersInTestCheckpoint);
+ verify(
+ EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test"), false
+ ),
+ Serialization.VERSION_2, pointersInTestCheckpoint
+ );
}
@Test(expected = BadCheckpointException.class)
public void testDecreaseCapacity() throws Exception {
Assert.assertTrue(checkpoint.delete());
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
- EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 9, "test", new \
FileChannelCounter("test")); Assert.fail();
}
@@ -105,17 +116,21 @@ public class TestEventQueueBackingStoreFactory {
public void testIncreaseCapacity() throws Exception {
Assert.assertTrue(checkpoint.delete());
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
- EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 11, "test", new \
FileChannelCounter("test")); Assert.fail();
}
@Test
public void testNewCheckpoint() throws Exception {
Assert.assertTrue(checkpoint.delete());
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
- Serialization.VERSION_3, Collections.<Long>emptyList());
+ verify(
+ EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test"), false
+ ),
+ Serialization.VERSION_3, Collections.<Long>emptyList()
+ );
}
@Test(expected = BadCheckpointException.class)
@@ -123,13 +138,15 @@ public class TestEventQueueBackingStoreFactory {
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * \
Serialization.SIZE_OF_LONG); writer.writeLong(94L);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -141,12 +158,14 @@ public class TestEventQueueBackingStoreFactory {
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * \
Serialization.SIZE_OF_LONG); \
writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); \
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -157,12 +176,14 @@ public class TestEventQueueBackingStoreFactory {
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * \
Serialization.SIZE_OF_LONG); writer.writeLong(2L);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -173,7 +194,7 @@ public class TestEventQueueBackingStoreFactory {
FileOutputStream os = null;
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -184,7 +205,9 @@ public class TestEventQueueBackingStoreFactory {
os = new FileOutputStream(Serialization.getMetaDataFile(checkpoint));
meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
os.flush();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
os.close();
}
@@ -195,12 +218,14 @@ public class TestEventQueueBackingStoreFactory {
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID * \
Serialization.SIZE_OF_LONG); writer.writeLong(2L);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -211,7 +236,7 @@ public class TestEventQueueBackingStoreFactory {
FileOutputStream os = null;
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new \
FileChannelCounter("test")); backingStore.close();
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -223,7 +248,9 @@ public class TestEventQueueBackingStoreFactory {
Serialization.getMetaDataFile(checkpoint));
meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os);
os.flush();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
os.close();
}
@@ -232,7 +259,7 @@ public class TestEventQueueBackingStoreFactory {
@Test(expected = BadCheckpointException.class)
public void testTruncateMeta() throws Exception {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
Assert.assertTrue(checkpoint.exists());
File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -241,13 +268,15 @@ public class TestEventQueueBackingStoreFactory {
writer.setLength(0);
writer.getFD().sync();
writer.close();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
}
@Test(expected = InvalidProtocolBufferException.class)
public void testCorruptMeta() throws Throwable {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
Assert.assertTrue(checkpoint.exists());
File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -258,7 +287,9 @@ public class TestEventQueueBackingStoreFactory {
writer.getFD().sync();
writer.close();
try {
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} catch (BadCheckpointException ex) {
throw ex.getCause();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
index d0237db..e2d1ee6 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
@@ -31,7 +31,10 @@ import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
@@ -230,6 +233,70 @@ public class TestFileChannelErrorMetrics extends TestFileChannelBase {
assertFalse(channel.getChannelCounter().isOpen());
}
+ @Test
+ public void testCheckpointBackupWriteErrorShouldIncreaseCounter()
+ throws IOException, InterruptedException {
+ FileChannelCounter fileChannelCounter = new FileChannelCounter("test");
+ File checkpointFile = File.createTempFile("checkpoint", ".tmp");
+ File backupDir = Files.createTempDirectory("checkpoint").toFile();
+ backupDir.deleteOnExit();
+ checkpointFile.deleteOnExit();
+ EventQueueBackingStoreFileV3 backingStoreFileV3 = new EventQueueBackingStoreFileV3(
+ checkpointFile, 1, "test", fileChannelCounter, backupDir,true, false
+ );
+
+ // Exception will be thrown by state check if beforeCheckpoint is not called
+ backingStoreFileV3.checkpoint();
+ // wait for other thread to reach the error state
+ assertEventuallyTrue("checkpoint backup write failure should increase counter to 1",
+ new BooleanPredicate() {
+ @Override
+ public boolean get() {
+ return fileChannelCounter.getCheckpointBackupWriteErrorCount() == 1;
+ }
+ },
+ 100
+ );
+ }
+
+ @Test
+ public void testCheckpointBackupWriteErrorShouldIncreaseCounter2()
+ throws Exception {
+ int checkpointInterval = 1500;
+ Map config = new HashMap();
+ config.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, String.valueOf(checkpointInterval));
+ config.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+ final FileChannel channel = createFileChannel(Collections.unmodifiableMap(config));
+ channel.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ tx.commit();
+ tx.close();
+ final long beforeCheckpointWrite = System.currentTimeMillis();
+ // first checkpoint should be written successfully -> the counter should remain 0
+ assertEventuallyTrue("checkpoint backup should have been written", new BooleanPredicate() {
+ @Override
+ public boolean get() {
+ return new File(backupDir, "checkpoint").lastModified() > beforeCheckpointWrite;
+ }
+ }, checkpointInterval * 3);
+ assertEquals(0, channel.getChannelCounter().getCheckpointBackupWriteErrorCount());
+ FileUtils.deleteDirectory(backupDir);
+ tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test2".getBytes()));
+ tx.commit();
+ tx.close();
+ // the backup directory has been deleted so the backup checkpoint write should have been failed
+ assertEventuallyTrue("checkpointBackupWriteErrorCount should be 1", new BooleanPredicate() {
+ @Override
+ public boolean get() {
+ return channel.getChannelCounter().getCheckpointBackupWriteErrorCount() >= 1;
+ }
+ }, checkpointInterval * 3);
+ }
+
private interface BooleanPredicate {
boolean get();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index f1700f9..9c7352e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -96,7 +97,7 @@ public class TestFlumeEventQueue {
public EventQueueBackingStore get() throws Exception {
Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
- "test");
+ "test", new FileChannelCounter("test"));
}
}
},
@@ -105,7 +106,9 @@ public class TestFlumeEventQueue {
@Override
public EventQueueBackingStore get() throws Exception {
Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
- return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test");
+ return new EventQueueBackingStoreFileV3(
+ getCheckpoint(), 1000, "test", new FileChannelCounter("test")
+ );
}
}
}
@@ -135,7 +138,9 @@ public class TestFlumeEventQueue {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
- backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
+ backingStore = new EventQueueBackingStoreFileV2(
+ checkpoint, 1, "test", new FileChannelCounter("test")
+ );
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
backingStoreSupplier.getInflightPuts(),
@@ -149,7 +154,9 @@ public class TestFlumeEventQueue {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
- backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
+ backingStore = new EventQueueBackingStoreFileV2(
+ checkpoint, 0, "test", new FileChannelCounter("test")
+ );
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
backingStoreSupplier.getInflightPuts(),
@@ -161,7 +168,9 @@ public class TestFlumeEventQueue {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
- backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
+ backingStore = new EventQueueBackingStoreFileV2(
+ checkpoint, -1, "test", new FileChannelCounter("test")
+ );
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
backingStoreSupplier.getInflightPuts(),
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic