
Fixed issue apache#314: This patch uses division to check the condition for computing dimension boundaries in ZORDER/HILBERT partitioning.
fengguangyuan committed Nov 8, 2021
1 parent 714a997 commit d095b7d
Showing 1 changed file with 18 additions and 5 deletions.
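
For context on the fix: the pre-patch guard multiplied the per-dimension sampled cardinalities and compared the product with the partition budget, and that product can overflow; the patch divides the budget by each cardinality instead. A minimal standalone sketch of the two shapes, not code from the patch (all values made up; budget stands in for partitions * zorderEstimatedPartitionsFactor):

val sizes: Seq[Int] = Seq(100000, 200000, 300000) // sampled cardinality per dimension
val budget: Int = 2048                            // partitions * estimation factor

// Pre-patch shape: an Int product of large cardinalities silently wraps
// around, so comparing it with the budget can give the wrong answer.
val overflowProne = sizes.product > budget

// Patched shape: divide the budget by each non-zero cardinality; the
// running value only ever shrinks, so it cannot overflow.
val shouldNotCompute = sizes
  .filter(_ > 0)
  .foldLeft(budget)((rest, card) => rest / card) >= 1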
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
import scala.util.hashing.byteswap32

import org.apache.spark.Partitioner
+import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.CurveIndex
@@ -43,7 +44,7 @@ class CurveRangePartitioner(
curveIndex: CurveIndex,
private var ascending: Boolean = true,
val samplePointsPerPartitionHint: Int = 20)
-  extends Partitioner {
+  extends Partitioner with Logging {

// A constructor declared in order to maintain backward compatibility for Java, when we add the
// 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
@@ -105,14 +106,19 @@ class CurveRangePartitioner(
candidates ++= reSampled.map(x => (x, weight))
}

logDebug(s"Total sampled ${candidates.size} rows to do partitioning.")

val candidatesAndWeightsWithIndex = orders.map(orderKey => {
-      candidates.map {
+      val cardinalityWithWeights = candidates.map {
case (row, weight) =>
(orderKey.child.eval(row), weight)
}.filter(_._1 != null)
.groupBy(_._1)
.mapValues(_.map(_._2).sum)
.toSeq
logDebug(s"Sampled cardinality on order key ${orderKey.child.toString} =" +
s" ${cardinalityWithWeights.size}")
cardinalityWithWeights
}).zipWithIndex

def getOrdering(idx: Int): Ordering[Any] = {
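
The cardinalityWithWeights step in the hunk above evaluates the order key on every sampled row, drops nulls, and sums the sample weights per distinct value. A standalone sketch with plain tuples standing in for InternalRow evaluation (the data is made up):

// Sampled (keyValue, weight) pairs, standing in for orderKey.child.eval(row).
val candidates = Seq[(Any, Float)](("a", 0.5f), ("b", 0.5f), ("a", 1.0f), (null, 2.0f))

val cardinalityWithWeights = candidates
  .filter(_._1 != null)       // null keys carry no boundary information
  .groupBy(_._1)              // one group per distinct sampled value
  .mapValues(_.map(_._2).sum) // total sample weight per value
  .toSeq
// size == 2 here: the "sampled cardinality" that the new logDebug reports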
@@ -130,15 +136,18 @@
}

val sortedKeyAndWeightsWithIndex = candidatesAndWeightsWithIndex.sortBy(_._1.size)
-    val totalSamplePartitions = candidatesAndWeightsWithIndex.map(_._1.size).product
    var totalOverheadPartitions = partitions * SQLConf.get.zorderEstimatedPartitionsFactor
+    val shouldNotCompute = candidatesAndWeightsWithIndex
+      .map(_._1.size)
+      .filter(_ > 0)
+      .foldLeft(totalOverheadPartitions)((rest, card) => rest / card) >= 1

val computedDimensionBounds = for (i <- sortedKeyAndWeightsWithIndex.indices) yield {
val aggCandidates = sortedKeyAndWeightsWithIndex(i)._1
val orderIdx = sortedKeyAndWeightsWithIndex(i)._2
implicit val ordering = getOrdering(orderIdx)

-      val estimatedPartitions = if (totalSamplePartitions > totalOverheadPartitions) {
+      val estimatedPartitions = if (!shouldNotCompute) {
val avgDimensionSize = math.ceil(
math.pow(totalOverheadPartitions, 1d / (orders.size - i))).toInt
val estimatedPartitions = math.min(avgDimensionSize, aggCandidates.size)
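
The new guard is equivalent to the old product comparison because, for positive integers, repeatedly flooring a division by each factor equals flooring a single division by their product; the fold therefore answers "does the product of cardinalities exceed the budget?" without ever forming the product. A quick sanity check of that identity (values made up):

val budget = 1000L
val cards = Seq(7L, 11L, 13L)              // product is 1001, just over the budget
val folded = cards.foldLeft(budget)(_ / _) // 1000/7 = 142, 142/11 = 12, 12/13 = 0
val direct = budget / cards.product        // 1000/1001 = 0
assert(folded == direct)                   // 0 >= 1 is false, so the bounds get shrunk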
@@ -149,8 +158,12 @@
aggCandidates.size
}

logDebug(s"Estimated partitions on order key ${orders(orderIdx).child.toString} =" +
s" $estimatedPartitions")

val dimensionBoundary = CurveRangePartitioner.determineBounds(
-        aggCandidates, estimatedPartitions.toInt)
+        aggCandidates, estimatedPartitions)

(orders(orderIdx), ordering, dimensionBoundary)
}

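To make the avgDimensionSize formula above concrete: dimensions are visited from smallest to largest sampled cardinality, and each is offered roughly the k-th root of the budget, where k is the number of dimensions still to place. A worked sketch under the simplifying assumption that the budget stays fixed across iterations (the collapsed hunks may adjust it between dimensions):

val totalOverheadPartitions = 1000 // partitions * zorderEstimatedPartitionsFactor
val dims = 3                       // orders.size
for (i <- 0 until dims) {
  val avgDimensionSize =
    math.ceil(math.pow(totalOverheadPartitions, 1d / (dims - i))).toInt
  println(s"dimension $i: at most $avgDimensionSize partitions")
  // i = 0 -> ceil(1000^(1/3)) = 10
  // i = 1 -> ceil(1000^(1/2)) = 32
  // i = 2 -> ceil(1000^1)     = 1000
}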
