[prev in list] [next in list] [prev in thread] [next in thread] 

List:       cassandra-commits
Subject:    git commit: Repair should validate checksums before streaming Patch by Vijay, reviewed by Jason Brown
From:       vijay () apache ! org
Date:       2013-01-31 21:55:13
Message-ID: 20130131215513.C2CF5828537 () tyr ! zones ! apache ! org
[Download RAW message or body]

Updated Branches:
  refs/heads/trunk cb9cf504c -> 639b314d2


Repair should validate checksums before streaming
Patch by Vijay, reviewed by Jason Brown for CASSANDRA-3648

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639b314d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639b314d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639b314d

Branch: refs/heads/trunk
Commit: 639b314d2ef2d95cfef2ffd4bc597540fa83f3cf
Parents: cb9cf50
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Thu Jan 31 13:49:19 2013 -0800
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Thu Jan 31 13:49:19 2013 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/io/sstable/Component.java |    6 +-
 .../org/apache/cassandra/io/sstable/SSTable.java   |    1 +
 .../apache/cassandra/io/sstable/SSTableWriter.java |   34 +---
 .../cassandra/io/util/DataIntegrityMetadata.java   |  167 +++++++++++++++
 .../apache/cassandra/io/util/SequentialWriter.java |   37 +---
 .../apache/cassandra/streaming/FileStreamTask.java |   40 +++-
 .../cassandra/streaming/StreamingTransferTest.java |   42 ++++
 7 files changed, 260 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/Component.java
                
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java \
b/src/java/org/apache/cassandra/io/sstable/Component.java index cbc12d9..599e0ba \
                100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -53,6 +53,8 @@ public class Component
         STATS("Statistics.db"),
         // holds sha1 sum of the data file (to be checked by sha1sum)
         DIGEST("Digest.sha1"),
+        // holds the CRC32 for chunks in an uncompressed file.
+        CRC("CRC.db"),
         // holds SSTable Index Summary and Boundaries
         SUMMARY("Summary.db"),
         // table of contents, stores the list of all components for the sstable
@@ -83,6 +85,7 @@ public class Component
     public final static Component COMPRESSION_INFO = new \
Component(Type.COMPRESSION_INFO);  public final static Component STATS = new \
Component(Type.STATS);  public final static Component DIGEST = new \
Component(Type.DIGEST); +    public final static Component CRC = new \
Component(Type.CRC);  public final static Component SUMMARY = new \
Component(Type.SUMMARY);  public final static Component TOC = new \
Component(Type.TOC);  
@@ -134,7 +137,8 @@ public class Component
             case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;          \
                break;
             case STATS:             component = Component.STATS;                     \
                break;
             case DIGEST:            component = Component.DIGEST;                    \
                break;
-            case SUMMARY:           component = Component.SUMMARY;          break;
+            case CRC:               component = Component.CRC;                       \
break; +            case SUMMARY:           component = Component.SUMMARY;            \
                break;
             case TOC:               component = Component.TOC;                       \
                break;
             case CUSTOM:            component = new Component(Type.CUSTOM, \
path.right); break;  default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/SSTable.java
                
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java \
b/src/java/org/apache/cassandra/io/sstable/SSTable.java index c7486ba..25738d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -63,6 +63,7 @@ public abstract class SSTable
     public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
     public static final String COMPONENT_STATS = Component.Type.STATS.repr;
     public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
+    public static final String COMPONENT_CRC = Component.Type.CRC.repr;
     public static final String COMPONENT_SUMMARY = Component.Type.SUMMARY.repr;
 
     public static final String TEMPFILE_MARKER = "tmp";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
                
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java \
b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 279599e..2166808 \
                100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,9 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
-import java.nio.channels.ClosedChannelException;
 import java.util.*;
-import java.util.regex.Pattern;
 
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -48,6 +46,7 @@ public class SSTableWriter extends SSTable
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
     private final SSTableMetadata.Collector sstableMetadataCollector;
+    private DataIntegrityMetadata.ChecksumWriter integratyWriter;
 
     public SSTableWriter(String filename, long keyCount)
     {
@@ -70,11 +69,16 @@ public class SSTableWriter extends SSTable
             components.add(Component.FILTER);
 
         if (metadata.compressionParameters().sstableCompressor != null)
+        {
             components.add(Component.COMPRESSION_INFO);
+        }
         else
+        {
             // it would feel safer to actually add this component later in \
maybeWriteDigest(),  // but the components are unmodifiable after construction
             components.add(Component.DIGEST);
+            components.add(Component.CRC);
+        }
         return components;
     }
 
@@ -104,7 +108,8 @@ public class SSTableWriter extends SSTable
             dbuilder = \
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());  dataFile = \
                SequentialWriter.open(new File(getFilename()),
 			                      !DatabaseDescriptor.populateIOCacheOnFlush());
-            dataFile.setComputeDigest();
+            integratyWriter = DataIntegrityMetadata.checksumWriter(descriptor);
+            dataFile.setDataIntegratyWriter(integratyWriter);
         }
 
         this.sstableMetadataCollector = sstableMetadataCollector;
@@ -313,7 +318,6 @@ public class SSTableWriter extends SSTable
         // write sstable statistics
         SSTableMetadata sstableMetadata = \
sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName()); \
                writeMetadata(descriptor, sstableMetadata);
-        maybeWriteDigest();
 
         // save the table of components
         SSTable.appendTOC(descriptor, components);
@@ -343,28 +347,6 @@ public class SSTableWriter extends SSTable
         return sstable;
     }
 
-    private void maybeWriteDigest()
-    {
-        byte[] digest = dataFile.digest();
-        if (digest == null)
-            return;
-
-        SequentialWriter out = SequentialWriter.open(new \
                File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
-        // Writting output compatible with sha1sum
-        Descriptor newdesc = descriptor.asTemporary(false);
-        String[] tmp = \
                newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
                
-        String dataFileName = tmp[tmp.length - 1];
-        try
-        {
-            out.write(String.format("%s  %s", Hex.bytesToHex(digest), \
                dataFileName).getBytes());
-        }
-        catch (ClosedChannelException e)
-        {
-            throw new AssertionError(); // can't happen.
-        }
-        out.close();
-    }
-
     private static void writeMetadata(Descriptor desc, SSTableMetadata \
sstableMetadata)  {
         SequentialWriter out = SequentialWriter.open(new \
File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
                
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java \
b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java new file mode \
100644 index 0000000..f334d08
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.zip.Checksum;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+public class DataIntegrityMetadata
+{
+    public static ChecksumValidator checksumValidator(Descriptor desc) throws \
IOException +    {
+        return new ChecksumValidator(desc);
+    }
+
+    public static class ChecksumValidator implements Closeable
+    {
+        private final Checksum checksum = new PureJavaCrc32();
+        private final RandomAccessReader reader;
+        private final Descriptor descriptor;
+        public final int chunkSize;
+
+        public ChecksumValidator(Descriptor desc) throws IOException
+        {
+            this.descriptor = desc;
+            reader = RandomAccessReader.open(new \
File(desc.filenameFor(Component.CRC))); +            chunkSize = reader.readInt();
+        }
+
+        public void seek(long offset)
+        {
+            long start = chunkStart(offset);
+            reader.seek(((start / chunkSize) * 4L) + 4); // 4 byte checksum per
+                                                         // chunk + 4 byte
+                                                         // header/chunkLength
+        }
+
+        public long chunkStart(long offset)
+        {
+            long startChunk = offset / chunkSize;
+            return startChunk * chunkSize;
+        }
+
+        public void validate(byte[] bytes, int start, int end) throws IOException
+        {
+            checksum.update(bytes, start, end);
+            int current = (int) checksum.getValue();
+            checksum.reset();
+            int actual = reader.readInt();
+            if (current != actual)
+                throw new IOException("Corrupted SSTable : " + \
descriptor.filenameFor(Component.DATA)); +        }
+
+        public void close()
+        {
+            reader.close();
+        }
+    }
+
+    public static ChecksumWriter checksumWriter(Descriptor desc)
+    {
+        return new ChecksumWriter(desc);
+    }
+
+    public static class ChecksumWriter implements Closeable
+    {
+        private final Checksum checksum = new PureJavaCrc32();
+        private final MessageDigest digest;
+        private final SequentialWriter writer;
+        private final Descriptor descriptor;
+
+        public ChecksumWriter(Descriptor desc)
+        {
+            this.descriptor = desc;
+            writer = SequentialWriter.open(new \
File(desc.filenameFor(Component.CRC)), true); +            try
+            {
+                digest = MessageDigest.getInstance("SHA-1");
+            }
+            catch (NoSuchAlgorithmException e)
+            {
+                // SHA-1 is standard in java 6
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void writeChunkSize(int length)
+        {
+            try
+            {
+                writer.stream.writeInt(length);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public void append(byte[] buffer, int start, int end)
+        {
+            try
+            {
+                checksum.update(buffer, start, end);
+                writer.stream.writeInt((int) checksum.getValue());
+                checksum.reset();
+
+                digest.update(buffer, start, end);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public void close()
+        {
+            FileUtils.closeQuietly(writer);
+            byte[] bytes = digest.digest();
+            if (bytes == null)
+                return;
+            SequentialWriter out = SequentialWriter.open(new \
File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true); +            // \
Writing output compatible with sha1sum +            Descriptor newdesc = \
descriptor.asTemporary(false); +            String[] tmp = \
newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator)); +   \
String dataFileName = tmp[tmp.length - 1]; +            try
+            {
+                out.write(String.format("%s  %s", Hex.bytesToHex(bytes), \
dataFileName).getBytes()); +            }
+            catch (ClosedChannelException e)
+            {
+                throw new AssertionError(); // can't happen.
+            }
+            finally
+            {
+                FileUtils.closeQuietly(out);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
                
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java \
b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 77d4fcf..b970c95 \
                100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
@@ -60,7 +58,7 @@ public class SequentialWriter extends OutputStream
     private int bytesSinceTrickleFsync = 0;
 
     public final DataOutputStream stream;
-    private MessageDigest digest;
+    private DataIntegrityMetadata.ChecksumWriter metadata;
 
     public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
     {
@@ -265,8 +263,8 @@ public class SequentialWriter extends OutputStream
             throw new FSWriteError(e, getPath());
         }
 
-        if (digest != null)
-            digest.update(buffer, 0, validBufferBytes);
+        if (metadata != null)
+            metadata.append(buffer, 0, validBufferBytes);
     }
 
     public long getFilePointer()
@@ -392,6 +390,7 @@ public class SequentialWriter extends OutputStream
             throw new FSWriteError(e, getPath());
         }
 
+        FileUtils.closeQuietly(metadata);
         CLibrary.tryCloseFD(directoryFD);
     }
 
@@ -400,34 +399,12 @@ public class SequentialWriter extends OutputStream
      * This can only be called before any data is written to this write,
      * otherwise an IllegalStateException is thrown.
      */
-    public void setComputeDigest()
+    public void setDataIntegratyWriter(DataIntegrityMetadata.ChecksumWriter writer)
     {
         if (current != 0)
             throw new IllegalStateException();
-
-        try
-        {
-            digest = MessageDigest.getInstance("SHA-1");
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            // SHA-1 is standard in java 6
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Return the digest associated to this file or null if no digest was
-     * created.
-     * This can only be called once the file is fully created, i.e. after
-     * close() has been called. Otherwise an IllegalStateException is thrown.
-     */
-    public byte[] digest()
-    {
-        if (buffer != null)
-            throw new IllegalStateException();
-
-        return digest == null ? null : digest.digest();
+        metadata = writer;
+        metadata.writeChunkSize(buffer.length);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/streaming/FileStreamTask.java
                
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java \
b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index 67d5c35..8472d54 \
                100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -27,6 +27,10 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.compress.lzf.LZFOutputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.StreamingMetrics;
@@ -41,7 +45,7 @@ public class FileStreamTask extends WrappedRunnable
 {
     private static final Logger logger = \
LoggerFactory.getLogger(FileStreamTask.class);  
-    public static final int CHUNK_SIZE = 64 * 1024;
+    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
     public static final int MAX_CONNECT_ATTEMPTS = 4;
 
     protected final StreamHeader header;
@@ -54,7 +58,7 @@ public class FileStreamTask extends WrappedRunnable
     private OutputStream compressedoutput;
     private DataInputStream input;
     // allocate buffer to use for transfers only once
-    private final byte[] transferBuffer = new byte[CHUNK_SIZE];
+    private byte[] transferBuffer;
     // outbound global throughput limiter
     protected final Throttle throttle;
     private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
@@ -140,6 +144,11 @@ public class FileStreamTask extends WrappedRunnable
 
         // try to skip kernel page cache if possible
         RandomAccessReader file = RandomAccessReader.open(new \
File(header.file.getFilename()), true); +        Descriptor desc = \
Descriptor.fromFilename(header.file.getFilename()); +        ChecksumValidator \
metadata = null; +        if (new File(desc.filenameFor(Component.CRC)).exists())
+            metadata = DataIntegrityMetadata.checksumValidator(desc);
+        transferBuffer = metadata == null ? new byte[DEFAULT_CHUNK_SIZE] : new \
byte[metadata.chunkSize];  
         // setting up data compression stream
         compressedoutput = new LZFOutputStream(output);
@@ -151,21 +160,26 @@ public class FileStreamTask extends WrappedRunnable
             // stream each of the required sections of the file
             for (Pair<Long, Long> section : header.file.sections)
             {
+                long start = metadata == null ? section.left : \
metadata.chunkStart(section.left); +                int skipBytes = (int) \
(section.left - start);  // seek to the beginning of the section
-                file.seek(section.left);
+                file.seek(start);
+                if (metadata != null)
+                    metadata.seek(start);
 
-                // length of the section to stream
-                long length = section.right - section.left;
+                // length of the section to read
+                long length = section.right - start;
                 // tracks write progress
                 long bytesTransferred = 0;
 
                 while (bytesTransferred < length)
                 {
-                    long lastWrite = write(file, length, bytesTransferred);
+                    long lastWrite = write(file, metadata, skipBytes, length, \
bytesTransferred);  bytesTransferred += lastWrite;
                     totalBytesTransferred += lastWrite;
                     // store streaming progress
                     header.file.progress += lastWrite;
+                    skipBytes = 0;
                 }
 
                 // make sure that current section is send
@@ -203,6 +217,8 @@ public class FileStreamTask extends WrappedRunnable
      * Sequentially read bytes from the file and write them to the output stream
      *
      * @param reader The file reader to read from
+     * @param validator validator to verify data integrity
+     * @param start number of leading buffer bytes to exclude from the transfer but still include in validation.
      * @param length The full length that should be transferred
      * @param bytesTransferred Number of bytes remaining to transfer
      *
@@ -210,12 +226,16 @@ public class FileStreamTask extends WrappedRunnable
      *
      * @throws IOException on any I/O error
      */
-    protected long write(RandomAccessReader reader, long length, long \
bytesTransferred) throws IOException +    protected long write(RandomAccessReader \
reader, ChecksumValidator validator, int start, long length, long bytesTransferred) \
throws IOException  {
-        int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+        int toTransfer = (int) Math.min(transferBuffer.length, length - \
bytesTransferred); +        int minReadable = (int) Math.min(transferBuffer.length, \
reader.length() - reader.getFilePointer());  
-        reader.readFully(transferBuffer, 0, toTransfer);
-        compressedoutput.write(transferBuffer, 0, toTransfer);
+        reader.readFully(transferBuffer, 0, minReadable);
+        if (validator != null)
+            validator.validate(transferBuffer, 0, minReadable);
+
+        compressedoutput.write(transferBuffer, start, (toTransfer - start));
         throttle.throttleDelta(toTransfer);
 
         return toTransfer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
                
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java \
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index \
                07b19ae..76d66b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -25,6 +25,7 @@ import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.addMutation;
 
 import java.net.InetAddress;
+import java.sql.Date;
 import java.util.*;
 
 import org.apache.cassandra.SchemaLoader;
@@ -117,6 +118,11 @@ public class StreamingTransferTest extends SchemaLoader
         List<Range<Token>> ranges = new ArrayList<Range<Token>>();
         ranges.add(new Range<Token>(p.getMinimumToken(), \
                p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), \
p.getMinimumToken())); +        transfer(table, sstable, ranges);
+    }
+
+    private void transfer(Table table, SSTableReader sstable, List<Range<Token>> \
ranges) throws Exception +    {
         StreamOutSession session = StreamOutSession.create(table.getName(), LOCAL, \
                (IStreamCallback)null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, \
OperationType.BOOTSTRAP);  session.await();
@@ -313,6 +319,42 @@ public class StreamingTransferTest extends SchemaLoader
         }
     }
 
+    @Test
+    public void testRandomSSTableTransfer() throws Exception
+    {
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+        Mutator mutator = new Mutator()
+        {
+            public void mutate(String key, String colName, long timestamp) throws \
Exception +            {
+                RowMutation rm = new RowMutation("Keyspace1", \
ByteBufferUtil.bytes(key)); +                ColumnFamily cf = \
ColumnFamily.create(table.getName(), cfs.name); +                \
cf.addColumn(column(colName, "value", timestamp)); +                cf.addColumn(new \
Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(new \
Date(timestamp).toString()), timestamp)); +                rm.add(cf);
+                logger.debug("Applying row to transfer " + rm);
+                rm.apply();
+            }
+        };
+        // write a lot more data so the data is spread in more than 1 chunk.
+        for (int i = 1; i <= 6000; i++)
+            mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
+        cfs.forceBlockingFlush();
+        Util.compactAll(cfs).get();
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        cfs.clearUnsafe();
+
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), \
p.getToken(ByteBufferUtil.bytes("key1000")))); +        ranges.add(new \
Range<Token>(p.getToken(ByteBufferUtil.bytes("key5")), \
p.getToken(ByteBufferUtil.bytes("key500")))); +        ranges.add(new \
Range<Token>(p.getToken(ByteBufferUtil.bytes("key9")), \
p.getToken(ByteBufferUtil.bytes("key900")))); +        transfer(table, sstable, \
ranges); +        assertEquals(1, cfs.getSSTables().size());
+        assertEquals(7, Util.getRangeSlice(cfs).size());
+    }
+
     public interface Mutator
     {
         public void mutate(String key, String col, long timestamp) throws Exception;


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic