From f682d5b09c27999fbfbbd2c02168951f3f7ee3db Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 24 Jul 2024 21:08:49 -0700 Subject: [PATCH] fix: Fallback to Spark for window expression with range frame (#719) * fix: Disable unsupported window frame type * Fix test --- .../apache/comet/serde/QueryPlanSerde.scala | 4 ++- .../apache/comet/exec/CometExecSuite.scala | 28 ++++++++++++++++--- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index d91ae5e4d..e06405a1a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -236,7 +236,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case SpecifiedWindowFrame(frameType, lBound, uBound) => val frameProto = frameType match { case RowFrame => OperatorOuterClass.WindowFrameType.Rows - case RangeFrame => OperatorOuterClass.WindowFrameType.Range + case RangeFrame => + withInfo(windowExpr, "Range frame is not supported") + return None } val lBoundProto = lBound match { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index b8c0d5668..0e1204df8 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, Cartes import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{col, date_add, expr, lead, sum} +import org.apache.spark.sql.functions.{col, count, date_add, expr, lead, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.unsafe.types.UTF8String @@ -63,6 +63,25 @@ class CometExecSuite extends CometTestBase { } } + test("Window range frame should fall back to Spark") { + val df = + Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2")) + .toDF("key", "value") + + checkAnswer( + df.select( + $"key", + count("key").over( + Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))), + Seq(Row(1, 3), Row(1, 3), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))) + checkAnswer( + df.select( + $"key", + count("key").over( + Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))), + Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))) + } + test("Unsupported window expression should fall back to Spark") { checkAnswer( spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"), @@ -1452,9 +1471,10 @@ class CometExecSuite extends CometTestBase { aggregateFunctions.foreach { function => val queries = Seq( s"SELECT $function OVER() FROM t1", - s"SELECT $function OVER(order by _2) FROM t1", - s"SELECT $function OVER(order by _2 desc) FROM t1", - s"SELECT $function OVER(partition by _2 order by _2) FROM t1", + // TODO: Range frame is not supported yet. + // s"SELECT $function OVER(order by _2) FROM t1", + // s"SELECT $function OVER(order by _2 desc) FROM t1", + // s"SELECT $function OVER(partition by _2 order by _2) FROM t1", s"SELECT $function OVER(rows between 1 preceding and 1 following) FROM t1", s"SELECT $function OVER(order by _2 rows between 1 preceding and current row) FROM t1", s"SELECT $function OVER(order by _2 rows between current row and 1 following) FROM t1")