From d095b7da63b77f3d9dbad8439cc00cccb595022f Mon Sep 17 00:00:00 2001
From: fengguangyuan
Date: Fri, 5 Nov 2021 21:16:53 +0800
Subject: [PATCH] Fixed issue #314: Use division to check the condition for
 computing dimension boundaries in ZORDER/HILBERT partitioning.

---
 .../sql/execution/CurveRangePartitioner.scala | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/CurveRangePartitioner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/CurveRangePartitioner.scala
index 5651aa130d003..4e33816be7b50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/CurveRangePartitioner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/CurveRangePartitioner.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
 import scala.util.hashing.byteswap32
 
 import org.apache.spark.Partitioner
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.CurveIndex
@@ -43,7 +44,7 @@ class CurveRangePartitioner(
     curveIndex: CurveIndex,
     private var ascending: Boolean = true,
     val samplePointsPerPartitionHint: Int = 20)
-  extends Partitioner {
+  extends Partitioner with Logging {
 
   // A constructor declared in order to maintain backward compatibility for Java, when we add the
   // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
@@ -105,14 +106,19 @@ class CurveRangePartitioner(
       candidates ++= reSampled.map(x => (x, weight))
     }
 
+    logDebug(s"Total sampled ${candidates.size} rows to do partitioning.")
+
     val candidatesAndWeightsWithIndex = orders.map(orderKey => {
-      candidates.map {
+      val cardinalityWithWeights = candidates.map {
         case (row, weight) => (orderKey.child.eval(row), weight)
       }.filter(_._1 != null)
         .groupBy(_._1)
         .mapValues(_.map(_._2).sum)
         .toSeq
 
+      logDebug(s"Sampled cardinality on order key ${orderKey.child.toString} =" +
+        s" ${cardinalityWithWeights.size}")
+      cardinalityWithWeights
     }).zipWithIndex
 
     def getOrdering(idx: Int): Ordering[Any] = {
@@ -130,15 +136,18 @@ class CurveRangePartitioner(
     }
 
     val sortedKeyAndWeightsWithIndex = candidatesAndWeightsWithIndex.sortBy(_._1.size)
-    val totalSamplePartitions = candidatesAndWeightsWithIndex.map(_._1.size).product
     var totalOverheadPartitions = partitions * SQLConf.get.zorderEstimatedPartitionsFactor
+    val shouldNotCompute = candidatesAndWeightsWithIndex
+      .map(_._1.size)
+      .filter(_ > 0)
+      .foldLeft(totalOverheadPartitions)((rest, card) => rest / card) >= 1
 
     val computedDimensionBounds = for (i <- sortedKeyAndWeightsWithIndex.indices) yield {
       val aggCandidates = sortedKeyAndWeightsWithIndex(i)._1
       val orderIdx = sortedKeyAndWeightsWithIndex(i)._2
       implicit val ordering = getOrdering(orderIdx)
 
-      val estimatedPartitions = if (totalSamplePartitions > totalOverheadPartitions) {
+      val estimatedPartitions = if (!shouldNotCompute) {
         val avgDimensionSize = math.ceil(
           math.pow(totalOverheadPartitions, 1d / (orders.size - i))).toInt
         val estimatedPartitions = math.min(avgDimensionSize, aggCandidates.size)
@@ -152,9 +161,13 @@ class CurveRangePartitioner(
       } else {
         aggCandidates.size
       }
 
+      logDebug(s"Estimated partitions on order key ${orders(orderIdx).child.toString} =" +
+        s" $estimatedPartitions")
+
       val dimensionBoundary = CurveRangePartitioner.determineBounds(
-        aggCandidates, estimatedPartitions.toInt)
+        aggCandidates, estimatedPartitions)
+
       (orders(orderIdx), ordering,
         dimensionBoundary)
     }
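
Reviewer note (not part of the patch): the removed totalSamplePartitions was the
product of the per-dimension sample cardinalities, which can overflow for
high-cardinality ZORDER/HILBERT keys and silently flip the comparison against
totalOverheadPartitions. Folding division over the partition budget evaluates
the same predicate with a quotient that only shrinks, so it cannot overflow.
The standalone Scala sketch below illustrates this; the values and the names
DivisionCheckSketch, cardinalities, and totalOverheadPartitions are
hypothetical, not from the repository.

// Standalone sketch: old product-based predicate vs. the patch's
// division-based predicate, with deliberately large cardinalities.
object DivisionCheckSketch extends App {
  // Assumed per-dimension sample cardinalities for four curve dimensions.
  val cardinalities: Seq[Long] = Seq(4000000000L, 4000000000L, 4000000000L, 4000000000L)
  // Assumed partition budget (partitions * estimated-partitions factor).
  val totalOverheadPartitions: Long = 2000L

  // Old check: the product (~2.56e38) far exceeds Long.MaxValue (~9.2e18),
  // so it wraps around and the comparison can silently give the wrong answer.
  val totalSamplePartitions = cardinalities.product
  println(s"product (overflowed) = $totalSamplePartitions")
  println(s"old predicate (compute bounds?) = ${totalSamplePartitions > totalOverheadPartitions}")

  // New check, mirroring the patch: divide the budget by each non-zero
  // cardinality. The running quotient only shrinks, so it cannot overflow;
  // a final quotient >= 1 means the budget already covers the full cross
  // product and no per-dimension reduction is needed.
  val shouldNotCompute = cardinalities
    .filter(_ > 0)
    .foldLeft(totalOverheadPartitions)((rest, card) => rest / card) >= 1
  println(s"new predicate (compute bounds?) = ${!shouldNotCompute}")
}

With these inputs the division-based predicate correctly reports that the
dimension bounds must be computed (the quotient collapses to 0 after the first
division), while the product-based predicate depends on an overflowed value.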