[SPARK-41708][SQL][FOLLOWUP] WriteFiles should replace exprId using new query

### What changes were proposed in this pull request?

This is a follow-up of #39277 and does three things:
- replace the `WriteFiles` attribute exprIds with the ones from the new query, to avoid potential issues (see the sketch below)
- remove unnecessary explain info from `WriteFiles`
- clean up an unnecessary `Logging` mix-in
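
To illustrate the first point, here is a minimal sketch of the exprId remapping idea, assuming Spark Catalyst's `AttributeMap`, `Attribute.withExprId` and `transformExpressions` APIs; the `remapExprIds` helper is hypothetical and only for illustration, not part of this change:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: rewrite every attribute reference in `plan` that also
// appears in `attrMap` so that it carries the exprId of the corresponding
// output attribute of the prepared (new) query, keeping both plans consistent.
def remapExprIds(plan: LogicalPlan, attrMap: AttributeMap[Attribute]): LogicalPlan =
  plan.transformExpressions {
    case a: Attribute if attrMap.contains(a) => a.withExprId(attrMap(a).exprId)
  }
```

In the actual rule below, `attrMap` is built by zipping `write.query.output` with the prepared query's output, and the same remapping is applied both to the new `WriteFiles` node and to the parent write command.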

### Why are the changes needed?

Improve the implementation of `WriteFiles`

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test.

Closes #39468 from ulysses-you/SPARK-41708-followup.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you authored and cloud-fan committed Jan 10, 2023
1 parent aaee89a commit 21e9691
Showing 8 changed files with 110 additions and 3 deletions.
@@ -77,8 +77,12 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
 case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
   val newQuery = prepareQuery(write, write.query)
   val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
-  val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
+  val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
     write.bucketSpec, write.options, write.staticPartitions)
+  val newChild = writeFiles.transformExpressions {
+    case a: Attribute if attrMap.contains(a) =>
+      a.withExprId(attrMap(a).exprId)
+  }
   val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {
     case a: Attribute if attrMap.contains(a) =>
       a.withExprId(attrMap(a).exprId)
@@ -52,6 +52,7 @@ case class WriteFiles(
options: Map[String, String],
staticPartitions: TablePartitionSpec) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override protected def stringArgs: Iterator[Any] = Iterator(child)
override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
copy(child = newChild)
}
5 changes: 5 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/explain.sql
@@ -7,6 +7,7 @@ CREATE table explain_temp1 (key int, val int) USING PARQUET;
CREATE table explain_temp2 (key int, val int) USING PARQUET;
CREATE table explain_temp3 (key int, val int) USING PARQUET;
CREATE table explain_temp4 (key int, val string) USING PARQUET;
CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string);

SET spark.sql.codegen.wholeStage = true;

@@ -119,11 +120,15 @@ EXPLAIN FORMATTED
FROM explain_temp4
GROUP BY key;

-- V1 Write
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4;

-- cleanup
DROP TABLE explain_temp1;
DROP TABLE explain_temp2;
DROP TABLE explain_temp3;
DROP TABLE explain_temp4;
DROP TABLE explain_temp5;

-- SPARK-35479: Format PartitionFilters IN strings in scan nodes
CREATE table t(v array<string>) USING PARQUET;
48 changes: 48 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -31,6 +31,14 @@ struct<>



-- !query
CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string)
-- !query schema
struct<>
-- !query output



-- !query
SET spark.sql.codegen.wholeStage = true
-- !query schema
@@ -1067,6 +1075,38 @@ Output [2]: [key#x, min(val)#x]
Arguments: isFinalPlan=false


-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query schema
struct<plan:string>
-- !query output
== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false
+- 'Project [*]
+- 'UnresolvedRelation [explain_temp4], [], false

== Analyzed Logical Plan ==
InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
+- Project [key#x, val#x]
+- SubqueryAlias spark_catalog.default.explain_temp4
+- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet

== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
+- WriteFiles
+- Sort [val#x ASC NULLS FIRST], false
+- Project [key#x, empty2null(val#x) AS val#x]
+- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
+- WriteFiles
+- *Sort [val#x ASC NULLS FIRST], false, 0
+- *Project [key#x, empty2null(val#x) AS val#x]
+- *ColumnarToRow
+- FileScan parquet spark_catalog.default.explain_temp4[key#x,val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp4], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:string>


-- !query
DROP TABLE explain_temp1
-- !query schema
@@ -1099,6 +1139,14 @@ struct<>



-- !query
DROP TABLE explain_temp5
-- !query schema
struct<>
-- !query output



-- !query
CREATE table t(v array<string>) USING PARQUET
-- !query schema
48 changes: 48 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -31,6 +31,14 @@ struct<>



-- !query
CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string)
-- !query schema
struct<>
-- !query output



-- !query
SET spark.sql.codegen.wholeStage = true
-- !query schema
@@ -1009,6 +1017,38 @@ Aggregate Attributes [1]: [min(val#x)#x]
Results [2]: [key#x, min(val#x)#x AS min(val)#x]


-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query schema
struct<plan:string>
-- !query output
== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false
+- 'Project [*]
+- 'UnresolvedRelation [explain_temp4], [], false

== Analyzed Logical Plan ==
InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
+- Project [key#x, val#x]
+- SubqueryAlias spark_catalog.default.explain_temp4
+- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet

== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
+- WriteFiles
+- Sort [val#x ASC NULLS FIRST], false
+- Project [key#x, empty2null(val#x) AS val#x]
+- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
+- WriteFiles
+- *Sort [val#x ASC NULLS FIRST], false, 0
+- *Project [key#x, empty2null(val#x) AS val#x]
+- *ColumnarToRow
+- FileScan parquet spark_catalog.default.explain_temp4[key#x,val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp4], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:string>


-- !query
DROP TABLE explain_temp1
-- !query schema
@@ -1041,6 +1081,14 @@ struct<>



-- !query
DROP TABLE explain_temp5
-- !query schema
struct<>
-- !query output



-- !query
CREATE table t(v array<string>) USING PARQUET
-- !query schema
@@ -41,6 +41,7 @@ trait SQLQueryTestHelper {
.replaceAll(
s"Location.*$clsName/",
s"Location $notIncludedMsg/{warehouse_dir}/")
.replaceAll(s"file:.*$clsName", s"Location $notIncludedMsg/{warehouse_dir}")
.replaceAll("Created By.*", s"Created By $notIncludedMsg")
.replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
.replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
@@ -69,6 +69,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
case w: V1WriteCommand =>
if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
assert(w.query.isInstanceOf[WriteFiles])
assert(w.partitionColumns == w.query.asInstanceOf[WriteFiles].partitionColumns)
optimizedPlan = w.query.asInstanceOf[WriteFiles].child
} else {
optimizedPlan = w.query
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.plan.TableDesc

-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -286,7 +285,7 @@ case class InsertIntoHiveTable(
     copy(query = newChild)
 }

-object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
+object InsertIntoHiveTable extends V1WritesHiveUtils {
   def apply(
       table: CatalogTable,
       partition: Map[String, Option[String]],
