From f19eb7ef3043783bbe07f9e7fc16c18bd986e1b1 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Wed, 12 Jul 2023 11:29:43 +0800
Subject: [PATCH] [SPARK-44340][SQL] Define the computing logic through
 PartitionEvaluator API and use it in WindowGroupLimitExec

### What changes were proposed in this pull request?
`WindowGroupLimitExec` is updated to execute through the PartitionEvaluator API.

### Why are the changes needed?
To define the computing logic through the PartitionEvaluator API, which requires the caller side to explicitly list what needs to be serialized and sent to executors. A toy sketch of the pattern is included after the diffstat below.

### Does this PR introduce _any_ user-facing change?
'No'. This only updates the internal implementation.

### How was this patch tested?
Test cases updated and benchmark run manually.
```
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_311-b11 on Mac OS X 10.16
[info] Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
[info] Benchmark Top-K:                                                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] -----------------------------------------------------------------------------------------------------------------------------------------------
[info] ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                         10622          11266         617         2.0         506.5       1.0X
[info] ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                           1712           1744          19        12.2          81.6       6.2X
[info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)           23679          25107         NaN         0.9        1129.1       0.4X
[info] ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)             6381           6527          95         3.3         304.3       1.7X
[info] RANK (PARTITION: , WindowGroupLimit: false)                               11492          11631         106         1.8         548.0       0.9X
[info] RANK (PARTITION: , WindowGroupLimit: true)                                 2675           2920         118         7.8         127.5       4.0X
[info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)                 24208          24299          95         0.9        1154.3       0.4X
[info] RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                   6347           6478          85         3.3         302.6       1.7X
[info] DENSE_RANK (PARTITION: , WindowGroupLimit: false)                         11288          11959         458         1.9         538.2       0.9X
[info] DENSE_RANK (PARTITION: , WindowGroupLimit: true)                           2684           2945         144         7.8         128.0       4.0X
[info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)           24316          25130         711         0.9        1159.5       0.4X
[info] DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)             6589           6925         383         3.2         314.2       1.6X
```

Closes #41899 from beliefer/SPARK-44340.

Authored-by: Jiaan Geng
Signed-off-by: Wenchen Fan
---
 .../WindowGroupLimitEvaluatorFactory.scala    |  63 ++++
 .../window/WindowGroupLimitExec.scala         |  39 +-
 .../sql/DataFrameWindowFunctionsSuite.scala   | 340 +++++++++---------
 3 files changed, 253 insertions(+), 189 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitEvaluatorFactory.scala
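For readers new to the API, here is a minimal, self-contained sketch of the pattern this patch adopts. The names below (`TakeLimitEvaluatorFactory`, the `Int` element type, the SparkContext `sc`) are illustrative only, not part of this patch:

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

// Toy factory: every value the task needs (here just `limit`) is an explicit
// constructor argument, so what gets serialized to executors is obvious.
class TakeLimitEvaluatorFactory(limit: Int)
  extends PartitionEvaluatorFactory[Int, Int] {

  override def createEvaluator(): PartitionEvaluator[Int, Int] =
    new PartitionEvaluator[Int, Int] {
      // Invoked once per partition on the executor side.
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[Int] =
        inputs.head.take(limit)
    }
}

// Usage against a plain RDD (assuming an active SparkContext `sc`):
//   sc.parallelize(1 to 100, 4).mapPartitionsWithEvaluator(new TakeLimitEvaluatorFactory(3))
```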
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitEvaluatorFactory.scala
new file mode 100644
index 0000000000000..6777f6ae7ac66
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitEvaluatorFactory.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DenseRank, Expression, Rank, RowNumber, SortOrder}
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+class WindowGroupLimitEvaluatorFactory(
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    rankLikeFunction: Expression,
+    limit: Int,
+    childOutput: Seq[Attribute],
+    numOutputRows: SQLMetric)
+  extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
+
+  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
+    val limitFunc = rankLikeFunction match {
+      case _: RowNumber =>
+        (iter: Iterator[InternalRow]) => SimpleLimitIterator(iter, limit, numOutputRows)
+      case _: Rank =>
+        (iter: Iterator[InternalRow]) =>
+          RankLimitIterator(childOutput, iter, orderSpec, limit, numOutputRows)
+      case _: DenseRank =>
+        (iter: Iterator[InternalRow]) =>
+          DenseRankLimitIterator(childOutput, iter, orderSpec, limit, numOutputRows)
+    }
+
+    if (partitionSpec.isEmpty) {
+      new WindowGroupLimitPartitionEvaluator(limitFunc)
+    } else {
+      new WindowGroupLimitPartitionEvaluator(
+        input => new GroupedLimitIterator(input, childOutput, partitionSpec, limitFunc))
+    }
+  }
+
+  class WindowGroupLimitPartitionEvaluator(f: Iterator[InternalRow] => Iterator[InternalRow])
+    extends PartitionEvaluator[InternalRow, InternalRow] {
+
+    override def eval(
+        partitionIndex: Int,
+        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+      f(inputs.head)
+    }
+  }
+}
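As a usage sketch only: the factory above could be wired up for the ROW_NUMBER, no-PARTITION BY case roughly as follows. `sortOrder`, `childAttrs`, and `sc` are assumed stand-ins for the child plan's sort order, its output attributes, and an active SparkContext:

```scala
import org.apache.spark.sql.catalyst.expressions.RowNumber
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical wiring; `sortOrder`, `childAttrs`, and `sc` are assumed.
val factory = new WindowGroupLimitEvaluatorFactory(
  partitionSpec = Seq.empty,   // no PARTITION BY: SimpleLimitIterator path
  orderSpec = sortOrder,
  rankLikeFunction = RowNumber(),
  limit = 2,
  childOutput = childAttrs,
  numOutputRows = SQLMetrics.createMetric(sc, "numOutputRows"))

// One evaluator per task; `eval` receives the single input iterator of this
// unary node and yields at most `limit` rows per group.
val evaluator = factory.createEvaluator()
```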
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
index a35c33577d0fb..b1f375f415102 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.window
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, DenseRank, Expression, Rank, RowNumber, SortOrder, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
@@ -73,26 +73,23 @@ case class WindowGroupLimitExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    rankLikeFunction match {
-      case _: RowNumber if partitionSpec.isEmpty =>
-        child.execute().mapPartitionsInternal(SimpleLimitIterator(_, limit, numOutputRows))
-      case _: RowNumber =>
-        child.execute().mapPartitionsInternal(new GroupedLimitIterator(_, output, partitionSpec,
-          (input: Iterator[InternalRow]) => SimpleLimitIterator(input, limit, numOutputRows)))
-      case _: Rank if partitionSpec.isEmpty =>
-        child.execute().mapPartitionsInternal(
-          RankLimitIterator(output, _, orderSpec, limit, numOutputRows))
-      case _: Rank =>
-        child.execute().mapPartitionsInternal(new GroupedLimitIterator(_, output, partitionSpec,
-          (input: Iterator[InternalRow]) =>
-            RankLimitIterator(output, input, orderSpec, limit, numOutputRows)))
-      case _: DenseRank if partitionSpec.isEmpty =>
-        child.execute().mapPartitionsInternal(
-          DenseRankLimitIterator(output, _, orderSpec, limit, numOutputRows))
-      case _: DenseRank =>
-        child.execute().mapPartitionsInternal(new GroupedLimitIterator(_, output, partitionSpec,
-          (input: Iterator[InternalRow]) =>
-            DenseRankLimitIterator(output, input, orderSpec, limit, numOutputRows)))
+
+    val evaluatorFactory =
+      new WindowGroupLimitEvaluatorFactory(
+        partitionSpec,
+        orderSpec,
+        rankLikeFunction,
+        limit,
+        child.output,
+        numOutputRows)
+
+    if (conf.usePartitionEvaluator) {
+      child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
+    } else {
+      child.execute().mapPartitionsInternal { iter =>
+        val evaluator = evaluatorFactory.createEvaluator()
+        evaluator.eval(0, iter)
+      }
     }
   }
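Note the fallback branch: when the evaluator API is disabled, each partition is still funneled through the same factory via `mapPartitionsInternal`, so both paths share one implementation. A hedged way to A/B the two paths, assuming `spark.sql.execution.usePartitionEvaluator` is the key behind `SQLConf.USE_PARTITION_EVALUATOR` (as the test below toggles), with `df` and `w` standing for any DataFrame and window spec:

```scala
// Results must be identical with the evaluator path on or off.
Seq("true", "false").foreach { flag =>
  spark.conf.set("spark.sql.execution.usePartitionEvaluator", flag)
  df.withColumn("rn", row_number().over(w)).where($"rn" <= 2).collect()
}
```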
"", 2.0, 1), + Row("b", 1, "h", Double.NaN, 1), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", rank().over(window)).where(condition), + Seq( + Row("a", 4, "", 2.0, 1), + Row("a", 4, "", 2.0, 1), + Row("b", 1, "h", Double.NaN, 1), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", dense_rank().over(window)).where(condition), + Seq( + Row("a", 4, "", 2.0, 1), + Row("a", 4, "", 2.0, 1), + Row("b", 1, "h", Double.NaN, 1), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", row_number().over(window3)).where(condition), + Seq( + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", rank().over(window3)).where(condition), + Seq( + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", dense_rank().over(window3)).where(condition), + Seq( + Row("c", 2, null, 5.0, 1) + ) + ) + } + + Seq($"rn" < 3, $"rn" <= 2).foreach { condition => + checkAnswer(df.withColumn("rn", row_number().over(window)).where(condition), + Seq( + Row("a", 4, "", 2.0, 1), + Row("a", 4, "", 2.0, 2), + Row("b", 1, "h", Double.NaN, 1), + Row("b", 1, "n", Double.PositiveInfinity, 2), + Row("c", 1, "a", -4.0, 2), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", rank().over(window)).where(condition), + Seq( + Row("a", 4, "", 2.0, 1), + Row("a", 4, "", 2.0, 1), + Row("b", 1, "h", Double.NaN, 1), + Row("b", 1, "n", Double.PositiveInfinity, 2), + Row("c", 1, "a", -4.0, 2), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", dense_rank().over(window)).where(condition), + Seq( + Row("a", 0, "c", 1.0, 2), + Row("a", 4, "", 2.0, 1), + Row("a", 4, "", 2.0, 1), + Row("b", 1, "h", Double.NaN, 1), + Row("b", 1, "n", Double.PositiveInfinity, 2), + Row("c", 1, "a", -4.0, 2), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", row_number().over(window3)).where(condition), + Seq( + Row("a", 4, "", 2.0, 2), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", rank().over(window3)).where(condition), + Seq( + Row("a", 4, "", 2.0, 2), + Row("a", 4, "", 2.0, 2), + Row("c", 2, null, 5.0, 1) + ) + ) + + checkAnswer(df.withColumn("rn", dense_rank().over(window3)).where(condition), + Seq( + Row("a", 4, "", 2.0, 2), + Row("a", 4, "", 2.0, 2), + Row("c", 2, null, 5.0, 1) + ) + ) + } + + val condition = $"rn" === 2 && $"value2" > 0.5 + checkAnswer(df.withColumn("rn", row_number().over(window)).where(condition), + Seq( + Row("a", 4, "", 2.0, 2), + Row("b", 1, "n", Double.PositiveInfinity, 2) + ) ) - ) - checkAnswer(df.withColumn("rn", rank().over(window3)).where(condition), - Seq( - Row("c", 2, null, 5.0, 1) + checkAnswer(df.withColumn("rn", rank().over(window)).where(condition), + Seq( + Row("b", 1, "n", Double.PositiveInfinity, 2) + ) ) - ) - checkAnswer(df.withColumn("rn", dense_rank().over(window3)).where(condition), - Seq( - Row("c", 2, null, 5.0, 1) + checkAnswer(df.withColumn("rn", dense_rank().over(window)).where(condition), + Seq( + Row("a", 0, "c", 1.0, 2), + Row("b", 1, "n", Double.PositiveInfinity, 2) + ) ) - ) - } - Seq($"rn" < 3, $"rn" <= 2).foreach { condition => - checkAnswer(df.withColumn("rn", row_number().over(window)).where(condition), - Seq( - Row("a", 4, "", 2.0, 1), - Row("a", 4, "", 2.0, 2), - Row("b", 1, "h", Double.NaN, 1), - Row("b", 1, "n", Double.PositiveInfinity, 2), - Row("c", 1, "a", -4.0, 2), - Row("c", 2, null, 5.0, 1) - ) - ) - - checkAnswer(df.withColumn("rn", rank().over(window)).where(condition), - Seq( - Row("a", 4, "", 2.0, 1), - Row("a", 4, "", 
-
-        Seq($"rn" < 3, $"rn" <= 2).foreach { condition =>
-          checkAnswer(df.withColumn("rn", row_number().over(window)).where(condition),
-            Seq(
-              Row("a", 4, "", 2.0, 1),
-              Row("a", 4, "", 2.0, 2),
-              Row("b", 1, "h", Double.NaN, 1),
-              Row("b", 1, "n", Double.PositiveInfinity, 2),
-              Row("c", 1, "a", -4.0, 2),
-              Row("c", 2, null, 5.0, 1)
-            )
-          )
-
-          checkAnswer(df.withColumn("rn", rank().over(window)).where(condition),
-            Seq(
-              Row("a", 4, "", 2.0, 1),
-              Row("a", 4, "", 2.0, 1),
-              Row("b", 1, "h", Double.NaN, 1),
-              Row("b", 1, "n", Double.PositiveInfinity, 2),
-              Row("c", 1, "a", -4.0, 2),
-              Row("c", 2, null, 5.0, 1)
-            )
-          )
-
-          checkAnswer(df.withColumn("rn", dense_rank().over(window)).where(condition),
-            Seq(
-              Row("a", 0, "c", 1.0, 2),
-              Row("a", 4, "", 2.0, 1),
-              Row("a", 4, "", 2.0, 1),
-              Row("b", 1, "h", Double.NaN, 1),
-              Row("b", 1, "n", Double.PositiveInfinity, 2),
-              Row("c", 1, "a", -4.0, 2),
-              Row("c", 2, null, 5.0, 1)
-            )
-          )
-
-          checkAnswer(df.withColumn("rn", row_number().over(window3)).where(condition),
-            Seq(
-              Row("a", 4, "", 2.0, 2),
-              Row("c", 2, null, 5.0, 1)
-            )
-          )
-
-          checkAnswer(df.withColumn("rn", rank().over(window3)).where(condition),
-            Seq(
-              Row("a", 4, "", 2.0, 2),
-              Row("a", 4, "", 2.0, 2),
-              Row("c", 2, null, 5.0, 1)
-            )
-          )
-
-          checkAnswer(df.withColumn("rn", dense_rank().over(window3)).where(condition),
-            Seq(
-              Row("a", 4, "", 2.0, 2),
-              Row("a", 4, "", 2.0, 2),
-              Row("c", 2, null, 5.0, 1)
-            )
-          )
-        }
-
-        val condition = $"rn" === 2 && $"value2" > 0.5
-        checkAnswer(df.withColumn("rn", row_number().over(window)).where(condition),
-          Seq(
-            Row("a", 4, "", 2.0, 2),
-            Row("b", 1, "n", Double.PositiveInfinity, 2)
-          )
-        )
-
-        checkAnswer(df.withColumn("rn", rank().over(window)).where(condition),
-          Seq(
-            Row("b", 1, "n", Double.PositiveInfinity, 2)
-          )
-        )
-
-        checkAnswer(df.withColumn("rn", dense_rank().over(window)).where(condition),
-          Seq(
-            Row("a", 0, "c", 1.0, 2),
-            Row("b", 1, "n", Double.PositiveInfinity, 2)
-          )
-        )
-
-        val multipleRowNumbers = df
-          .withColumn("rn", row_number().over(window))
-          .withColumn("rn2", row_number().over(window))
-          .where('rn < 2 && 'rn2 < 3)
-        checkAnswer(multipleRowNumbers,
-          Seq(
-            Row("a", 4, "", 2.0, 1, 1),
-            Row("b", 1, "h", Double.NaN, 1, 1),
-            Row("c", 2, null, 5.0, 1, 1)
-          )
-        )
-
-        val multipleRanks = df
-          .withColumn("rn", rank().over(window))
-          .withColumn("rn2", rank().over(window))
-          .where('rn < 2 && 'rn2 < 3)
-        checkAnswer(multipleRanks,
-          Seq(
-            Row("a", 4, "", 2.0, 1, 1),
-            Row("a", 4, "", 2.0, 1, 1),
-            Row("b", 1, "h", Double.NaN, 1, 1),
-            Row("c", 2, null, 5.0, 1, 1)
-          )
-        )
-
-        val multipleDenseRanks = df
-          .withColumn("rn", dense_rank().over(window))
-          .withColumn("rn2", dense_rank().over(window))
-          .where('rn < 2 && 'rn2 < 3)
-        checkAnswer(multipleDenseRanks,
-          Seq(
-            Row("a", 4, "", 2.0, 1, 1),
-            Row("a", 4, "", 2.0, 1, 1),
-            Row("b", 1, "h", Double.NaN, 1, 1),
-            Row("c", 2, null, 5.0, 1, 1)
-          )
-        )
-
-        val multipleWindows = df
-          .withColumn("rn2", row_number().over(window2))
-          .withColumn("rn", row_number().over(window))
-          .where('rn < 2 && 'rn2 < 3)
-        checkAnswer(multipleWindows,
-          Seq(
-            Row("b", 1, "h", Double.NaN, 2, 1),
-            Row("c", 2, null, 5.0, 1, 1)
-          )
-        )
4, "", 2.0, 1, 1), - Row("b", 1, "h", Double.NaN, 1, 1), - Row("c", 2, null, 5.0, 1, 1) - ) - ) - - val multipleWindows = df - .withColumn("rn2", row_number().over(window2)) - .withColumn("rn", row_number().over(window)) - .where('rn < 2 && 'rn2 < 3) - checkAnswer(multipleWindows, - Seq( - Row("b", 1, "h", Double.NaN, 2, 1), - Row("c", 2, null, 5.0, 1, 1) - ) - ) } } }