[prev in list] [next in list] [prev in thread] [next in thread] 

List:       cassandra-commits
Subject:    [5/5] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
From:       aleksey () apache ! org
Date:       2017-09-30 10:03:38
Message-ID: d4c7fd006bd5445cb6dfd45fa2229608 () git ! apache ! org
[Download RAW message or body]

Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0db88242
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0db88242
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0db88242

Branch: refs/heads/trunk
Commit: 0db88242c66d3a7193a9ad836f9a515b3ac7f9fa
Parents: 77abf86 e819fec
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Sat Sep 30 11:02:41 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Sat Sep 30 11:02:41 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   5 +
 .../cassandra/db/PartitionRangeReadCommand.java |  14 ++
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +
 .../db/SinglePartitionReadCommand.java          |   5 +
 .../apache/cassandra/db/filter/DataLimits.java  |   5 +-
 .../UnfilteredPartitionIterators.java           |  19 +-
 .../apache/cassandra/service/DataResolver.java  | 239 ++++++++++++++-----
 7 files changed, 217 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1495c5d,61f3405..d8ba7f7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -155,7 -11,11 +155,12 @@@
  * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
   * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
  Merged from 3.0:
+  * Implement short read protection on partition boundaries (CASSANDRA-13595)
+  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
+  * Filter header only commit logs before recovery (CASSANDRA-13918)
   * AssertionError prepending to a list (CASSANDRA-13149)
++ * AssertionError prepending to a list (CASSANDRA-13149)
+  * Fix support for SuperColumn tables (CASSANDRA-12373)
   * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
   * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
   * Improve short read protection performance (CASSANDRA-13794)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
                
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/ReadCommand.java
                
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
                
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/filter/DataLimits.java
                
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
                
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/service/DataResolver.java
                
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 6f5fada,111d561..d4c77d1
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -522,18 -562,94 +561,94 @@@ public class DataResolver extends Respo
              return Transformation.apply(MoreRows.extend(partition, protection), protection);
          }
  
-         private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+         /*
+          * We only get here once all the rows and partitions in this iterator have been iterated over, and so
+          * if the node had returned the requested number of rows but we still get here, then some results were
+          * skipped during reconciliation.
+          */
+         public UnfilteredPartitionIterator moreContents()
+         {
+             // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+             assert !mergedResultCounter.isDone();
+ 
+             // we do not apply short read protection when we have no limits at all
+             assert !command.limits().isUnlimited();
+ 
+             /*
+              * If this is a single partition read command or an (indexed) partition range read command with
+              * a partition key specified, then we can't and shouldn't try fetch more partitions.
+              */
+             assert !command.isLimitedToOnePartition();
+ 
+             /*
+              * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+              *
+              * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
+              * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
+              */
+             if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+                 return null;
+ 
+             /*
+              * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+              * There is no point to ask the replica for more rows - it has no more in the requested range.
+              */
+             if (!partitionsFetched)
+                 return null;
+             partitionsFetched = false;
+ 
+             /*
+              * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+              * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+              * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+              * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+              */
+             int toQuery = command.limits().count() != DataLimits.NO_LIMIT
+                         ? command.limits().count() - counted(mergedResultCounter)
+                         : command.limits().perPartitionCount();
+ 
 -            ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
 ++            ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
 +             Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ 
+             PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
+             return executeReadCommand(cmd);
+         }
+ 
+         // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+         private int counted(Counter counter)
+         {
+             return command.limits().isGroupByLimit()
+                  ? counter.rowCounted()
+                  : counter.counted();
+         }
+ 
+         private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+         {
+             PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+ 
+             DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+ 
+             AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+             AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+                                                         ? new Range<>(lastPartitionKey, bounds.right)
+                                                         : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+             DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
+ 
+             return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+         }
+ 
+         private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
          {
 -            private final CFMetaData metadata;
 +            private final TableMetadata metadata;
              private final DecoratedKey partitionKey;
  
-             private Clustering lastClustering;
+             private Clustering lastClustering; // clustering of the last observed row
 
              private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
              private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
              private int lastQueried = 0; // # extra rows requested from the replica last time
 
-             private ShortReadRowProtection(TableMetadata metadata, DecoratedKey partitionKey)
 -            private ShortReadRowsProtection(CFMetaData metadata, DecoratedKey partitionKey)
++            private ShortReadRowsProtection(TableMetadata metadata, DecoratedKey partitionKey)
              {
                  this.metadata = metadata;
                  this.partitionKey = partitionKey;
@@@ -641,12 -751,13 +750,13 @@@
                   *
                   * See CASSANDRA-13794 for more details.
                   */
-                 lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+                 lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 
 -                ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
  +                ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
 
                  Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
 
-                 return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+                 SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+                 return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
              }
  
              // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
@@@ -669,24 -781,25 +779,25 @@@
                                                           command.rowFilter(),
                                                           command.limits().forShortReadRetry(toQuery),
                                                           partitionKey,
-                                                          filter);
+                                                          filter,
+                                                          command.indexMetadata());
              }
+         }
  
-             private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
-             {
-                 DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
-                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
- 
-                 if (StorageProxy.canDoLocalRequest(source))
-                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
-                 else
-                     MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
- 
-                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
-                 handler.awaitResults();
-                 assert resolver.responses.size() == 1;
-                 return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
-             }
+         private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+         {
+             DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+             ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+ 
+             if (StorageProxy.canDoLocalRequest(source))
+                 StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+             else
 -                MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
++                MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
+ 
+             // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+             handler.awaitResults();
+             assert resolver.responses.size() == 1;
+             return resolver.responses.get(0).payload.makeIterator(command);
          }
      }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic