
List:       drill-commits
Subject:    [3/3] git commit: DRILL-1346: Use HBase table size information to improve scan parallelization
From:       jacques@apache.org
Date:       2014-08-29 23:34:02
Message-ID: 881954a24da248c5bf8df327042bde5d@git.apache.org

DRILL-1346: Use HBase table size information to improve scan parallelization


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/64e43d2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/64e43d2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/64e43d2b

Branch: refs/heads/master
Commit: 64e43d2b770cb420b8f4fa9cf390b07efa5da44e
Parents: c241137
Author: Aditya Kishore <aditya@maprtech.com>
Authored: Fri Aug 29 21:31:07 2014 +0530
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Fri Aug 29 14:59:04 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  20 ++-
 .../drill/exec/store/hbase/HBaseScanSpec.java   |   5 +-
 .../exec/store/hbase/TableStatsCalculator.java  | 179 +++++++++++++++++++
 .../src/main/resources/drill-module.conf        |   6 +-
 .../apache/drill/hbase/TestTableGenerator.java  |  45 +++++
 5 files changed, 246 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 8301de1..d9a3cf9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -97,6 +97,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   private boolean filterPushedDown = false;
 
+  private TableStatsCalculator statsCalculator;
+
+  private long scanSizeInBytes = 0;
+
   @JsonCreator
   public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@@ -126,6 +130,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     this.storagePluginConfig = that.storagePluginConfig;
     this.hTableDesc = that.hTableDesc;
     this.filterPushedDown = that.filterPushedDown;
+    this.statsCalculator = that.statsCalculator;
+    this.scanSizeInBytes = that.scanSizeInBytes;
   }
 
   @Override
@@ -142,7 +148,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
       HTable table = new HTable(storagePluginConfig.getHBaseConf(), hbaseScanSpec.getTableName());
       this.hTableDesc = table.getTableDescriptor();
       NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
-      table.close();
+      statsCalculator = new TableStatsCalculator(table, hbaseScanSpec, storagePlugin.getContext().getConfig());
 
       boolean foundStartRegion = false;
       regionsToScan = new TreeMap<HRegionInfo, ServerName>();
@@ -153,10 +159,13 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
         }
         foundStartRegion = true;
         regionsToScan.put(regionInfo, mapEntry.getValue());
+        scanSizeInBytes += statsCalculator.getRegionSizeInBytes(regionInfo.getRegionName());
         if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
           break;
         }
       }
+
+      table.close();
     } catch (IOException e) {
       throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
     }
@@ -342,11 +351,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   @Override
   public ScanStats getScanStats() {
-    //TODO: look at stats for this.
-    int rowCount = (hbaseScanSpec.getFilter() != null ? 5 : 10) * regionsToScan.size();
-    int avgColumnSize = 10;
-    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+    int rowCount = (int) ((scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes()) * (hbaseScanSpec.getFilter() != null ? 0.5 : 1));
+    // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifiers
+    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : columns.size()/statsCalculator.getColsPerRow());
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
  }
 
   @Override
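
A rough walk-through of the new estimate (illustrative numbers, not from the patch): with 2 GiB of region data selected for the scan and a sampled average row size of 1024 bytes, rowCount comes out near 2.1 million rows, halved to about 1 million when a filter is pushed down. One caveat worth flagging: columns.size()/statsCalculator.getColsPerRow() is integer division, so any fractional column ratio truncates to 0 and zeroes out diskCost; a sketch of what the arithmetic presumably intends:

    // illustrative values only
    long scanSizeInBytes = 2L * 1024 * 1024 * 1024; // 2 GiB across selected regions
    int avgRowSizeInBytes = 1024;                   // from TableStatsCalculator sampling
    int colsPerRow = 5;                             // avg qualifiers per sampled row
    int projectedColumns = 2;                       // columns referenced by the query

    int rowCount = (int) ((scanSizeInBytes / avgRowSizeInBytes) * 0.5); // filter pushed down
    // casting to float first keeps the 2/5 ratio from truncating to 0
    float diskCost = scanSizeInBytes * ((float) projectedColumns / colsPerRow);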

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
index c2ee723..f9a585e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hbase;
 
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -68,11 +69,11 @@ public class HBaseScanSpec {
   }
 
   public byte[] getStartRow() {
-    return startRow;
+    return startRow == null ? HConstants.EMPTY_START_ROW : startRow;
   }
 
   public byte[] getStopRow() {
-    return stopRow;
+    return stopRow == null ? HConstants.EMPTY_START_ROW : stopRow;
   }
 
   @JsonIgnore
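
For context, the null-to-empty normalization above is what lets TableStatsCalculator pass the spec's bounds straight into HBase's Scan(byte[] startRow, byte[] stopRow) constructor, which treats a zero-length array as an open bound and is not generally safe to call with null. A minimal sketch of the call this enables:

    // both bounds empty => sample from the beginning of the table
    Scan scan = new Scan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);

Returning HConstants.EMPTY_START_ROW from getStopRow() is harmless, since HConstants.EMPTY_END_ROW is defined as the same zero-length array, though EMPTY_END_ROW would read more naturally there.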

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
new file mode 100644
index 0000000..0ce9938
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Computes size of each region for the given table.
+ */
+public class TableStatsCalculator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+
+  private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = "drill.exec.hbase.scan.samplerows.count";
+
+  private static final String DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED = "drill.exec.hbase.scan.sizecalculator.enabled";
+
+  private static final int DEFAULT_SAMPLE_SIZE = 100;
+
+  /**
+   * Maps each region to its size in bytes.
+   */
+  private Map<byte[], Long> sizeMap = null;
+
+  private int avgRowSizeInBytes;
+
+  private int colsPerRow;
+
+  /**
+   * Computes size of each region for the given table.
+   *
+   * @param table the HBase table whose regions are measured
+   * @param hbaseScanSpec scan spec supplying the start/stop row bounds used for sampling
+   * @param config Drill configuration carrying the sampling and size-calculator options
+   * @throws IOException
+   */
+  public TableStatsCalculator(HTable table, HBaseScanSpec hbaseScanSpec, DrillConfig config) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+    try {
+      int rowsToSample = rowsToSample(config);
+      if (rowsToSample > 0) {
+        Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow());
+        scan.setCaching(rowsToSample < DEFAULT_SAMPLE_SIZE ? rowsToSample : DEFAULT_SAMPLE_SIZE);
+        scan.setMaxVersions(1);
+        ResultScanner scanner = table.getScanner(scan);
+        int rowSizeSum = 0, numColumnsSum = 0, rowCount = 0;
+        for (; rowCount < rowsToSample; ++rowCount) {
+          Result row = scanner.next();
+          if (row == null) {
+            break;
+          }
+          numColumnsSum += row.size();
+          rowSizeSum += row.getBytes().getLength();
+        }
+        avgRowSizeInBytes = rowSizeSum/rowCount;
+        colsPerRow = numColumnsSum/rowCount;
+        scanner.close();
+      }
+
+      if (!enabled(config)) {
+        logger.info("Region size calculation disabled.");
+        return;
+      }
+
+      logger.info("Calculating region sizes for table \"" + new \
String(table.getTableName()) + "\"."); +
+      //get regions for table
+      Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+      Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      for (HRegionInfo regionInfo : tableRegionInfos) {
+        tableRegions.add(regionInfo.getRegionName());
+      }
+
+      ClusterStatus clusterStatus = null;
+      try {
+        clusterStatus = admin.getClusterStatus();
+      } catch (Exception e) {
+        logger.debug(e.getMessage());
+      } finally {
+        if (clusterStatus == null) {
+          return;
+        }
+      }
+
+      sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+      Collection<ServerName> servers = clusterStatus.getServers();
+      //iterate all cluster regions, filter regions from our table and compute their size
+      for (ServerName serverName : servers) {
+        HServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+        for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+          byte[] regionId = regionLoad.getName();
+
+          if (tableRegions.contains(regionId)) {
+            long regionSizeMB = regionLoad.getMemStoreSizeMB() + regionLoad.getStorefileSizeMB();
+            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024*1024));
+            if (logger.isDebugEnabled()) {
+              logger.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeMB + "MB");
+            }
+          }
+        }
+      }
+      logger.debug("Region sizes calculated");
+    } finally {
+      admin.close();
+    }
+
+  }
+
+  private boolean enabled(DrillConfig config) {
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED)
+        ? config.getBoolean(DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED) : true;
+  }
+
+  private int rowsToSample(DrillConfig config) {
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
+        ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
+  }
+
+  /**
+   * Returns size of given region in bytes. Returns 0 if region was not found.
+   */
+  public long getRegionSizeInBytes(byte[] regionId) {
+    if (sizeMap == null) {
+      return avgRowSizeInBytes*1024*1024; // 1 million rows
+    } else {
+      Long size = sizeMap.get(regionId);
+      if (size == null) {
+        logger.debug("Unknown region:" + Arrays.toString(regionId));
+        return 0;
+      } else {
+        return size;
+      }
+    }
+  }
+
+  public int getAvgRowSizeInBytes() {
+    return avgRowSizeInBytes;
+  }
+
+  public int getColsPerRow() {
+    return colsPerRow;
+  }
+
+}
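
One caveat in the sampling block above: if the Scan matches no rows, rowCount stays 0 and both averages divide by zero, so an empty table would throw ArithmeticException. A guard along these lines would avoid it (a sketch of a possible fix, not part of this commit):

    if (rowCount > 0) {
      avgRowSizeInBytes = rowSizeSum / rowCount;
      colsPerRow = numColumnsSum / rowCount;
    }
    scanner.close();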

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/resources/drill-module.conf b/contrib/storage-hbase/src/main/resources/drill-module.conf
index 0edceaf..0f0a0c6 100644
--- a/contrib/storage-hbase/src/main/resources/drill-module.conf
+++ b/contrib/storage-hbase/src/main/resources/drill-module.conf
@@ -29,6 +29,10 @@ drill.exec: {
         "hbase.zookeeper.property.clientPort" : 2181
       }
     }
-  }
+  },
 
+  hbase.scan: {
+    samplerows.count: 100,
+    sizecalculator.enabled: true
+  }
 }
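
Since these keys ship in drill-module.conf, they can be tuned per installation through Drill's usual config override mechanism. A hedged example of what that might look like in drill-override.conf (values are illustrative, not recommendations):

    drill.exec.hbase.scan: {
      samplerows.count: 500,
      sizecalculator.enabled: false
    }

Per the code above, setting samplerows.count to 0 skips row sampling entirely, while disabling the size calculator makes getRegionSizeInBytes() fall back to the avgRowSizeInBytes * 1024 * 1024 estimate.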

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
index 3678c78..99862e0 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.hbase;
 
 import java.util.Arrays;
+import java.util.Random;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -107,4 +108,48 @@ public class TestTableGenerator {
     table.close();
   }
 
+  public static void generateHBaseDataset2(HBaseAdmin admin, String tableName, int numberRegions) throws Exception {
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+
+    if (numberRegions > 1) {
+      admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
+    } else {
+      admin.createTable(desc);
+    }
+
+    HTable table = new HTable(admin.getConfiguration(), tableName);
+
+    int rowCount = 0;
+    byte[] bytes = null;
+    final int numColumns = 5;
+    Random random = new Random();
+    int iteration = 0;
+    while (rowCount < 1000) {
+      char rowKeyChar = 'a';
+      for (int i = 0; i < numberRegions; i++) {
+        Put p = new Put((""+rowKeyChar+iteration).getBytes());
+        for (int j = 1; j <= numColumns; j++) {
+          bytes = new byte[5000]; random.nextBytes(bytes);
+          p.add("f".getBytes(), ("c"+j).getBytes(), bytes);
+        }
+        table.put(p);
+
+        ++rowKeyChar;
+        ++rowCount;
+      }
+      ++iteration;
+    }
+
+    table.flushCommits();
+    table.close();
+    
+    admin.flush(tableName);
+  }
+
 }
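
For reference, a test would presumably drive the new generator along these lines (hypothetical snippet; the table name and region count are illustrative):

    HBaseAdmin admin = new HBaseAdmin(conf);
    TestTableGenerator.generateHBaseDataset2(admin, "TestTableSizeCalc", 2);

Each generated row carries 5 qualifiers of 5000 random bytes, so the roughly 1000 rows add up to about 25 MB of cell data, enough for the region-size and row-sampling code above to produce non-trivial estimates.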

