[prev in list] [next in list] [prev in thread] [next in thread] 

List:       drill-commits
Subject:    [14/16] git commit: DRILL-1081: Always return OK_NEW_SCHEMA on first batch in Streaming Agg
From:       jacques () apache ! org
Date:       2014-06-26 20:45:14
Message-ID: 431bafda6b374f7cbf84fcab91f1a657 () git ! apache ! org
[Download RAW message or body]

DRILL-1081: Always return OK_NEW_SCHEMA on first batch in Streaming Agg


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d5967c56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d5967c56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d5967c56

Branch: refs/heads/master
Commit: d5967c564b4cce933210d2e673f128431c632441
Parents: b8d4576
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jun 25 20:00:25 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Jun 26 09:02:24 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/aggregate/StreamingAggBatch.java   | 2 ++
 .../exec/physical/impl/aggregate/StreamingAggTemplate.java      | 5 ++++-
 .../physical/impl/validate/IteratorValidatorBatchIterator.java  | 4 ++++
 3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
                
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java \
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 index b587ad1..2f71bf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
                
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 @@ -122,6 +122,8 @@ public class StreamingAggBatch extends \
AbstractRecordBatch<StreamingAggregate> {  first = false;
           done = true;
           return IterOutcome.OK_NEW_SCHEMA;
+        } else if (outcome == IterOutcome.OK && first) {
+          outcome = IterOutcome.OK_NEW_SCHEMA;
         }
         first = false;
         return outcome;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/sr \
                c/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
                
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java \
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 index 4d6e7c4..8a9ba3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
                
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 @@ -170,6 +170,9 @@ public abstract class StreamingAggTemplate implements \
                StreamingAggregator {
                 if(EXTRA_DEBUG) logger.debug("Received no more batches, \
returning.");  return setOkAndReturn();
               }else{
+                if (first && out == IterOutcome.OK) {
+                  out = IterOutcome.OK_NEW_SCHEMA;
+                }
                 outcome = out;
                 return AggOutcome.CLEANUP_AND_RETURN;
               }
@@ -195,7 +198,7 @@ public abstract class StreamingAggTemplate implements \
StreamingAggregator {  if(incoming.getRecordCount() == 0){
                 continue;
               }else{
-                if(isSamePrev(previousIndex , previous, currentIndex)){
+                if(previousIndex != -1 && isSamePrev(previousIndex , previous, \
                currentIndex)){
                   if(EXTRA_DEBUG) logger.debug("New value was same as last value of \
previous batch, adding.");  addRecordInc(currentIndex);
                   previousIndex = currentIndex;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/sr \
c/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
                
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java \
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
 index ee8f37a..c8e9c60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
                
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
 @@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -113,6 +114,9 @@ public class IteratorValidatorBatchIterator implements \
RecordBatch {  if (first && state == IterOutcome.NONE) {
       throw new IllegalStateException("The incoming iterator returned a state of \
NONE on the first batch. There should always be at least one batch output before \
returning NONE");  }
+    if (first && state == IterOutcome.OK) {
+      throw new IllegalStateException("The incoming iterator returned a state of OK \
on the first batch. There should always be a new schema on the first batch. Incoming: \
" + incoming.getClass().getName()); +    }
     if (first) first = !first;
 
     if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic