From b57c93b2ab5d6b86cefc8eb0ce8a01c596e0630d Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 7 Apr 2022 08:54:10 +0800
Subject: [PATCH] [SPARK-38772][SQL] Formatting the log plan in
 AdaptiveSparkPlanExec

### What changes were proposed in this pull request?

Use `sideBySide` to format the log plan in `AdaptiveSparkPlanExec`.

Before:
```
12:08:36.876 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Plan changed from SortMergeJoin [key#13], [a#23], Inner
:- Sort [key#13 ASC NULLS FIRST], false, 0
:  +- ShuffleQueryStage 0
:     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
:        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:              +- Scan[obj#12]
+- Sort [a#23 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
 to BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#145]
:  +- ShuffleQueryStage 0
:     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
:        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:              +- Scan[obj#12]
+- ShuffleQueryStage 1
   +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
      +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
         +- Scan[obj#22]
```

After:
```
15:57:59.481 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Plan changed:
!SortMergeJoin [key#13], [a#23], Inner                                           BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
!:- Sort [key#13 ASC NULLS FIRST], false, 0                                      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#145]
 :  +- ShuffleQueryStage 0                                                       :  +- ShuffleQueryStage 0
 :     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]   :     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
 :        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))                :        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
 :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]   :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
 :              +- Scan[obj#12]                                                  :              +- Scan[obj#12]
!+- Sort [a#23 ASC NULLS FIRST], false, 0                                        +- ShuffleQueryStage 1
!   +- ShuffleQueryStage 1                                                          +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
!      +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]           +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
!         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]   +- Scan[obj#22]
!            +- Scan[obj#22]
```
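For reference, here is a minimal, self-contained sketch of the side-by-side idea (an illustration only, not the exact implementation of `org.apache.spark.sql.catalyst.util.sideBySide`): pair up the lines of the two tree strings, pad the left column to a common width, and prefix lines that differ with `!`.

```scala
// Illustration only -- not the exact implementation of
// org.apache.spark.sql.catalyst.util.sideBySide.
object SideBySideSketch {
  def sideBySide(left: String, right: String): Seq[String] = {
    val leftLines = left.split("\n").toSeq
    val rightLines = right.split("\n").toSeq
    val height = math.max(leftLines.size, rightLines.size)
    val maxLeftWidth = leftLines.map(_.length).max
    // Pad the shorter plan with empty lines, then pair and compare.
    leftLines.padTo(height, "").zip(rightLines.padTo(height, "")).map {
      case (l, r) =>
        val marker = if (l == r) " " else "!" // "!" marks lines that changed
        marker + l.padTo(maxLeftWidth + 3, ' ') + r
    }
  }

  def main(args: Array[String]): Unit = {
    val before = "SortMergeJoin\n:- Sort\n+- Sort"
    val after = "BroadcastHashJoin\n:- BroadcastExchange\n+- ShuffleQueryStage 1"
    println(sideBySide(before, after).mkString("\n"))
  }
}
```

In the patch below, the two inputs are `currentPhysicalPlan.treeString` and `newPhysicalPlan.treeString`, and the resulting lines are joined with `"\n"` before logging.
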
### Why are the changes needed?

Enhance readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual testing (see the hypothetical repro sketch after the patch).

Closes #36045 from wangyum/SPARK-38772.

Authored-by: Yuming Wang
Signed-off-by: Yuming Wang
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index c6505a0ea5f73..25380bc1d897b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
@@ -306,7 +307,8 @@ case class AdaptiveSparkPlanExec(
         val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
         if (newCost < origCost ||
             (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
+          logOnLevel("Plan changed:\n" +
+            sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
           cleanUpTempTags(newPhysicalPlan)
           currentPhysicalPlan = newPhysicalPlan
           currentLogicalPlan = newLogicalPlan
@@ -335,7 +337,7 @@ case class AdaptiveSparkPlanExec(
     if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
       getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
     }
-    logOnLevel(s"Final plan: $currentPhysicalPlan")
+    logOnLevel(s"Final plan:\n$currentPhysicalPlan")
   }
 
   override def executeCollect(): Array[InternalRow] = {
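As a reference for the manual test above, here is a hypothetical way to trigger the new "Plan changed" output. This repro is not part of the patch: the schemas and data sizes are assumptions (mirroring the `SQLTestData$TestData`/`TestData2` classes in the logs), while `spark.sql.adaptive.enabled`, `spark.sql.adaptive.logLevel`, and `spark.sql.shuffle.partitions` are real Spark SQL settings. The idea is to run a join whose left side turns out to be small enough at runtime for AQE to replace the sort-merge join with a broadcast hash join.

```scala
// Hypothetical repro sketch -- not part of the patch.
import org.apache.spark.sql.SparkSession

object PlanChangeLogRepro {
  // Assumed schemas, modeled on SQLTestData$TestData / TestData2 in the logs above.
  case class TestData(key: Int, value: String)
  case class TestData2(a: Int, b: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true")
      // Raise the AQE log level (default "debug") so the "Plan changed"
      // message surfaces at ERROR, as in the example logs above.
      .config("spark.sql.adaptive.logLevel", "ERROR")
      .config("spark.sql.shuffle.partitions", "5")
      .getOrCreate()
    import spark.implicits._

    val left = (1 to 100).map(i => TestData(i, i.toString)).toDS()
    val right = (1 to 100).map(i => TestData2(i, i)).toDS()

    // The filter leaves a single row, so once the shuffle stage materializes,
    // AQE can re-optimize the sort-merge join into a broadcast hash join and
    // log the side-by-side plan diff.
    left.filter($"value" === "1").join(right, $"key" === $"a").collect()

    spark.stop()
  }
}
```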