Skip to content

Commit

Permalink
fix: Fallback to Spark if named_struct contains duplicate field names (
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya authored Oct 13, 2024
1 parent 22613e9 commit abd9f85
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}

case struct @ CreateNamedStruct(_) =>
if (struct.names.length != struct.names.distinct.length) {
withInfo(expr, "CreateNamedStruct with duplicate field names are not supported")
return None
}

val valExprs = struct.valExprs.map(exprToProto(_, inputs, binding))

if (valExprs.forall(_.isDefined)) {
Expand Down
22 changes: 21 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, WholeStageCodegenExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, ProjectExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -2062,6 +2062,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("named_struct with duplicate field names") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator(
"SELECT named_struct('a', _1, 'a', _2) FROM tbl",
classOf[ProjectExec])
checkSparkAnswerAndOperator(
"SELECT named_struct('a', _1, 'a', 2) FROM tbl",
classOf[ProjectExec])
checkSparkAnswerAndOperator(
"SELECT named_struct('a', named_struct('b', _1, 'b', _2)) FROM tbl",
classOf[ProjectExec])
}
}
}
}

test("to_json") {
Seq(true, false).foreach { dictionaryEnabled =>
withParquetTable(
Expand Down

0 comments on commit abd9f85

Please sign in to comment.