List: cassandra-commits
Subject: git commit: Repair should validate checksums before streaming Patch by Vijay, reviewed by Jason Brown for CASSANDRA-3648
From: vijay@apache.org
Date: 2013-01-31 21:55:13
Message-ID: 20130131215513.C2CF5828537@tyr.zones.apache.org
Updated Branches:
refs/heads/trunk cb9cf504c -> 639b314d2
Repair should validate checksums before streaming
Patch by Vijay, reviewed by Jason Brown for CASSANDRA-3648
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639b314d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639b314d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639b314d
Branch: refs/heads/trunk
Commit: 639b314d2ef2d95cfef2ffd4bc597540fa83f3cf
Parents: cb9cf50
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Thu Jan 31 13:49:19 2013 -0800
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Thu Jan 31 13:49:19 2013 -0800
----------------------------------------------------------------------
 .../org/apache/cassandra/io/sstable/Component.java |   6 +-
 .../org/apache/cassandra/io/sstable/SSTable.java   |   1 +
 .../apache/cassandra/io/sstable/SSTableWriter.java |  34 +---
 .../cassandra/io/util/DataIntegrityMetadata.java   | 167 +++++++++++++++
 .../apache/cassandra/io/util/SequentialWriter.java |  37 +---
 .../apache/cassandra/streaming/FileStreamTask.java |  40 +++-
 .../cassandra/streaming/StreamingTransferTest.java |  42 ++++
 7 files changed, 260 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
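For uncompressed sstables, the patch writes a CRC.db sidecar next to the data file: a 4 byte chunk-size header followed by one 4 byte CRC32 per chunk of data. The streaming side can then re-verify each chunk before it leaves the node. A rough standalone sketch of that file layout (not part of the patch; it uses java.util.zip.CRC32 in place of Cassandra's PureJavaCrc32, and the file name is made up):

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    public class CrcSidecarSketch
    {
        public static void main(String[] args) throws IOException
        {
            int chunkSize = 64 * 1024;
            byte[] data = new byte[3 * chunkSize + 123]; // stand-in for data file contents
            DataOutputStream crcOut = new DataOutputStream(new FileOutputStream("example-CRC.db"));
            crcOut.writeInt(chunkSize); // header: the chunk length all checksums are computed over
            CRC32 crc = new CRC32();
            for (int offset = 0; offset < data.length; offset += chunkSize)
            {
                int length = Math.min(chunkSize, data.length - offset);
                crc.update(data, offset, length);
                crcOut.writeInt((int) crc.getValue()); // one 4 byte CRC per chunk, as in ChecksumWriter.append()
                crc.reset();
            }
            crcOut.close();
        }
    }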
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index cbc12d9..599e0ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -53,6 +53,8 @@ public class Component
STATS("Statistics.db"),
// holds sha1 sum of the data file (to be checked by sha1sum)
DIGEST("Digest.sha1"),
+ // holds the CRC32 for chunks in an uncompressed file.
+ CRC("CRC.db"),
// holds SSTable Index Summary and Boundaries
SUMMARY("Summary.db"),
// table of contents, stores the list of all components for the sstable
@@ -83,6 +85,7 @@ public class Component
public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
public final static Component STATS = new Component(Type.STATS);
public final static Component DIGEST = new Component(Type.DIGEST);
+ public final static Component CRC = new Component(Type.CRC);
public final static Component SUMMARY = new Component(Type.SUMMARY);
public final static Component TOC = new Component(Type.TOC);
@@ -134,7 +137,8 @@ public class Component
case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break;
case STATS: component = Component.STATS; break;
case DIGEST: component = Component.DIGEST; break;
- case SUMMARY: component = Component.SUMMARY; break;
+ case CRC: component = Component.CRC; break;
+ case SUMMARY: component = Component.SUMMARY; break;
case TOC: component = Component.TOC; break;
case CUSTOM: component = new Component(Type.CUSTOM, path.right); break;
default:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index c7486ba..25738d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -63,6 +63,7 @@ public abstract class SSTable
public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
public static final String COMPONENT_STATS = Component.Type.STATS.repr;
public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
+ public static final String COMPONENT_CRC = Component.Type.CRC.repr;
public static final String COMPONENT_SUMMARY = Component.Type.SUMMARY.repr;
public static final String TEMPFILE_MARKER = "tmp";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 279599e..2166808 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,9 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
-import java.nio.channels.ClosedChannelException;
import java.util.*;
-import java.util.regex.Pattern;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -48,6 +46,7 @@ public class SSTableWriter extends SSTable
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
private final SSTableMetadata.Collector sstableMetadataCollector;
+ private DataIntegrityMetadata.ChecksumWriter integrityWriter;
public SSTableWriter(String filename, long keyCount)
{
@@ -70,11 +69,16 @@ public class SSTableWriter extends SSTable
components.add(Component.FILTER);
if (metadata.compressionParameters().sstableCompressor != null)
+ {
components.add(Component.COMPRESSION_INFO);
+ }
else
+ {
// it would feel safer to actually add this component later, when the digest is written,
// but the components are unmodifiable after construction
components.add(Component.DIGEST);
+ components.add(Component.CRC);
+ }
return components;
}
@@ -104,7 +108,8 @@ public class SSTableWriter extends SSTable
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
dataFile = SequentialWriter.open(new File(getFilename()), !DatabaseDescriptor.populateIOCacheOnFlush());
- dataFile.setComputeDigest();
+ integrityWriter = DataIntegrityMetadata.checksumWriter(descriptor);
+ dataFile.setDataIntegrityWriter(integrityWriter);
}
this.sstableMetadataCollector = sstableMetadataCollector;
@@ -313,7 +318,6 @@ public class SSTableWriter extends SSTable
// write sstable statistics
SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
writeMetadata(descriptor, sstableMetadata);
- maybeWriteDigest();
// save the table of components
SSTable.appendTOC(descriptor, components);
@@ -343,28 +347,6 @@ public class SSTableWriter extends SSTable
return sstable;
}
- private void maybeWriteDigest()
- {
- byte[] digest = dataFile.digest();
- if (digest == null)
- return;
-
- SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
- // Writing output compatible with sha1sum
- Descriptor newdesc = descriptor.asTemporary(false);
- String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
- String dataFileName = tmp[tmp.length - 1];
- try
- {
- out.write(String.format("%s  %s", Hex.bytesToHex(digest), dataFileName).getBytes());
- }
- catch (ClosedChannelException e)
- {
- throw new AssertionError(); // can't happen.
- }
- out.close();
- }
-
private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
{
SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
new file mode 100644
index 0000000..f334d08
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.zip.Checksum;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+public class DataIntegrityMetadata
+{
+ public static ChecksumValidator checksumValidator(Descriptor desc) throws IOException
+ {
+ return new ChecksumValidator(desc);
+ }
+
+ public static class ChecksumValidator implements Closeable
+ {
+ private final Checksum checksum = new PureJavaCrc32();
+ private final RandomAccessReader reader;
+ private final Descriptor descriptor;
+ public final int chunkSize;
+
+ public ChecksumValidator(Descriptor desc) throws IOException
+ {
+ this.descriptor = desc;
+ reader = RandomAccessReader.open(new File(desc.filenameFor(Component.CRC)));
+ chunkSize = reader.readInt();
+ }
+
+ public void seek(long offset)
+ {
+ long start = chunkStart(offset);
+ reader.seek(((start / chunkSize) * 4L) + 4); // 4 byte checksum per chunk + 4 byte chunkSize header
+ }
+
+ public long chunkStart(long offset)
+ {
+ long startChunk = offset / chunkSize;
+ return startChunk * chunkSize;
+ }
+
+ public void validate(byte[] bytes, int start, int end) throws IOException
+ {
+ checksum.update(bytes, start, end);
+ int current = (int) checksum.getValue();
+ checksum.reset();
+ int actual = reader.readInt();
+ if (current != actual)
+ throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
+ }
+
+ public void close()
+ {
+ reader.close();
+ }
+ }
+
+ public static ChecksumWriter checksumWriter(Descriptor desc)
+ {
+ return new ChecksumWriter(desc);
+ }
+
+ public static class ChecksumWriter implements Closeable
+ {
+ private final Checksum checksum = new PureJavaCrc32();
+ private final MessageDigest digest;
+ private final SequentialWriter writer;
+ private final Descriptor descriptor;
+
+ public ChecksumWriter(Descriptor desc)
+ {
+ this.descriptor = desc;
+ writer = SequentialWriter.open(new File(desc.filenameFor(Component.CRC)), true);
+ try
+ {
+ digest = MessageDigest.getInstance("SHA-1");
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ // SHA-1 is standard in java 6
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void writeChunkSize(int length)
+ {
+ try
+ {
+ writer.stream.writeInt(length);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void append(byte[] buffer, int start, int end)
+ {
+ try
+ {
+ checksum.update(buffer, start, end);
+ writer.stream.writeInt((int) checksum.getValue());
+ checksum.reset();
+
+ digest.update(buffer, start, end);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void close()
+ {
+ FileUtils.closeQuietly(writer);
+ byte[] bytes = digest.digest();
+ if (bytes == null)
+ return;
+ SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
+ // Writing output compatible with sha1sum
+ Descriptor newdesc = descriptor.asTemporary(false);
+ String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
+ String dataFileName = tmp[tmp.length - 1];
+ try
+ {
+ out.write(String.format("%s  %s", Hex.bytesToHex(bytes), dataFileName).getBytes());
+ }
+ catch (ClosedChannelException e)
+ {
+ throw new AssertionError(); // can't happen.
+ }
+ finally
+ {
+ FileUtils.closeQuietly(out);
+ }
+ }
+ }
+}
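Writer and validator pair up around that CRC.db layout. A hedged usage sketch (desc and chunk are assumed to exist; this is not code from the patch):

    // writing side: driven by SequentialWriter as the data file is flushed
    DataIntegrityMetadata.ChecksumWriter writer = DataIntegrityMetadata.checksumWriter(desc);
    writer.writeChunkSize(64 * 1024);       // 4 byte header, written once before any chunk
    writer.append(chunk, 0, chunk.length);  // one CRC appended per chunk of the data file
    writer.close();                         // closes CRC.db and writes the sha1sum-style Digest.sha1

    // reading side: used by FileStreamTask before streaming
    DataIntegrityMetadata.ChecksumValidator validator = DataIntegrityMetadata.checksumValidator(desc);
    validator.seek(0);                          // position CRC.db for the chunk containing data offset 0
    validator.validate(chunk, 0, chunk.length); // throws IOException on a CRC mismatch
    validator.close();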
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 77d4fcf..b970c95 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
@@ -60,7 +58,7 @@ public class SequentialWriter extends OutputStream
private int bytesSinceTrickleFsync = 0;
public final DataOutputStream stream;
- private MessageDigest digest;
+ private DataIntegrityMetadata.ChecksumWriter metadata;
public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
{
@@ -265,8 +263,8 @@ public class SequentialWriter extends OutputStream
throw new FSWriteError(e, getPath());
}
- if (digest != null)
- digest.update(buffer, 0, validBufferBytes);
+ if (metadata != null)
+ metadata.append(buffer, 0, validBufferBytes);
}
public long getFilePointer()
@@ -392,6 +390,7 @@ public class SequentialWriter extends OutputStream
throw new FSWriteError(e, getPath());
}
+ FileUtils.closeQuietly(metadata);
CLibrary.tryCloseFD(directoryFD);
}
@@ -400,34 +399,12 @@ public class SequentialWriter extends OutputStream
* This can only be called before any data is written to this writer,
* otherwise an IllegalStateException is thrown.
*/
- public void setComputeDigest()
+ public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
{
if (current != 0)
throw new IllegalStateException();
-
- try
- {
- digest = MessageDigest.getInstance("SHA-1");
- }
- catch (NoSuchAlgorithmException e)
- {
- // SHA-1 is standard in java 6
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Return the digest associated to this file or null if no digest was
- * created.
- * This can only be called once the file is fully created, i.e. after
- * close() has been called. Otherwise an IllegalStateException is thrown.
- */
- public byte[] digest()
- {
- if (buffer != null)
- throw new IllegalStateException();
-
- return digest == null ? null : digest.digest();
+ metadata = writer;
+ metadata.writeChunkSize(buffer.length);
}
/**
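The integrity writer has to be attached before the first byte is written (current must still be 0, otherwise IllegalStateException), which is exactly what the SSTableWriter constructor above does. A minimal wiring sketch, assuming a scratch path and an existing Descriptor desc (hypothetical names, not code from the patch):

    SequentialWriter dataFile = SequentialWriter.open(new File("/tmp/example-Data.db"), true);
    DataIntegrityMetadata.ChecksumWriter crcWriter = DataIntegrityMetadata.checksumWriter(desc);
    dataFile.setDataIntegrityWriter(crcWriter); // also records the buffer size as the chunk size
    // from here on, every buffer flush appends one CRC via crcWriter.append(...)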
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 67d5c35..8472d54 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -27,6 +27,10 @@ import org.slf4j.LoggerFactory;
import com.ning.compress.lzf.LZFOutputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.metrics.StreamingMetrics;
@@ -41,7 +45,7 @@ public class FileStreamTask extends WrappedRunnable
{
private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
- public static final int CHUNK_SIZE = 64 * 1024;
+ private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
public static final int MAX_CONNECT_ATTEMPTS = 4;
protected final StreamHeader header;
@@ -54,7 +58,7 @@ public class FileStreamTask extends WrappedRunnable
private OutputStream compressedoutput;
private DataInputStream input;
// allocate buffer to use for transfers only once
- private final byte[] transferBuffer = new byte[CHUNK_SIZE];
+ private byte[] transferBuffer;
// outbound global throughput limiter
protected final Throttle throttle;
private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
@@ -140,6 +144,11 @@ public class FileStreamTask extends WrappedRunnable
// try to skip kernel page cache if possible
RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true);
+ Descriptor desc = Descriptor.fromFilename(header.file.getFilename());
+ ChecksumValidator metadata = null;
+ if (new File(desc.filenameFor(Component.CRC)).exists())
+ metadata = DataIntegrityMetadata.checksumValidator(desc);
+ transferBuffer = metadata == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[metadata.chunkSize];
// setting up data compression stream
compressedoutput = new LZFOutputStream(output);
@@ -151,21 +160,26 @@ public class FileStreamTask extends WrappedRunnable
// stream each of the required sections of the file
for (Pair<Long, Long> section : header.file.sections)
{
+ long start = metadata == null ? section.left : metadata.chunkStart(section.left);
+ int skipBytes = (int) (section.left - start);
// seek to the beginning of the section
- file.seek(section.left);
+ file.seek(start);
+ if (metadata != null)
+ metadata.seek(start);
- // length of the section to stream
- long length = section.right - section.left;
+ // length of the section to read
+ long length = section.right - start;
// tracks write progress
long bytesTransferred = 0;
while (bytesTransferred < length)
{
- long lastWrite = write(file, length, bytesTransferred);
+ long lastWrite = write(file, metadata, skipBytes, length, bytesTransferred);
bytesTransferred += lastWrite;
totalBytesTransferred += lastWrite;
// store streaming progress
header.file.progress += lastWrite;
+ skipBytes = 0;
}
// make sure that the current section is sent
@@ -203,6 +217,8 @@ public class FileStreamTask extends WrappedRunnable
* Sequentially read bytes from the file and write them to the output stream
*
* @param reader The file reader to read from
+ * @param validator validator to verify data integrity
+ * @param start Number of leading bytes to skip when transferring, but still include in validation
* @param length The full length that should be transferred
* @param bytesTransferred Number of bytes already transferred
*
@@ -210,12 +226,16 @@ public class FileStreamTask extends WrappedRunnable
*
* @throws IOException on any I/O error
*/
- protected long write(RandomAccessReader reader, long length, long bytesTransferred) throws IOException
+ protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
{
- int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+ int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
+ int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
- reader.readFully(transferBuffer, 0, toTransfer);
- compressedoutput.write(transferBuffer, 0, toTransfer);
+ reader.readFully(transferBuffer, 0, minReadable);
+ if (validator != null)
+ validator.validate(transferBuffer, 0, minReadable);
+
+ compressedoutput.write(transferBuffer, start, (toTransfer - start));
throttle.throttleDelta(toTransfer);
return toTransfer;
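Since checksums cover whole chunks, a section that starts mid-chunk is rewound to the chunk boundary: the leading bytes are read and validated but skipped on the wire. A worked example with the default 64 KiB chunk (values chosen for illustration only):

    // section.left = 100000, chunkSize = 65536
    long start = (100000L / 65536) * 65536;   // chunkStart() -> 65536
    int skipBytes = (int) (100000L - start);  // 34464 bytes validated but not transferred
    // matching CRC.db seek: (65536 / 65536) * 4L + 4 = 8, i.e. past the header and one 4 byte CRC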
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 07b19ae..76d66b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -25,6 +25,7 @@ import static org.apache.cassandra.Util.column;
import static org.apache.cassandra.Util.addMutation;
import java.net.InetAddress;
+import java.sql.Date;
import java.util.*;
import org.apache.cassandra.SchemaLoader;
@@ -117,6 +118,11 @@ public class StreamingTransferTest extends SchemaLoader
List<Range<Token>> ranges = new ArrayList<Range<Token>>();
ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+ transfer(table, sstable, ranges);
+ }
+
+ private void transfer(Table table, SSTableReader sstable, List<Range<Token>> ranges) throws Exception
+ {
StreamOutSession session = StreamOutSession.create(table.getName(), LOCAL, (IStreamCallback)null);
StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
session.await();
@@ -313,6 +319,42 @@ public class StreamingTransferTest extends SchemaLoader
}
}
+ @Test
+ public void testRandomSSTableTransfer() throws Exception
+ {
+ final Table table = Table.open("Keyspace1");
+ final ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+ Mutator mutator = new Mutator()
+ {
+ public void mutate(String key, String colName, long timestamp) throws Exception
+ {
+ RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
+ ColumnFamily cf = ColumnFamily.create(table.getName(), cfs.name);
+ cf.addColumn(column(colName, "value", timestamp));
+ cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(new Date(timestamp).toString()), timestamp));
+ rm.add(cf);
+ logger.debug("Applying row to transfer " + rm);
+ rm.apply();
+ }
+ };
+ // write a lot more data so the data is spread in more than 1 chunk.
+ for (int i = 1; i <= 6000; i++)
+ mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
+ cfs.forceBlockingFlush();
+ Util.compactAll(cfs).get();
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ cfs.clearUnsafe();
+
+ IPartitioner p = StorageService.getPartitioner();
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000"))));
+ ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500"))));
+ ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900"))));
+ transfer(table, sstable, ranges);
+ assertEquals(1, cfs.getSSTables().size());
+ assertEquals(7, Util.getRangeSlice(cfs).size());
+ }
+
public interface Mutator
{
public void mutate(String key, String col, long timestamp) throws Exception;