[prev in list] [next in list] [prev in thread] [next in thread]
List: drill-commits
Subject: [3/3] git commit: DRILL-1346: Use HBase table size information to improve scan parallelization
From: jacques () apache ! org
Date: 2014-08-29 23:34:02
Message-ID: 881954a24da248c5bf8df327042bde5d () git ! apache ! org
[Download RAW message or body]
DRILL-1346: Use HBase table size information to improve scan parallelization
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/64e43d2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/64e43d2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/64e43d2b
Branch: refs/heads/master
Commit: 64e43d2b770cb420b8f4fa9cf390b07efa5da44e
Parents: c241137
Author: Aditya Kishore <aditya@maprtech.com>
Authored: Fri Aug 29 21:31:07 2014 +0530
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Fri Aug 29 14:59:04 2014 -0700
----------------------------------------------------------------------
.../drill/exec/store/hbase/HBaseGroupScan.java | 20 ++-
.../drill/exec/store/hbase/HBaseScanSpec.java | 5 +-
.../exec/store/hbase/TableStatsCalculator.java | 179 +++++++++++++++++++
.../src/main/resources/drill-module.conf | 6 +-
.../apache/drill/hbase/TestTableGenerator.java | 45 +++++
5 files changed, 246 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java \
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 8301de1..d9a3cf9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -97,6 +97,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements \
DrillHBaseConst
private boolean filterPushedDown = false;
+ private TableStatsCalculator statsCalculator;
+
+ private long scanSizeInBytes = 0;
+
@JsonCreator
public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
@JsonProperty("storage") HBaseStoragePluginConfig \
storagePluginConfig, @@ -126,6 +130,8 @@ public class HBaseGroupScan extends \
AbstractGroupScan implements DrillHBaseConst this.storagePluginConfig = \
that.storagePluginConfig; this.hTableDesc = that.hTableDesc;
this.filterPushedDown = that.filterPushedDown;
+ this.statsCalculator = that.statsCalculator;
+ this.scanSizeInBytes = that.scanSizeInBytes;
}
@Override
@@ -142,7 +148,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements \
DrillHBaseConst
HTable table = new HTable(storagePluginConfig.getHBaseConf(), \
hbaseScanSpec.getTableName()); this.hTableDesc = table.getTableDescriptor();
NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
- table.close();
+ statsCalculator = new TableStatsCalculator(table, hbaseScanSpec, \
storagePlugin.getContext().getConfig());
boolean foundStartRegion = false;
regionsToScan = new TreeMap<HRegionInfo, ServerName>();
@@ -153,10 +159,13 @@ public class HBaseGroupScan extends AbstractGroupScan \
implements DrillHBaseConst }
foundStartRegion = true;
regionsToScan.put(regionInfo, mapEntry.getValue());
+ scanSizeInBytes += \
statsCalculator.getRegionSizeInBytes(regionInfo.getRegionName());
if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length \
!= 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) { break;
}
}
+
+ table.close();
} catch (IOException e) {
throw new DrillRuntimeException("Error getting region info for table: " + \
hbaseScanSpec.getTableName(), e); }
@@ -342,11 +351,10 @@ public class HBaseGroupScan extends AbstractGroupScan \
implements DrillHBaseConst
@Override
public ScanStats getScanStats() {
-    //TODO: look at stats for this.
-    int rowCount = (hbaseScanSpec.getFilter() != null ? 5 : 10) * regionsToScan.size();
-    int avgColumnSize = 10;
-    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+    // The sampled averages can be 0 (empty scan range or sampling disabled); clamp them so the
+    // divisions below cannot throw ArithmeticException.
+    long avgRowSizeInBytes = Math.max(1, statsCalculator.getAvgRowSizeInBytes());
+    int colsPerRow = Math.max(1, statsCalculator.getColsPerRow());
+    // Assume a pushed-down filter keeps roughly half of the rows.
+    int rowCount = (int) ((scanSizeInBytes / avgRowSizeInBytes) * (hbaseScanSpec.getFilter() != null ? 0.5 : 1));
+    // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifiers.
+    // Divide in floating point: integer division would truncate the projected fraction to 0
+    // whenever fewer columns than colsPerRow are selected.
+    float projectedFraction = (columns == null || columns.isEmpty())
+        ? 1
+        : (float) columns.size() / colsPerRow;
+    float diskCost = scanSizeInBytes * projectedFraction;
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java \
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
index c2ee723..f9a585e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.hbase;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -68,11 +69,11 @@ public class HBaseScanSpec {
}
+  /**
+   * Returns the scan's start row, or {@code HConstants.EMPTY_START_ROW} (an empty array) when none was set.
+   */
public byte[] getStartRow() {
-    return startRow;
+    return startRow == null ? HConstants.EMPTY_START_ROW : startRow;
}
+  /**
+   * Returns the scan's stop row, or {@code HConstants.EMPTY_END_ROW} (an empty array) when none was set;
+   * callers treat a zero-length stop row as "no upper bound".
+   */
public byte[] getStopRow() {
-    return stopRow;
+    return stopRow == null ? HConstants.EMPTY_END_ROW : stopRow;
}
@JsonIgnore
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java \
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
new file mode 100644
index 0000000..0ce9938
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Estimates scan statistics for an HBase table: average row size and row width from a sample of rows
+ * in the scan range, and (optionally) the size of each region from the cluster's region load.
+ */
+public class TableStatsCalculator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+
+  private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = "drill.exec.hbase.scan.samplerows.count";
+
+  private static final String DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED = "drill.exec.hbase.scan.sizecalculator.enabled";
+
+  private static final int DEFAULT_SAMPLE_SIZE = 100;
+
+  /** Rows assumed per region when region sizes could not be computed (~1 million). */
+  private static final long DEFAULT_ROWS_PER_REGION = 1024L * 1024L;
+
+  /**
+   * Maps each region name to its size in bytes. Stays null when size calculation is disabled or the
+   * cluster status could not be fetched.
+   */
+  private Map<byte[], Long> sizeMap = null;
+
+  /** Average size of a sampled row; 1 (never 0) when no rows were sampled, so callers may divide by it. */
+  private int avgRowSizeInBytes = 1;
+
+  /** Average number of cells per sampled row; 1 (never 0) when no rows were sampled. */
+  private int colsPerRow = 1;
+
+  /**
+   * Samples rows of the scan range and, if enabled, computes the size of each region of the table.
+   *
+   * @param table the table to inspect; it is not closed by this constructor
+   * @param hbaseScanSpec supplies the start and stop rows of the sampled range
+   * @param config Drill configuration holding the sample-size and size-calculator options
+   * @throws IOException if sampling the table or connecting to HBase fails
+   */
+  public TableStatsCalculator(HTable table, HBaseScanSpec hbaseScanSpec, DrillConfig config) throws IOException {
+    sampleRows(table, hbaseScanSpec, rowsToSample(config));
+
+    if (!enabled(config)) {
+      logger.info("Region size calculation disabled.");
+      return;
+    }
+    computeRegionSizes(table);
+  }
+
+  /** Scans up to {@code rowsToSample} rows of the scan range and records average row size and width. */
+  private void sampleRows(HTable table, HBaseScanSpec hbaseScanSpec, int rowsToSample) throws IOException {
+    if (rowsToSample <= 0) {
+      return;
+    }
+    Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow());
+    scan.setCaching(Math.min(rowsToSample, DEFAULT_SAMPLE_SIZE));
+    scan.setMaxVersions(1);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      // long sums: a large configured sample of wide rows can exceed Integer.MAX_VALUE bytes.
+      long rowSizeSum = 0;
+      long numColumnsSum = 0;
+      int rowCount = 0;
+      for (; rowCount < rowsToSample; ++rowCount) {
+        Result row = scanner.next();
+        if (row == null) {
+          break;
+        }
+        numColumnsSum += row.size();
+        rowSizeSum += row.getBytes().getLength();
+      }
+      // An empty scan range yields no rows; keep the non-zero defaults rather than dividing by zero.
+      if (rowCount > 0) {
+        avgRowSizeInBytes = (int) Math.max(1, rowSizeSum / rowCount);
+        colsPerRow = (int) Math.max(1, numColumnsSum / rowCount);
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+
+  /** Fills {@link #sizeMap} from the cluster's region load; leaves it null if cluster status is unavailable. */
+  private void computeRegionSizes(HTable table) throws IOException {
+    logger.info("Calculating region sizes for table \"" + Bytes.toString(table.getTableName()) + "\".");
+
+    //get regions for table
+    Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (HRegionInfo regionInfo : table.getRegionLocations().keySet()) {
+      tableRegions.add(regionInfo.getRegionName());
+    }
+
+    HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+    try {
+      ClusterStatus clusterStatus;
+      try {
+        clusterStatus = admin.getClusterStatus();
+      } catch (Exception e) {
+        // Best effort: without cluster status, getRegionSizeInBytes() falls back to a row-based estimate.
+        logger.debug("Unable to get cluster status; region sizes will be estimated.", e);
+        return;
+      }
+      if (clusterStatus == null) {
+        return;
+      }
+
+      Map<byte[], Long> sizes = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      Collection<ServerName> servers = clusterStatus.getServers();
+      //iterate all cluster regions, filter regions from our table and compute their size
+      for (ServerName serverName : servers) {
+        HServerLoad serverLoad = clusterStatus.getLoad(serverName);
+        for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+          byte[] regionId = regionLoad.getName();
+          if (tableRegions.contains(regionId)) {
+            long regionSizeMB = regionLoad.getMemStoreSizeMB() + regionLoad.getStorefileSizeMB();
+            // A region reporting 0MB is counted as 1MB so it is never treated as empty.
+            sizes.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024 * 1024));
+            if (logger.isDebugEnabled()) {
+              logger.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeMB + "MB");
+            }
+          }
+        }
+      }
+      sizeMap = sizes;
+      logger.debug("Region sizes calculated");
+    } finally {
+      admin.close();
+    }
+  }
+
+  private boolean enabled(DrillConfig config) {
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED)
+        ? config.getBoolean(DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED) : true;
+  }
+
+  private int rowsToSample(DrillConfig config) {
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
+        ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
+  }
+
+  /**
+   * Returns the size of the given region in bytes.
+   *
+   * <p>If region sizes were not computed (calculation disabled or cluster status unavailable), returns
+   * an estimate of about one million rows of the average sampled size. If sizes were computed but the
+   * region was not among them, returns 0.
+   */
+  public long getRegionSizeInBytes(byte[] regionId) {
+    if (sizeMap == null) {
+      // Multiply in long: the int product overflows once the average row reaches 2KB.
+      return avgRowSizeInBytes * DEFAULT_ROWS_PER_REGION;
+    }
+    Long size = sizeMap.get(regionId);
+    if (size == null) {
+      logger.debug("Unknown region:" + Arrays.toString(regionId));
+      return 0;
+    }
+    return size;
+  }
+
+  /** Returns the average sampled row size in bytes; at least 1. */
+  public int getAvgRowSizeInBytes() {
+    return avgRowSizeInBytes;
+  }
+
+  /** Returns the average number of cells per sampled row; at least 1. */
+  public int getColsPerRow() {
+    return colsPerRow;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/resources/drill-module.conf \
b/contrib/storage-hbase/src/main/resources/drill-module.conf index 0edceaf..0f0a0c6 \
100644
--- a/contrib/storage-hbase/src/main/resources/drill-module.conf
+++ b/contrib/storage-hbase/src/main/resources/drill-module.conf
@@ -29,6 +29,10 @@ drill.exec: {
"hbase.zookeeper.property.clientPort" : 2181
}
}
- }
+ },
+ hbase.scan: {
+ samplerows.count: 100,
+ sizecalculator.enabled: true
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java \
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java \
index 3678c78..99862e0 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -18,6 +18,7 @@
package org.apache.drill.hbase;
import java.util.Arrays;
+import java.util.Random;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -107,4 +108,48 @@ public class TestTableGenerator {
table.close();
}
+  /**
+   * Drops and re-creates {@code tableName} with the single column family "f", pre-split into
+   * {@code numberRegions} regions, then writes rows of five random 5000-byte cells ("c1".."c5") each
+   * and asks HBase to flush the table. Each pass writes one row per region, with keys made of a letter
+   * (starting at 'a') followed by the pass number. Passes repeat until at least 1000 rows exist, so the
+   * total is 1000 rounded up to a multiple of {@code numberRegions}.
+   *
+   * @param admin HBase admin used to drop, create and flush the table
+   * @param tableName name of the table to (re)create
+   * @param numberRegions number of regions to create; must be at least 1
+   * @throws IllegalArgumentException if {@code numberRegions} is less than 1
+   */
+  public static void generateHBaseDataset2(HBaseAdmin admin, String tableName, int numberRegions) throws Exception {
+    // Each pass writes numberRegions rows; with fewer than one region the loop would never end.
+    if (numberRegions < 1) {
+      throw new IllegalArgumentException("numberRegions must be at least 1, got " + numberRegions);
+    }
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+
+    if (numberRegions > 1) {
+      admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1));
+    } else {
+      admin.createTable(desc);
+    }
+
+    final byte[] family = "f".getBytes();
+    final int numColumns = 5;
+    Random random = new Random();
+
+    HTable table = new HTable(admin.getConfiguration(), tableName);
+    try {
+      int rowCount = 0;
+      int iteration = 0;
+      while (rowCount < 1000) {
+        char rowKeyChar = 'a';
+        for (int i = 0; i < numberRegions; i++) {
+          Put p = new Put(("" + rowKeyChar + iteration).getBytes());
+          for (int j = 1; j <= numColumns; j++) {
+            byte[] bytes = new byte[5000];
+            random.nextBytes(bytes);
+            p.add(family, ("c" + j).getBytes(), bytes);
+          }
+          table.put(p);
+
+          ++rowKeyChar;
+          ++rowCount;
+        }
+        ++iteration;
+      }
+      table.flushCommits();
+    } finally {
+      table.close();
+    }
+
+    admin.flush(tableName);
+  }
+
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic