From 21e9691cb44b20b31690d6a2bf662ef5f839feff Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 10 Jan 2023 22:06:55 +0800 Subject: [PATCH] [SPARK-41708][SQL][FOLLOWUP] WriteFiles should replace exprId using new query ### What changes were proposed in this pull request? This is the followup of https://github.com/apache/spark/pull/39277, does three things: - replace WriteFiles attribute exprId using new query to avoid potential issue - remove unnecessary explain info with `WriteFiles` - cleanup unnecessary `Logging` ### Why are the changes needed? Improve the implementation of `WriteFiles` ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add test Closes #39468 from ulysses-you/SPARK-41708-followup. Authored-by: ulysses-you Signed-off-by: Wenchen Fan --- .../sql/execution/datasources/V1Writes.scala | 6 ++- .../execution/datasources/WriteFiles.scala | 1 + .../resources/sql-tests/inputs/explain.sql | 5 ++ .../sql-tests/results/explain-aqe.sql.out | 48 +++++++++++++++++++ .../sql-tests/results/explain.sql.out | 48 +++++++++++++++++++ .../apache/spark/sql/SQLQueryTestHelper.scala | 1 + .../datasources/V1WriteCommandSuite.scala | 1 + .../hive/execution/InsertIntoHiveTable.scala | 3 +- 8 files changed, 110 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index 7b4fa7ad80bb0..d52af64521855 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -77,8 +77,12 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] => val newQuery = prepareQuery(write, write.query) val attrMap = AttributeMap(write.query.output.zip(newQuery.output)) - val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns, + val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns, write.bucketSpec, write.options, write.staticPartitions) + val newChild = writeFiles.transformExpressions { + case a: Attribute if attrMap.contains(a) => + a.withExprId(attrMap(a).exprId) + } val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions { case a: Attribute if attrMap.contains(a) => a.withExprId(attrMap(a).exprId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala index 53d9470447175..d0ed6b02fef81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala @@ -52,6 +52,7 @@ case class WriteFiles( options: Map[String, String], staticPartitions: TablePartitionSpec) extends UnaryNode { override def output: Seq[Attribute] = child.output + override protected def stringArgs: Iterator[Any] = Iterator(child) override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles = copy(child = newChild) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql index aa7f682a3018c..698ca009b4ffb 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql @@ -7,6 +7,7 @@ CREATE table 
explain_temp1 (key int, val int) USING PARQUET; CREATE table explain_temp2 (key int, val int) USING PARQUET; CREATE table explain_temp3 (key int, val int) USING PARQUET; CREATE table explain_temp4 (key int, val string) USING PARQUET; +CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string); SET spark.sql.codegen.wholeStage = true; @@ -119,11 +120,15 @@ EXPLAIN FORMATTED FROM explain_temp4 GROUP BY key; +-- V1 Write +EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4; + -- cleanup DROP TABLE explain_temp1; DROP TABLE explain_temp2; DROP TABLE explain_temp3; DROP TABLE explain_temp4; +DROP TABLE explain_temp5; -- SPARK-35479: Format PartitionFilters IN strings in scan nodes CREATE table t(v array) USING PARQUET; diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 7e237d1267897..08a63eda0b06a 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -31,6 +31,14 @@ struct<> +-- !query +CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string) +-- !query schema +struct<> +-- !query output + + + -- !query SET spark.sql.codegen.wholeStage = true -- !query schema @@ -1067,6 +1075,38 @@ Output [2]: [key#x, min(val)#x] Arguments: isFinalPlan=false +-- !query +EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 +-- !query schema +struct +-- !query output +== Parsed Logical Plan == +'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false ++- 'Project [*] + +- 'UnresolvedRelation [explain_temp4], [], false + +== Analyzed Logical Plan == +InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val] ++- Project [key#x, val#x] + +- SubqueryAlias spark_catalog.default.explain_temp4 + +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet + +== Optimized Logical Plan == +InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val] ++- WriteFiles + +- Sort [val#x ASC NULLS FIRST], false + +- Project [key#x, empty2null(val#x) AS val#x] + +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet + +== Physical Plan == +Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val] ++- WriteFiles + +- *Sort [val#x ASC NULLS FIRST], false, 0 + +- *Project [key#x, empty2null(val#x) AS val#x] + +- *ColumnarToRow + +- FileScan parquet spark_catalog.default.explain_temp4[key#x,val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp4], PartitionFilters: [], PushedFilters: [], ReadSchema: struct + + -- !query DROP TABLE explain_temp1 -- !query schema @@ -1099,6 +1139,14 @@ struct<> +-- !query +DROP TABLE explain_temp5 +-- !query schema +struct<> +-- !query output + + + -- !query CREATE table t(v array) USING PARQUET -- !query schema diff --git 
a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index cdfeba62e7238..d0813ecd52ee1 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -31,6 +31,14 @@ struct<> +-- !query +CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string) +-- !query schema +struct<> +-- !query output + + + -- !query SET spark.sql.codegen.wholeStage = true -- !query schema @@ -1009,6 +1017,38 @@ Aggregate Attributes [1]: [min(val#x)#x] Results [2]: [key#x, min(val#x)#x AS min(val)#x] +-- !query +EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 +-- !query schema +struct +-- !query output +== Parsed Logical Plan == +'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false ++- 'Project [*] + +- 'UnresolvedRelation [explain_temp4], [], false + +== Analyzed Logical Plan == +InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val] ++- Project [key#x, val#x] + +- SubqueryAlias spark_catalog.default.explain_temp4 + +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet + +== Optimized Logical Plan == +InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val] ++- WriteFiles + +- Sort [val#x ASC NULLS FIRST], false + +- Project [key#x, empty2null(val#x) AS val#x] + +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet + +== Physical Plan == +Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val] ++- WriteFiles + +- *Sort [val#x ASC NULLS FIRST], false, 0 + +- *Project [key#x, empty2null(val#x) AS val#x] + +- *ColumnarToRow + +- FileScan parquet spark_catalog.default.explain_temp4[key#x,val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp4], PartitionFilters: [], PushedFilters: [], ReadSchema: struct + + -- !query DROP TABLE explain_temp1 -- !query schema @@ -1041,6 +1081,14 @@ struct<> +-- !query +DROP TABLE explain_temp5 +-- !query schema +struct<> +-- !query output + + + -- !query CREATE table t(v array) USING PARQUET -- !query schema diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala index dd20c416f5251..fb4bd79780ceb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala @@ -41,6 +41,7 @@ trait SQLQueryTestHelper { .replaceAll( s"Location.*$clsName/", s"Location $notIncludedMsg/{warehouse_dir}/") + .replaceAll(s"file:.*$clsName", s"Location $notIncludedMsg/{warehouse_dir}") .replaceAll("Created By.*", s"Created By $notIncludedMsg") .replaceAll("Created Time.*", s"Created Time $notIncludedMsg") .replaceAll("Last Access.*", s"Last Access $notIncludedMsg") diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index 80d0369044cd8..40574a8e73aa2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -69,6 +69,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils { case w: V1WriteCommand => if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) { assert(w.query.isInstanceOf[WriteFiles]) + assert(w.partitionColumns == w.query.asInstanceOf[WriteFiles].partitionColumns) optimizedPlan = w.query.asInstanceOf[WriteFiles].child } else { optimizedPlan = w.query diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 6785b5d96d9d4..2c9720e089aa9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -286,7 +285,7 @@ case class InsertIntoHiveTable( copy(query = newChild) } -object InsertIntoHiveTable extends V1WritesHiveUtils with Logging { +object InsertIntoHiveTable extends V1WritesHiveUtils { def apply( table: CatalogTable, partition: Map[String, Option[String]],
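
---

A minimal sketch of the exprId-remapping pattern that the `V1Writes` change above applies, shown on toy plans. This is illustrative only and not part of the patch: it assumes `spark-catalyst` is on the classpath, and `ExprIdRemapSketch`, `oldKey`, and `newKey` are hypothetical stand-ins for an attribute of `write.query.output` (old) and `newQuery.output` (new).

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.IntegerType

// Illustrative sketch only, not part of the patch above.
object ExprIdRemapSketch {
  def main(args: Array[String]): Unit = {
    // Two attributes with the same name but different exprIds, standing in for
    // an output attribute of the original query and of the prepared new query.
    val oldKey = AttributeReference("key", IntegerType)()
    val newKey = AttributeReference("key", IntegerType)()

    // Old-to-new mapping, analogous to AttributeMap(write.query.output.zip(newQuery.output)).
    val attrMap = AttributeMap(Seq(oldKey -> newKey))

    // A parent node that still references the old attribute; in the patch this
    // role is played by the freshly built WriteFiles node.
    val parent = Project(Seq(oldKey), LocalRelation(Seq(newKey)))

    // Rewrite every matching attribute in this node to carry the new exprId,
    // the same transformExpressions pattern V1Writes now applies to WriteFiles.
    val remapped = parent.transformExpressions {
      case a: Attribute if attrMap.contains(a) => a.withExprId(attrMap(a).exprId)
    }

    // The parent now refers to the same exprId as the new query's output.
    assert(remapped.output.head.exprId == newKey.exprId)
  }
}
```

Remapping through an `AttributeMap` keeps the expressions held by `WriteFiles` (for example its partition columns) bound to the same attributes as the rewritten query, which is what the new `assert(w.partitionColumns == w.query.asInstanceOf[WriteFiles].partitionColumns)` check in `V1WriteCommandSuite` verifies.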