From f4e1069bb835e3e132f7758e5842af79f26cd162 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 21 Dec 2020 03:29:00 -0800 Subject: [PATCH] [SPARK-33853][SQL] EXPLAIN CODEGEN and BenchmarkQueryTest don't show subquery code ### What changes were proposed in this pull request? This PR fixes an issue that `EXPLAIN CODEGEN` and `BenchmarkQueryTest` don't show the corresponding code for subqueries. The following example is about `EXPLAIN CODEGEN`. ``` spark.conf.set("spark.sql.adaptive.enabled", "false") val df = spark.range(1, 100) df.createTempView("df") spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("CODEGEN") scala> spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("CODEGEN") Found 1 WholeStageCodegen subtrees. == Subtree 1 / 1 (maxMethodCodeSize:55; maxConstantPoolSize:97(0.15% used); numInnerClasses:0) == *(1) Project [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L] : +- Subquery scalar-subquery#3, [id=#24] : +- *(2) HashAggregate(keys=[], functions=[min(id#0L)], output=[v#2L]) : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#20] : +- *(1) HashAggregate(keys=[], functions=[partial_min(id#0L)], output=[min#8L]) : +- *(1) Range (1, 100, step=1, splits=12) +- *(1) Scan OneRowRelation[] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator rdd_input_0; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 011 */ /* 012 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 013 */ this.references = references; /* 014 */ } /* 015 */ /* 016 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 017 */ partitionIndex = index; /* 018 */ this.inputs = inputs; /* 019 */ rdd_input_0 = inputs[0]; /* 020 */ project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 021 */ /* 022 */ } /* 023 */ /* 024 */ private void project_doConsume_0() throws java.io.IOException { /* 025 */ // common sub-expressions /* 026 */ /* 027 */ project_mutableStateArray_0[0].reset(); /* 028 */ /* 029 */ if (false) { /* 030 */ project_mutableStateArray_0[0].setNullAt(0); /* 031 */ } else { /* 032 */ project_mutableStateArray_0[0].write(0, 1L); /* 033 */ } /* 034 */ append((project_mutableStateArray_0[0].getRow())); /* 035 */ /* 036 */ } /* 037 */ /* 038 */ protected void processNext() throws java.io.IOException { /* 039 */ while ( rdd_input_0.hasNext()) { /* 040 */ InternalRow rdd_row_0 = (InternalRow) rdd_input_0.next(); /* 041 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 042 */ project_doConsume_0(); /* 043 */ if (shouldStop()) return; /* 044 */ } /* 045 */ } /* 046 */ /* 047 */ } ``` After this change, the corresponding code for subqueries are shown. ``` Found 3 WholeStageCodegen subtrees. == Subtree 1 / 3 (maxMethodCodeSize:282; maxConstantPoolSize:206(0.31% used); numInnerClasses:0) == *(1) HashAggregate(keys=[], functions=[partial_min(id#0L)], output=[min#8L]) +- *(1) Range (1, 100, step=1, splits=12) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean agg_initAgg_0; /* 010 */ private boolean agg_bufIsNull_0; /* 011 */ private long agg_bufValue_0; /* 012 */ private boolean range_initRange_0; /* 013 */ private long range_nextIndex_0; /* 014 */ private TaskContext range_taskContext_0; /* 015 */ private InputMetrics range_inputMetrics_0; /* 016 */ private long range_batchEnd_0; /* 017 */ private long range_numElementsTodo_0; /* 018 */ private boolean agg_agg_isNull_2_0; /* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3]; /* 020 */ /* 021 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 022 */ this.references = references; /* 023 */ } /* 024 */ /* 025 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 026 */ partitionIndex = index; /* 027 */ this.inputs = inputs; /* 028 */ /* 029 */ range_taskContext_0 = TaskContext.get(); /* 030 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics(); /* 031 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 032 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 033 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 034 */ /* 035 */ } /* 036 */ /* 037 */ private void agg_doAggregateWithoutKey_0() throws java.io.IOException { /* 038 */ // initialize aggregation buffer /* 039 */ agg_bufIsNull_0 = true; /* 040 */ agg_bufValue_0 = -1L; /* 041 */ /* 042 */ // initialize Range /* 043 */ if (!range_initRange_0) { /* 044 */ range_initRange_0 = true; /* 045 */ initRange(partitionIndex); /* 046 */ } /* 047 */ /* 048 */ while (true) { /* 049 */ if (range_nextIndex_0 == range_batchEnd_0) { /* 050 */ long range_nextBatchTodo_0; /* 051 */ if (range_numElementsTodo_0 > 1000L) { /* 052 */ range_nextBatchTodo_0 = 1000L; /* 053 */ range_numElementsTodo_0 -= 1000L; /* 054 */ } else { /* 055 */ range_nextBatchTodo_0 = range_numElementsTodo_0; /* 056 */ range_numElementsTodo_0 = 0; /* 057 */ if (range_nextBatchTodo_0 == 0) break; /* 058 */ } /* 059 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L; /* 060 */ } /* 061 */ /* 062 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L); /* 063 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) { /* 064 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0; /* 065 */ /* 066 */ agg_doConsume_0(range_value_0); /* 067 */ /* 068 */ // shouldStop check is eliminated /* 069 */ } /* 070 */ range_nextIndex_0 = range_batchEnd_0; /* 071 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0); /* 072 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0); /* 073 */ range_taskContext_0.killTaskIfInterrupted(); /* 074 */ } /* 075 */ /* 076 */ } /* 077 */ /* 078 */ private void initRange(int idx) { /* 079 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 080 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(12L); /* 081 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(99L); /* 082 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 083 */ java.math.BigInteger start = java.math.BigInteger.valueOf(1L); /* 084 */ long partitionEnd; /* 085 */ /* 086 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 087 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 088 */ range_nextIndex_0 = Long.MAX_VALUE; /* 089 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 090 */ range_nextIndex_0 = Long.MIN_VALUE; /* 091 */ } else { /* 092 */ range_nextIndex_0 = st.longValue(); /* 093 */ } /* 094 */ range_batchEnd_0 = range_nextIndex_0; /* 095 */ /* 096 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 097 */ .multiply(step).add(start); /* 098 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 099 */ partitionEnd = Long.MAX_VALUE; /* 100 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 101 */ partitionEnd = Long.MIN_VALUE; /* 102 */ } else { /* 103 */ partitionEnd = end.longValue(); /* 104 */ } /* 105 */ /* 106 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 107 */ java.math.BigInteger.valueOf(range_nextIndex_0)); /* 108 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue(); /* 109 */ if (range_numElementsTodo_0 < 0) { /* 110 */ range_numElementsTodo_0 = 0; /* 111 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 112 */ range_numElementsTodo_0++; /* 113 */ } /* 114 */ } /* 115 */ /* 116 */ private void agg_doConsume_0(long agg_expr_0_0) throws java.io.IOException { /* 117 */ // do aggregate /* 118 */ // common sub-expressions /* 119 */ /* 120 */ // evaluate aggregate functions and update aggregation buffers /* 121 */ /* 122 */ agg_agg_isNull_2_0 = true; /* 123 */ long agg_value_2 = -1L; /* 124 */ /* 125 */ if (!agg_bufIsNull_0 && (agg_agg_isNull_2_0 || /* 126 */ agg_value_2 > agg_bufValue_0)) { /* 127 */ agg_agg_isNull_2_0 = false; /* 128 */ agg_value_2 = agg_bufValue_0; /* 129 */ } /* 130 */ /* 131 */ if (!false && (agg_agg_isNull_2_0 || /* 132 */ agg_value_2 > agg_expr_0_0)) { /* 133 */ agg_agg_isNull_2_0 = false; /* 134 */ agg_value_2 = agg_expr_0_0; /* 135 */ } /* 136 */ /* 137 */ agg_bufIsNull_0 = agg_agg_isNull_2_0; /* 138 */ agg_bufValue_0 = agg_value_2; /* 139 */ /* 140 */ } /* 141 */ /* 142 */ protected void processNext() throws java.io.IOException { /* 143 */ while (!agg_initAgg_0) { /* 144 */ agg_initAgg_0 = true; /* 145 */ long agg_beforeAgg_0 = System.nanoTime(); /* 146 */ agg_doAggregateWithoutKey_0(); /* 147 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000); /* 148 */ /* 149 */ // output the result /* 150 */ /* 151 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1); /* 152 */ range_mutableStateArray_0[2].reset(); /* 153 */ /* 154 */ range_mutableStateArray_0[2].zeroOutNullBytes(); /* 155 */ /* 156 */ if (agg_bufIsNull_0) { /* 157 */ range_mutableStateArray_0[2].setNullAt(0); /* 158 */ } else { /* 159 */ range_mutableStateArray_0[2].write(0, agg_bufValue_0); /* 160 */ } /* 161 */ append((range_mutableStateArray_0[2].getRow())); /* 162 */ } /* 163 */ } /* 164 */ /* 165 */ } ``` ### Why are the changes needed? For better debuggability. ### Does this PR introduce _any_ user-facing change? Yes. After this change, users can see subquery code by `EXPLAIN CODEGEN`. ### How was this patch tested? New test. Closes #30859 from sarutak/explain-codegen-subqueries. Authored-by: Kousuke Saruta Signed-off-by: Dongjoon Hyun --- .../spark/sql/execution/debug/package.scala | 15 ++++++++++----- .../apache/spark/sql/BenchmarkQueryTest.scala | 14 ++++++++++---- .../org/apache/spark/sql/ExplainSuite.scala | 16 ++++++++++++++++ 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 6c40104e52a5f..3cbebca14f7dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -107,12 +107,17 @@ package object debug { */ def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() - plan transform { - case s: WholeStageCodegenExec => - codegenSubtrees += s - s - case s => s + + def findSubtrees(plan: SparkPlan): Unit = { + plan foreach { + case s: WholeStageCodegenExec => + codegenSubtrees += s + case s => + s.subqueries.foreach(findSubtrees) + } } + + findSubtrees(plan) codegenSubtrees.toSeq.sortBy(_.codegenStageId).map { subtree => val (_, source) = subtree.doCodeGen() val codeStats = try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala index 2c3b37a1498ec..d58bf2c6260b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala @@ -63,11 +63,17 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSparkSession { protected def checkGeneratedCode(plan: SparkPlan, checkMethodCodeSize: Boolean = true): Unit = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() - plan foreach { - case s: WholeStageCodegenExec => - codegenSubtrees += s - case _ => + + def findSubtrees(plan: SparkPlan): Unit = { + plan foreach { + case s: WholeStageCodegenExec => + codegenSubtrees += s + case s => + s.subqueries.foreach(findSubtrees) + } } + + findSubtrees(plan) codegenSubtrees.toSeq.foreach { subtree => val code = subtree.doCodeGen()._2 val (_, ByteCodeStats(maxMethodCodeSize, _, _)) = try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 8b7459fddb59a..bf100c0205efa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -228,6 +228,22 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } + test("SPARK-33853: explain codegen - check presence of subquery") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + withTempView("df") { + val df1 = spark.range(1, 100) + df1.createTempView("df") + + val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df)" + val expectedText = "Found 3 WholeStageCodegen subtrees." + + withNormalizedExplain(sqlText) { normalizedOutput => + assert(normalizedOutput.contains(expectedText)) + } + } + } + } + test("explain formatted - check presence of subquery in case of DPP") { withTable("df1", "df2") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",