Skip to content

Commit

Permalink
[SPARK-44340][SQL] Define the computing logic through PartitionEvalua…
Browse files Browse the repository at this point in the history
…tor API and use it in WindowGroupLimitExec

### What changes were proposed in this pull request?
`WindowGroupLimitExec` is updated to use the PartitionEvaluator API to do execution.

### Why are the changes needed?
To define the computing logic and requires the caller side to explicitly list what needs to be serialized and sent to executors

### Does this PR introduce _any_ user-facing change?
'No'.
Just update the inner implementation.

### How was this patch tested?
Test cases updated & running benchmark manually.

```
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_311-b11 on Mac OS X 10.16
[info] Intel(R) Core(TM) i7-9750H CPU  2.60GHz
[info] Benchmark Top-K:                                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] -----------------------------------------------------------------------------------------------------------------------------------------------
[info] ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                        10622          11266         617          2.0         506.5       1.0X
[info] ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                          1712           1744          19         12.2          81.6       6.2X
[info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)          23679          25107         NaN          0.9        1129.1       0.4X
[info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6381           6527          95          3.3         304.3       1.7X
[info] RANK (PARTITION: , WindowGroupLimit: false)                              11492          11631         106          1.8         548.0       0.9X
[info] RANK (PARTITION: , WindowGroupLimit: true)                                2675           2920         118          7.8         127.5       4.0X
[info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)                24208          24299          95          0.9        1154.3       0.4X
[info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                  6347           6478          85          3.3         302.6       1.7X
[info] DENSE_RANK (PARTITION: , WindowGroupLimit: false)                        11288          11959         458          1.9         538.2       0.9X
[info] DENSE_RANK (PARTITION: , WindowGroupLimit: true)                          2684           2945         144          7.8         128.0       4.0X
[info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)          24316          25130         711          0.9        1159.5       0.4X
[info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)            6589           6925         383          3.2         314.2       1.6X
```

Closes #41899 from beliefer/SPARK-44340.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
beliefer authored and cloud-fan committed Jul 12, 2023
1 parent daa9844 commit 6c02bd0
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 189 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.window

import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, DenseRank, Expression, Rank, RowNumber, SortOrder}
import org.apache.spark.sql.execution.metric.SQLMetric

class WindowGroupLimitEvaluatorFactory(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
rankLikeFunction: Expression,
limit: Int,
childOutput: Seq[Attribute],
numOutputRows: SQLMetric)
extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
val limitFunc = rankLikeFunction match {
case _: RowNumber =>
(iter: Iterator[InternalRow]) => SimpleLimitIterator(iter, limit, numOutputRows)
case _: Rank =>
(iter: Iterator[InternalRow]) =>
RankLimitIterator(childOutput, iter, orderSpec, limit, numOutputRows)
case _: DenseRank =>
(iter: Iterator[InternalRow]) =>
DenseRankLimitIterator(childOutput, iter, orderSpec, limit, numOutputRows)
}

if (partitionSpec.isEmpty) {
new WindowGroupLimitPartitionEvaluator(limitFunc)
} else {
new WindowGroupLimitPartitionEvaluator(
input => new GroupedLimitIterator(input, childOutput, partitionSpec, limitFunc))
}
}

class WindowGroupLimitPartitionEvaluator(f: Iterator[InternalRow] => Iterator[InternalRow])
extends PartitionEvaluator[InternalRow, InternalRow] {

override def eval(
partitionIndex: Int,
inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
f(inputs.head)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.window

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, DenseRank, Expression, Rank, RowNumber, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
Expand Down Expand Up @@ -73,26 +73,23 @@ case class WindowGroupLimitExec(

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
rankLikeFunction match {
case _: RowNumber if partitionSpec.isEmpty =>
child.execute().mapPartitionsInternal(SimpleLimitIterator(_, limit, numOutputRows))
case _: RowNumber =>
child.execute().mapPartitionsInternal(new GroupedLimitIterator(_, output, partitionSpec,
(input: Iterator[InternalRow]) => SimpleLimitIterator(input, limit, numOutputRows)))
case _: Rank if partitionSpec.isEmpty =>
child.execute().mapPartitionsInternal(
RankLimitIterator(output, _, orderSpec, limit, numOutputRows))
case _: Rank =>
child.execute().mapPartitionsInternal(new GroupedLimitIterator(_, output, partitionSpec,
(input: Iterator[InternalRow]) =>
RankLimitIterator(output, input, orderSpec, limit, numOutputRows)))
case _: DenseRank if partitionSpec.isEmpty =>
child.execute().mapPartitionsInternal(
DenseRankLimitIterator(output, _, orderSpec, limit, numOutputRows))
case _: DenseRank =>
child.execute().mapPartitionsInternal(new GroupedLimitIterator(_, output, partitionSpec,
(input: Iterator[InternalRow]) =>
DenseRankLimitIterator(output, input, orderSpec, limit, numOutputRows)))

val evaluatorFactory =
new WindowGroupLimitEvaluatorFactory(
partitionSpec,
orderSpec,
rankLikeFunction,
limit,
child.output,
numOutputRows)

if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
child.execute().mapPartitionsInternal { iter =>
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(0, iter)
}
}
}

Expand Down
Loading

0 comments on commit 6c02bd0

Please sign in to comment.