From cfce7365b134dc4f9dcd6cdfe9a350624f944ac8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 20 Feb 2024 09:56:36 -0800 Subject: [PATCH] feat: Support InSet expression in Comet --- .../apache/comet/serde/QueryPlanSerde.scala | 9 +++++ .../apache/comet/CometExpressionSuite.scala | 34 +++++++++++-------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 15a26a091..a497a448c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1377,6 +1377,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case In(value, list) => in(value, list, inputs, false) + case InSet(value, hset) => + val valueDataType = value.dataType + val list = hset.map { setVal => + Literal(setVal, valueDataType) + }.toSeq + // Change `InSet` to `In` expression + // We do Spark `InSet` optimization in native (DataFusion) side. + in(value, list, inputs, false) + case Not(In(value, list)) => in(value, list, inputs, true) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 5ead490c2..df8bc7c7d 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -996,23 +996,27 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("test in/not in") { - Seq(false, true).foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { - val table = "names" - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - sql( - s"insert into $table values(1, 'James'), (1, 'Jones'), (2, 'Smith'), (3, 'Smith')," + - "(NULL, 'Jones'), (4, NULL)") + test("test in(set)/not in(set)") { + Seq("100", "0").foreach { inSetThreshold => + Seq(false, true).foreach { dictionary => + withSQLConf( + SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> inSetThreshold, + "parquet.enable.dictionary" -> dictionary.toString) { + val table = "names" + withTable(table) { + sql(s"create table $table(id int, name varchar(20)) using parquet") + sql( + s"insert into $table values(1, 'James'), (1, 'Jones'), (2, 'Smith'), (3, 'Smith')," + + "(NULL, 'Jones'), (4, NULL)") - checkSparkAnswerAndOperator(s"SELECT * FROM $table WHERE id in (1, 2, 4, NULL)") - checkSparkAnswerAndOperator( - s"SELECT * FROM $table WHERE name in ('Smith', 'Brown', NULL)") + checkSparkAnswerAndOperator(s"SELECT * FROM $table WHERE id in (1, 2, 4, NULL)") + checkSparkAnswerAndOperator( + s"SELECT * FROM $table WHERE name in ('Smith', 'Brown', NULL)") - // TODO: why with not in, the plan is only `LocalTableScan`? - checkSparkAnswer(s"SELECT * FROM $table WHERE id not in (1)") - checkSparkAnswer(s"SELECT * FROM $table WHERE name not in ('Smith', 'Brown', NULL)") + // TODO: why with not in, the plan is only `LocalTableScan`? + checkSparkAnswer(s"SELECT * FROM $table WHERE id not in (1)") + checkSparkAnswer(s"SELECT * FROM $table WHERE name not in ('Smith', 'Brown', NULL)") + } } } }