[prev in list] [next in list] [prev in thread] [next in thread]
List: drill-commits
Subject: [14/16] git commit: DRILL-1081: Always return OK_NEW_SCHEMA on first batch in Streaming Agg
From: jacques () apache ! org
Date: 2014-06-26 20:45:14
Message-ID: 431bafda6b374f7cbf84fcab91f1a657 () git ! apache ! org
[Download RAW message or body]
DRILL-1081: Always return OK_NEW_SCHEMA on first batch in Streaming Agg
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d5967c56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d5967c56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d5967c56
Branch: refs/heads/master
Commit: d5967c564b4cce933210d2e673f128431c632441
Parents: b8d4576
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jun 25 20:00:25 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Jun 26 09:02:24 2014 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/aggregate/StreamingAggBatch.java | 2 ++
.../exec/physical/impl/aggregate/StreamingAggTemplate.java | 5 ++++-
.../physical/impl/validate/IteratorValidatorBatchIterator.java | 4 ++++
3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java \
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index b587ad1..2f71bf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -122,6 +122,8 @@ public class StreamingAggBatch extends \
AbstractRecordBatch<StreamingAggregate> { first = false;
done = true;
return IterOutcome.OK_NEW_SCHEMA;
+ } else if (outcome == IterOutcome.OK && first) {
+ outcome = IterOutcome.OK_NEW_SCHEMA;
}
first = false;
return outcome;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java \
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4d6e7c4..8a9ba3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -170,6 +170,9 @@ public abstract class StreamingAggTemplate implements \
StreamingAggregator {
if(EXTRA_DEBUG) logger.debug("Received no more batches, \
returning."); return setOkAndReturn();
}else{
+ if (first && out == IterOutcome.OK) {
+ out = IterOutcome.OK_NEW_SCHEMA;
+ }
outcome = out;
return AggOutcome.CLEANUP_AND_RETURN;
}
@@ -195,7 +198,7 @@ public abstract class StreamingAggTemplate implements \
StreamingAggregator { if(incoming.getRecordCount() == 0){
continue;
}else{
- if(isSamePrev(previousIndex , previous, currentIndex)){
+ if(previousIndex != -1 && isSamePrev(previousIndex , previous, \
currentIndex)){
if(EXTRA_DEBUG) logger.debug("New value was same as last value of \
previous batch, adding."); addRecordInc(currentIndex);
previousIndex = currentIndex;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java \
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index ee8f37a..c8e9c60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
@@ -113,6 +114,9 @@ public class IteratorValidatorBatchIterator implements \
RecordBatch { if (first && state == IterOutcome.NONE) {
throw new IllegalStateException("The incoming iterator returned a state of \
NONE on the first batch. There should always be at least one batch output before \
returning NONE"); }
+ if (first && state == IterOutcome.OK) {
+ throw new IllegalStateException("The incoming iterator returned a state of OK \
on the first batch. There should always be a new schema on the first batch. Incoming: \
" + incoming.getClass().getName()); + }
if (first) first = !first;
if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic