Merge branch 'main' into 2024_09_28
zhouyuan authored Sep 28, 2024
2 parents 0fda928 + b9fbb47 commit 8daed74
Showing 5 changed files with 222 additions and 48 deletions.
@@ -91,16 +91,20 @@ case class CHInputPartitionsUtil(
       .sortBy(_.length)(implicitly[Ordering[Long]].reverse)

     val totalCores = SparkResourceUtil.getTotalCores(relation.sparkSession.sessionState.conf)
-    val fileCntPerPartition = math.ceil((splitFiles.size * 1.0) / totalCores).toInt
+    val isAllSmallFiles = splitFiles.forall(_.length < maxSplitBytes)
     val fileCntThreshold = relation.sparkSession.sessionState.conf
       .getConfString(
         CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD,
         CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT
       )
       .toInt

-    if (fileCntThreshold > 0 && fileCntPerPartition > fileCntThreshold) {
-      getFilePartitionsByFileCnt(splitFiles, fileCntPerPartition)
+    // calculate the file count for each partition according to the parameter
+    val totalFilesThreshold = totalCores * fileCntThreshold
+    if (fileCntThreshold > 0 && isAllSmallFiles && splitFiles.size <= totalFilesThreshold) {
+      var fileCnt = math.round((splitFiles.size * 1.0) / totalCores).toInt
+      if (fileCnt < 1) fileCnt = 1
+      getFilePartitionsByFileCnt(splitFiles, fileCnt)
     } else {
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
     }
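The new branch applies only when every file is smaller than maxSplitBytes and the total file count is at most totalCores * fileCntThreshold; it then targets roughly splitFiles.size / totalCores files per partition so each core gets work, instead of bin-packing by size. The body of getFilePartitionsByFileCnt is not part of this diff, so the following is a minimal sketch, assuming it simply chunks the file list into fixed-size groups; PartitionedFile and FilePartition are Spark's real classes, but the grouping logic here is an assumption.

import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Hypothetical sketch of count-based grouping; the real
// getFilePartitionsByFileCnt body is not shown in this diff.
def getFilePartitionsByFileCntSketch(
    splitFiles: Seq[PartitionedFile],
    fileCnt: Int): Seq[FilePartition] = {
  splitFiles
    .grouped(fileCnt) // fixed-size chunks of `fileCnt` files each
    .zipWithIndex
    .map { case (files, index) => FilePartition(index, files.toArray) }
    .toSeq
}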
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.utils

-import org.apache.gluten.backendsapi.clickhouse.CHConf
+import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConf}
 import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartRange, MergeTreePartSplit}
 import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
 import org.apache.gluten.softaffinity.SoftAffinityManager
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuil
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.util.SparkResourceUtil
 import org.apache.spark.util.collection.BitSet

 import com.fasterxml.jackson.core.`type`.TypeReference
@@ -114,6 +115,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
       )
     } else {
       genInputPartitionSeq(
+        relation,
         engine,
         database,
         tableName,
@@ -135,6 +137,7 @@ }
   }

   def genInputPartitionSeq(
+      relation: HadoopFsRelation,
       engine: String,
       database: String,
       tableName: String,
@@ -213,53 +216,142 @@ }
     }

     val maxSplitBytes = getMaxSplitBytes(sparkSession, selectRanges)
-    val total_marks = selectRanges.map(p => p.marks).sum
-    val total_Bytes = selectRanges.map(p => p.size).sum
-    // maxSplitBytes / (total_Bytes / total_marks) + 1
-    val markCntPerPartition = maxSplitBytes * total_marks / total_Bytes + 1
-
-    logInfo(s"Planning scan with bin packing, max mark: $markCntPerPartition")
-    val splitFiles = selectRanges
-      .flatMap {
-        part =>
-          val end = part.marks + part.start
-          (part.start until end by markCntPerPartition).map {
-            offset =>
-              val remaining = end - offset
-              val size = if (remaining > markCntPerPartition) markCntPerPartition else remaining
-              MergeTreePartSplit(
-                part.name,
-                part.dirName,
-                part.targetNode,
-                offset,
-                size,
-                size * part.size / part.marks)
-          }
-      }
+    val totalCores = SparkResourceUtil.getTotalCores(relation.sparkSession.sessionState.conf)
+    val isAllSmallFiles = selectRanges.forall(_.size < maxSplitBytes)
+    val fileCntThreshold = relation.sparkSession.sessionState.conf
+      .getConfString(
+        CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD,
+        CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT
+      )
+      .toInt
+    val totalMarksThreshold = totalCores * fileCntThreshold
+    if (fileCntThreshold > 0 && isAllSmallFiles && selectRanges.size <= totalMarksThreshold) {
+      var fileCnt = math.round((selectRanges.size * 1.0) / totalCores).toInt
+      if (fileCnt < 1) fileCnt = 1
+      val splitFiles = selectRanges
+        .map {
+          part =>
+            MergeTreePartSplit(part.name, part.dirName, part.targetNode, 0, part.marks, part.size)
+        }
+      genInputPartitionSeqByFileCnt(
+        engine,
+        database,
+        tableName,
+        snapshotId,
+        relativeTablePath,
+        absoluteTablePath,
+        tableSchemaJson,
+        partitions,
+        table,
+        clickhouseTableConfigs,
+        splitFiles,
+        fileCnt
+      )
+    } else {
+      val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+      val totalMarks = selectRanges.map(p => p.marks).sum
+      val totalBytes = selectRanges.map(p => p.size).sum
+      // maxSplitBytes / (total_Bytes / total_marks) + 1
+      val markCntPerPartition = maxSplitBytes * totalMarks / totalBytes + 1
+
+      logInfo(s"Planning scan with bin packing, max mark: $markCntPerPartition")
+      val splitFiles = selectRanges
+        .flatMap {
+          part =>
+            val end = part.marks + part.start
+            (part.start until end by markCntPerPartition).map {
+              offset =>
+                val remaining = end - offset
+                val size = if (remaining > markCntPerPartition) markCntPerPartition else remaining
+                MergeTreePartSplit(
+                  part.name,
+                  part.dirName,
+                  part.targetNode,
+                  offset,
+                  size,
+                  size * part.size / part.marks)
+            }
+        }

-    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
-    val (partNameWithLocation, locationDistinct) =
-      calculatedLocationForSoftAffinity(splitFiles, relativeTablePath)
-
-    genInputPartitionSeqBySplitFiles(
-      engine,
-      database,
-      tableName,
-      snapshotId,
-      relativeTablePath,
-      absoluteTablePath,
-      tableSchemaJson,
-      partitions,
-      table,
-      clickhouseTableConfigs,
-      splitFiles,
-      openCostInBytes,
-      maxSplitBytes,
-      partNameWithLocation,
-      locationDistinct
-    )
+      val (partNameWithLocation, locationDistinct) =
+        calculatedLocationForSoftAffinity(splitFiles, relativeTablePath)
+
+      genInputPartitionSeqBySplitFiles(
+        engine,
+        database,
+        tableName,
+        snapshotId,
+        relativeTablePath,
+        absoluteTablePath,
+        tableSchemaJson,
+        partitions,
+        table,
+        clickhouseTableConfigs,
+        splitFiles,
+        openCostInBytes,
+        maxSplitBytes,
+        partNameWithLocation,
+        locationDistinct
+      )
+    }
   }

+  def genInputPartitionSeqByFileCnt(
+      engine: String,
+      database: String,
+      tableName: String,
+      snapshotId: String,
+      relativeTablePath: String,
+      absoluteTablePath: String,
+      tableSchemaJson: String,
+      partitions: ArrayBuffer[InputPartition],
+      table: ClickHouseTableV2,
+      clickhouseTableConfigs: Map[String, String],
+      splitFiles: Seq[MergeTreePartSplit],
+      fileCnt: Int): Unit = {
+    val currentFiles = new ArrayBuffer[MergeTreePartSplit]
+    var currentFileCnt = 0L
+
+    /** Close the current partition and move to the next. */
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        // Copy to a new Array.
+        val newPartition = GlutenMergeTreePartition(
+          partitions.size,
+          engine,
+          database,
+          tableName,
+          snapshotId,
+          relativeTablePath,
+          absoluteTablePath,
+          table.orderByKey(),
+          table.lowCardKey(),
+          table.minmaxIndexKey(),
+          table.bfIndexKey(),
+          table.setIndexKey(),
+          table.primaryKey(),
+          currentFiles.toArray,
+          tableSchemaJson,
+          clickhouseTableConfigs
+        )
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentFileCnt = 0L
+    }
+
+    splitFiles.foreach {
+      file =>
+        if (currentFileCnt >= fileCnt) {
+          closePartition()
+        }
+        // Add the given file to the current partition.
+        currentFileCnt += 1L
+        currentFiles += file
+    }
+    closePartition()
+    partitions.toSeq
+  }
   def genInputPartitionSeqBySplitFiles(
       engine: String,
       database: String,
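For intuition on the fallback path's formula: markCntPerPartition = maxSplitBytes * totalMarks / totalBytes + 1 is an integer-arithmetic way of asking how many marks fit into maxSplitBytes, since totalBytes / totalMarks is the average number of bytes per mark (the + 1 also guards against a zero split size). A worked example with invented numbers, not taken from the commit:

// Invented numbers: 1024 marks covering 64 MiB means an average mark
// is 64 KiB, so a 128 MiB split holds 2048 marks, plus one.
val maxSplitBytes = 128L * 1024 * 1024
val totalMarks = 1024L
val totalBytes = 64L * 1024 * 1024
val markCntPerPartition = maxSplitBytes * totalMarks / totalBytes + 1
assert(markCntPerPartition == 2049L)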
@@ -2051,4 +2051,57 @@ class GlutenClickHouseMergeTreeWriteSuite
                  |""".stripMargin
     runSql(sqlStr) { _ => }
   }
+
+  test("GLUTEN-7358: Optimize the strategy of the partition split according to the files count") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_split;
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_split
+                 |(
+                 | l_orderkey bigint,
+                 | l_partkey bigint,
+                 | l_suppkey bigint,
+                 | l_linenumber bigint,
+                 | l_quantity double,
+                 | l_extendedprice double,
+                 | l_discount double,
+                 | l_tax double,
+                 | l_returnflag string,
+                 | l_linestatus string,
+                 | l_shipdate date,
+                 | l_commitdate date,
+                 | l_receiptdate date,
+                 | l_shipinstruct string,
+                 | l_shipmode string,
+                 | l_comment string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_split'
+                 |""".stripMargin)
+    spark.sql(s"""
+                 | insert into table lineitem_split
+                 | select * from lineitem
+                 |""".stripMargin)
+    Seq(("-1", 3), ("3", 3), ("6", 1)).foreach(
+      conf => {
+        withSQLConf(
+          ("spark.gluten.sql.columnar.backend.ch.files.per.partition.threshold" -> conf._1)) {
+          val sql =
+            s"""
+               |select count(1), min(l_returnflag) from lineitem_split
+               |""".stripMargin
+          runSql(sql) {
+            df =>
+              val result = df.collect()
+              assertResult(1)(result.length)
+              assertResult("600572")(result(0).getLong(0).toString)
+              val scanExec = collect(df.queryExecution.executedPlan) {
+                case f: FileSourceScanExecTransformer => f
+              }
+              assert(scanExec(0).getPartitions.size == conf._2)
+          }
+        }
+      })
+  }
 }
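This suite and the TPCDS suite below drive the same knob introduced alongside CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD. A session-level usage sketch; the config key is taken verbatim from the tests, and spark.conf.set is Spark's standard runtime-config API:

// Per the condition in the diff (fileCntThreshold > 0), a non-positive
// value such as "-1" disables the count-based split path; a positive
// value caps files per partition and bounds when the path applies.
spark.conf.set("spark.gluten.sql.columnar.backend.ch.files.per.partition.threshold", "3")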
@@ -241,4 +241,28 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
     // There are some BroadcastHashJoin with NOT condition
     compareResultsAgainstVanillaSpark(sql, true, { df => })
   }
+
+  test("GLUTEN-7358: Optimize the strategy of the partition split according to the files count") {
+    Seq(("-1", 8), ("100", 8), ("2000", 1)).foreach(
+      conf => {
+        withSQLConf(
+          ("spark.gluten.sql.columnar.backend.ch.files.per.partition.threshold" -> conf._1)) {
+          val sql =
+            s"""
+               |select count(1) from store_sales
+               |""".stripMargin
+          compareResultsAgainstVanillaSpark(
+            sql,
+            true,
+            {
+              df =>
+                val scanExec = collect(df.queryExecution.executedPlan) {
+                  case f: FileSourceScanExecTransformer => f
+                }
+                assert(scanExec(0).getPartitions.size == conf._2)
+            }
+          )
+        }
+      })
+  }
 }
1 change: 1 addition & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -671,6 +671,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
           dwio::common::FileFormat::PARQUET, // Currently only support parquet format.
           compressionCodec)),
       (!partitionedKey.empty()),
+      false, /*hasBucketProperty_*/
       exec::TableWriteTraits::outputType(nullptr),
       connector::CommitStrategy::kNoCommit,
       childNode);
