[SPARK-38772][SQL] Formatting the log plan in AdaptiveSparkPlanExec
### What changes were proposed in this pull request?

Use `sideBySide` to format the plan-change log in `AdaptiveSparkPlanExec`, so the old and new plans are printed in aligned columns instead of being concatenated into one message.
Before:
```
12:08:36.876 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Plan changed from SortMergeJoin [key#13], [a#23], Inner
:- Sort [key#13 ASC NULLS FIRST], false, 0
:  +- ShuffleQueryStage 0
:     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
:        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:              +- Scan[obj#12]
+- Sort [a#23 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
 to BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#145]
:  +- ShuffleQueryStage 0
:     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
:        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:              +- Scan[obj#12]
+- ShuffleQueryStage 1
   +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
      +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
         +- Scan[obj#22]
```

After:
```
15:57:59.481 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Plan changed:
!SortMergeJoin [key#13], [a#23], Inner                                                                                                                                                                                                                                                                                                                                         BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
!:- Sort [key#13 ASC NULLS FIRST], false, 0                                                                                                                                                                                                                                                                                                                                    :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#145]
 :  +- ShuffleQueryStage 0                                                                                                                                                                                                                                                                                                                                                     :  +- ShuffleQueryStage 0
 :     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]                                                                                                                                                                                                                                                                                                 :     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
 :        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))                                                                                                                                                                                                                                                                                                              :        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
 :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]   :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
 :              +- Scan[obj#12]                                                                                                                                                                                                                                                                                                                                                :              +- Scan[obj#12]
!+- Sort [a#23 ASC NULLS FIRST], false, 0                                                                                                                                                                                                                                                                                                                                      +- ShuffleQueryStage 1
!   +- ShuffleQueryStage 1                                                                                                                                                                                                                                                                                                                                                        +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
!      +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]                                                                                                                                                                                                                                                                                                         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
!         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]                                                                                                                                  +- Scan[obj#22]
!            +- Scan[obj#22]
```
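For context, `sideBySide` (from `org.apache.spark.sql.catalyst.util`) pairs the two plan trees line by line, pads the left column to a fixed width, and prefixes lines that differ with `!`. A minimal self-contained sketch of that behavior, using only the Scala standard library (`sideBySideSketch` is a hypothetical name, not the Spark implementation):

```scala
// Sketch of the sideBySide behavior: zip the two texts line by line, pad the
// left column, and mark differing lines with "!".
def sideBySideSketch(left: String, right: String): Seq[String] = {
  val l = left.split("\n").toSeq
  val r = right.split("\n").toSeq
  val width = l.map(_.length).max
  // Pad the shorter side with empty lines so every row survives the zip.
  val lp = l ++ Seq.fill(math.max(r.size - l.size, 0))("")
  val rp = r ++ Seq.fill(math.max(l.size - r.size, 0))("")
  lp.zip(rp).map { case (a, b) =>
    (if (a == b) " " else "!") + a.padTo(width + 3, ' ') + b
  }
}

sideBySideSketch("a\nb\nc", "a\nx\nc").foreach(println)
// prints:
//  a   a
// !b   x
//  c   c
```

The commit's `mkString("\n")` then joins these rows back into the single multi-line log message shown above.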

### Why are the changes needed?

Enhance readability of the plan-change log: the old and new plans are aligned side by side, with changed lines marked by `!`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual testing.
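
For reference, a hedged sketch of one way to reproduce the message in `spark-shell` (the conf names below are real Spark SQL confs; the thresholds and data sizes are illustrative):

```scala
// Make the initial plan a SortMergeJoin, then let AQE re-plan it as a
// BroadcastHashJoin from runtime statistics, which triggers the log message.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Surface the plan-change message at ERROR so it shows with default logging.
spark.conf.set("spark.sql.adaptive.logLevel", "ERROR")
// Disable the compile-time broadcast threshold...
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// ...but allow AQE's runtime broadcast threshold.
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")

val big = spark.range(100000L).selectExpr("id AS key", "id AS value")
val small = spark.range(100L).selectExpr("id AS a")
// The plan change should now be logged, formatted side by side.
big.join(small, big("key") === small("a")).collect()
```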

Closes #36045 from wangyum/SPARK-38772.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum committed Apr 7, 2022
1 parent 6d9bfb6 · commit b57c93b
Showing 1 changed file with 4 additions and 2 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:

```diff
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
@@ -306,7 +307,8 @@ case class AdaptiveSparkPlanExec(
         val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
         if (newCost < origCost ||
             (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
+          logOnLevel("Plan changed:\n" +
+            sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
           cleanUpTempTags(newPhysicalPlan)
           currentPhysicalPlan = newPhysicalPlan
           currentLogicalPlan = newLogicalPlan
@@ -335,7 +337,7 @@ case class AdaptiveSparkPlanExec(
     if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
       getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
     }
-    logOnLevel(s"Final plan: $currentPhysicalPlan")
+    logOnLevel(s"Final plan:\n$currentPhysicalPlan")
   }
 
   override def executeCollect(): Array[InternalRow] = {
```
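
Note that `logOnLevel` emits at the level set by the SQL conf `spark.sql.adaptive.logLevel` (default `debug`), which is why the examples above were captured at ERROR: the session raised the level to make the message visible. A simplified, self-contained sketch of that dispatch (`pickLogger` is a hypothetical name; the real field is built from the session's `SQLConf` and Spark's `Logging` trait):

```scala
// Hedged sketch: map spark.sql.adaptive.logLevel onto a log function, as the
// real code maps it onto logTrace/logDebug/logInfo/logWarning/logError.
def pickLogger(level: String): String => Unit = level.toUpperCase match {
  case "TRACE" => msg => println(s"TRACE $msg")
  case "INFO"  => msg => println(s"INFO  $msg")
  case "WARN"  => msg => println(s"WARN  $msg")
  case "ERROR" => msg => println(s"ERROR $msg")
  case _       => msg => println(s"DEBUG $msg") // default level is debug
}

val logOnLevel = pickLogger("ERROR")
logOnLevel("Plan changed:\n...") // prints "ERROR Plan changed:" plus the diff
```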
