[prev in list] [next in list] [prev in thread] [next in thread]
List: cassandra-commits
Subject: [5/5] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
From: aleksey () apache ! org
Date: 2017-09-30 10:03:38
Message-ID: d4c7fd006bd5445cb6dfd45fa2229608 () git ! apache ! org
[Download RAW message or body]
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0db88242
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0db88242
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0db88242
Branch: refs/heads/trunk
Commit: 0db88242c66d3a7193a9ad836f9a515b3ac7f9fa
Parents: 77abf86 e819fec
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Sat Sep 30 11:02:41 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Sat Sep 30 11:02:41 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 5 +
.../cassandra/db/PartitionRangeReadCommand.java | 14 ++
.../org/apache/cassandra/db/ReadCommand.java | 2 +
.../db/SinglePartitionReadCommand.java | 5 +
.../apache/cassandra/db/filter/DataLimits.java | 5 +-
.../UnfilteredPartitionIterators.java | 19 +-
.../apache/cassandra/service/DataResolver.java | 239 ++++++++++++++-----
7 files changed, 217 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1495c5d,61f3405..d8ba7f7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -155,7 -11,11 +155,12 @@@
* Duplicate the buffer before passing it to analyser in SASI operation \
(CASSANDRA-13512)
* Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
Merged from 3.0:
+ * Implement short read protection on partition boundaries (CASSANDRA-13595)
+ * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries \
(CASSANDRA-13911) + * Filter header only commit logs before recovery \
(CASSANDRA-13918)
* AssertionError prepending to a list (CASSANDRA-13149)
++ * AssertionError prepending to a list (CASSANDRA-13149)
+ * Fix support for SuperColumn tables (CASSANDRA-12373)
* Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
* Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
* Improve short read protection performance (CASSANDRA-13794)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0db88242/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 6f5fada,111d561..d4c77d1
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -522,18 -562,94 +561,94 @@@ public class DataResolver extends Respo
return Transformation.apply(MoreRows.extend(partition, protection), \
protection); }
- private class ShortReadRowProtection extends Transformation implements \
MoreRows<UnfilteredRowIterator> + /*
+ * We only get here once all the rows and partitions in this iterator have \
been iterated over, and so + * if the node had returned the requested number \
of rows but we still get here, then some results were + * skipped during \
reconciliation. + */
+ public UnfilteredPartitionIterator moreContents()
+ {
+ // never try to request additional partitions from replicas if our \
reconciled partitions are already filled to the limit + assert \
!mergedResultCounter.isDone(); +
+ // we do not apply short read protection when we have no limits at all
+ assert !command.limits().isUnlimited();
+
+ /*
+ * If this is a single partition read command or an (indexed) partition \
range read command with + * a partition key specified, then we can't and \
shouldn't try to fetch more partitions. + */
+ assert !command.isLimitedToOnePartition();
+
+ /*
+ * If the returned result doesn't have enough rows/partitions to \
satisfy even the original limit, don't ask for more. + *
+ * Can only take the short cut if there is no per partition limit set. \
Otherwise it's possible to hit false + * positives due to some rows \
being unaccounted for in certain scenarios (see CASSANDRA-13911). + */
+ if (!singleResultCounter.isDone() && \
command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return \
null; +
+ /*
+ * Either we had an empty iterator as the initial response, or our \
moreContents() call got us an empty iterator. + * There is no point in \
asking the replica for more rows - it has no more in the requested range. + \
*/ + if (!partitionsFetched)
+ return null;
+ partitionsFetched = false;
+
+ /*
+ * We are going to fetch one partition at a time for thrift and \
potentially more for CQL. + * The row limit will either be set to the \
per partition limit - if the command has no total row limit set, or + * \
the total # of rows remaining - if it has some. If we don't grab enough rows in some \
of the partitions, + * then future \
ShortReadRowsProtection.moreContents() calls will fetch the missing ones. + \
*/ + int toQuery = command.limits().count() != DataLimits.NO_LIMIT
+ ? command.limits().count() - counted(mergedResultCounter)
+ : command.limits().perPartitionCount();
+
- ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
++ ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
+ Tracing.trace("Requesting {} extra rows from {} for short read \
protection", toQuery, source); +
+ PartitionRangeReadCommand cmd = \
makeFetchAdditionalPartitionReadCommand(toQuery); + return \
executeReadCommand(cmd); + }
+
+ // Counts the number of rows for regular queries and the number of groups \
for GROUP BY queries + private int counted(Counter counter)
+ {
+ return command.limits().isGroupByLimit()
+ ? counter.rowCounted()
+ : counter.counted();
+ }
+
+ private PartitionRangeReadCommand \
makeFetchAdditionalPartitionReadCommand(int toQuery) + {
+ PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+
+ DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+
+ AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+ AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+ ? new \
Range<>(lastPartitionKey, bounds.right) + \
: new ExcludingBounds<>(lastPartitionKey, bounds.right); + DataRange \
newDataRange = cmd.dataRange().forSubRange(newBounds); +
+ return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+ }
+
+ private class ShortReadRowsProtection extends Transformation implements \
MoreRows<UnfilteredRowIterator> {
- private final CFMetaData metadata;
+ private final TableMetadata metadata;
private final DecoratedKey partitionKey;
- private Clustering lastClustering;
+ private Clustering lastClustering; // clustering of the last observed \
row
private int lastCounted = 0; // last seen recorded # before attempting \
to fetch more rows
private int lastFetched = 0; // # rows returned by last attempt to get \
more (or by the original read command)
private int lastQueried = 0; // # extra rows requested from the replica \
last time
- private ShortReadRowProtection(TableMetadata metadata, DecoratedKey \
partitionKey)
- private ShortReadRowsProtection(CFMetaData metadata, DecoratedKey \
partitionKey) ++ private ShortReadRowsProtection(TableMetadata metadata, \
DecoratedKey partitionKey) {
this.metadata = metadata;
this.partitionKey = partitionKey;
@@@ -641,12 -751,13 +750,13 @@@
*
* See CASSANDRA-13794 for more details.
*/
- lastQueried = Math.max(Math.min(command.limits().count(), \
command.limits().perPartitionCount()), 8); + lastQueried = \
Math.min(command.limits().count(), command.limits().perPartitionCount());
- ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
+ ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
Tracing.trace("Requesting {} extra rows from {} for short read \
protection", lastQueried, source);
- return \
executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried)); + \
SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried); + \
return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd); }
// Counts the number of rows for regular queries and the number of \
groups for GROUP BY queries @@@ -669,24 -781,25 +779,25 @@@
command.rowFilter(),
\
command.limits().forShortReadRetry(toQuery), partitionKey,
- filter);
+ filter,
+ command.indexMetadata());
}
+ }
- private UnfilteredRowIterator \
executeReadCommand(SinglePartitionReadCommand cmd)
- {
- DataResolver resolver = new DataResolver(keyspace, cmd, \
ConsistencyLevel.ONE, 1, queryStartNanoTime);
- ReadCallback handler = new ReadCallback(resolver, \
ConsistencyLevel.ONE, cmd, Collections.singletonList(source), \
queryStartNanoTime);
-
- if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new \
StorageProxy.LocalReadRunnable(cmd, handler));
- else
- \
MessagingService.instance().sendRRWithFailure(cmd.createMessage(), \
source, handler);
-
- // We don't call handler.get() because we want to preserve \
tombstones since we're still in the middle of merging node results.
- handler.awaitResults();
- assert resolver.responses.size() == 1;
- return \
UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), \
cmd);
- }
+ private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+ {
+ DataResolver resolver = new DataResolver(keyspace, cmd, \
ConsistencyLevel.ONE, 1, queryStartNanoTime); + ReadCallback handler = \
new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, \
Collections.singletonList(source), queryStartNanoTime); +
+ if (StorageProxy.canDoLocalRequest(source))
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new \
StorageProxy.LocalReadRunnable(cmd, handler)); + else
- MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), \
source, handler); ++ \
MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler); \
+ + // We don't call handler.get() because we want to preserve \
tombstones since we're still in the middle of merging node results. + \
handler.awaitResults(); + assert resolver.responses.size() == 1;
+ return resolver.responses.get(0).payload.makeIterator(command);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic