From cce0048c784edfe3eecac164549e68afba6865c1 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 18 May 2021 16:56:45 +0900 Subject: [PATCH] [SPARK-35351][SQL] Add code-gen for left anti sort merge join ### What changes were proposed in this pull request? As title. This PR is to add code-gen support for LEFT ANTI sort merge join. The main change is to extract `loadStreamed` in `SortMergeJoinExec.doProduce()`. That is to set all columns values for streamed row, when the streamed row has no output row. Example query: ``` val df1 = spark.range(10).select($"id".as("k1")) val df2 = spark.range(4).select($"id".as("k2")) df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") ``` Example generated code: ``` == Subtree 5 / 5 (maxMethodCodeSize:296; maxConstantPoolSize:156(0.24% used); numInnerClasses:0) == *(5) Project [id#0L AS k1#2L] +- *(5) SortMergeJoin [id#0L], [k2#6L], LeftAnti :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#0L, 5), ENSURE_REQUIREMENTS, [id=#27] : +- *(1) Range (0, 10, step=1, splits=2) +- *(4) Sort [k2#6L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(k2#6L, 5), ENSURE_REQUIREMENTS, [id=#33] +- *(3) Project [id#4L AS k2#6L] +- *(3) Range (0, 4, step=1, splits=2) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage5(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=5 /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator smj_streamedInput_0; /* 010 */ private scala.collection.Iterator smj_bufferedInput_0; /* 011 */ private InternalRow smj_streamedRow_0; /* 012 */ private InternalRow smj_bufferedRow_0; /* 013 */ private long smj_value_2; /* 014 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches_0; /* 015 */ private long smj_value_3; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2]; /* 017 */ /* 018 */ public GeneratedIteratorForCodegenStage5(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ smj_streamedInput_0 = inputs[0]; /* 026 */ smj_bufferedInput_0 = inputs[1]; /* 027 */ /* 028 */ smj_matches_0 = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(1, 2147483647); /* 029 */ smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 030 */ smj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 031 */ /* 032 */ } /* 033 */ /* 034 */ private boolean findNextJoinRows( /* 035 */ scala.collection.Iterator streamedIter, /* 036 */ scala.collection.Iterator bufferedIter) { /* 037 */ smj_streamedRow_0 = null; /* 038 */ int comp = 0; /* 039 */ while (smj_streamedRow_0 == null) { /* 040 */ if (!streamedIter.hasNext()) return false; /* 041 */ smj_streamedRow_0 = (InternalRow) streamedIter.next(); /* 042 */ long smj_value_0 = smj_streamedRow_0.getLong(0); /* 043 */ if (false) { /* 044 */ if (!smj_matches_0.isEmpty()) { /* 045 */ smj_matches_0.clear(); /* 046 */ } /* 047 */ return false; /* 048 */ /* 049 */ } /* 050 */ if (!smj_matches_0.isEmpty()) { /* 051 */ comp = 0; /* 052 */ if (comp == 0) { /* 053 */ comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? -1 : 0); /* 054 */ } /* 055 */ /* 056 */ if (comp == 0) { /* 057 */ return true; /* 058 */ } /* 059 */ smj_matches_0.clear(); /* 060 */ } /* 061 */ /* 062 */ do { /* 063 */ if (smj_bufferedRow_0 == null) { /* 064 */ if (!bufferedIter.hasNext()) { /* 065 */ smj_value_3 = smj_value_0; /* 066 */ return !smj_matches_0.isEmpty(); /* 067 */ } /* 068 */ smj_bufferedRow_0 = (InternalRow) bufferedIter.next(); /* 069 */ long smj_value_1 = smj_bufferedRow_0.getLong(0); /* 070 */ if (false) { /* 071 */ smj_bufferedRow_0 = null; /* 072 */ continue; /* 073 */ } /* 074 */ smj_value_2 = smj_value_1; /* 075 */ } /* 076 */ /* 077 */ comp = 0; /* 078 */ if (comp == 0) { /* 079 */ comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0); /* 080 */ } /* 081 */ /* 082 */ if (comp > 0) { /* 083 */ smj_bufferedRow_0 = null; /* 084 */ } else if (comp < 0) { /* 085 */ if (!smj_matches_0.isEmpty()) { /* 086 */ smj_value_3 = smj_value_0; /* 087 */ return true; /* 088 */ } else { /* 089 */ return false; /* 090 */ } /* 091 */ } else { /* 092 */ if (smj_matches_0.isEmpty()) { /* 093 */ smj_matches_0.add((UnsafeRow) smj_bufferedRow_0); /* 094 */ } /* 095 */ /* 096 */ smj_bufferedRow_0 = null; /* 097 */ } /* 098 */ } while (smj_streamedRow_0 != null); /* 099 */ } /* 100 */ return false; // unreachable /* 101 */ } /* 102 */ /* 103 */ protected void processNext() throws java.io.IOException { /* 104 */ while (smj_streamedInput_0.hasNext()) { /* 105 */ findNextJoinRows(smj_streamedInput_0, smj_bufferedInput_0); /* 106 */ /* 107 */ long smj_value_4 = -1L; /* 108 */ smj_value_4 = smj_streamedRow_0.getLong(0); /* 109 */ scala.collection.Iterator smj_iterator_0 = smj_matches_0.generateIterator(); /* 110 */ /* 111 */ boolean wholestagecodegen_hasOutputRow_0 = false; /* 112 */ /* 113 */ while (!wholestagecodegen_hasOutputRow_0 && smj_iterator_0.hasNext()) { /* 114 */ InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next(); /* 115 */ /* 116 */ wholestagecodegen_hasOutputRow_0 = true; /* 117 */ } /* 118 */ /* 119 */ if (!wholestagecodegen_hasOutputRow_0) { /* 120 */ // load all values of streamed row, because the values not in join condition are not /* 121 */ // loaded yet. /* 122 */ /* 123 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 124 */ /* 125 */ // common sub-expressions /* 126 */ /* 127 */ smj_mutableStateArray_0[1].reset(); /* 128 */ /* 129 */ smj_mutableStateArray_0[1].write(0, smj_value_4); /* 130 */ append((smj_mutableStateArray_0[1].getRow()).copy()); /* 131 */ /* 132 */ } /* 133 */ if (shouldStop()) return; /* 134 */ } /* 135 */ ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources(); /* 136 */ } /* 137 */ /* 138 */ } ``` ### Why are the changes needed? Improve the query CPU performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `WholeStageCodegenSuite.scala`, and existed unit test in `ExistenceJoinSuite.scala`. Closes #32547 from c21/smj-left-anti. Authored-by: Cheng Su Signed-off-by: Takeshi Yamamuro --- .../execution/joins/SortMergeJoinExec.scala | 97 ++++++++++++--- .../approved-plans-v1_4/q16.sf100/explain.txt | 4 +- .../q16.sf100/simplified.txt | 5 +- .../approved-plans-v1_4/q16/explain.txt | 4 +- .../approved-plans-v1_4/q16/simplified.txt | 5 +- .../approved-plans-v1_4/q69.sf100/explain.txt | 36 +++--- .../q69.sf100/simplified.txt | 110 +++++++++--------- .../approved-plans-v1_4/q87.sf100/explain.txt | 8 +- .../q87.sf100/simplified.txt | 10 +- .../approved-plans-v1_4/q94.sf100/explain.txt | 4 +- .../q94.sf100/simplified.txt | 5 +- .../approved-plans-v1_4/q94/explain.txt | 4 +- .../approved-plans-v1_4/q94/simplified.txt | 5 +- .../execution/WholeStageCodegenSuite.scala | 22 ++++ .../execution/metric/SQLMetricsSuite.scala | 4 +- 15 files changed, 208 insertions(+), 115 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 41c2be827ca72..96d23ba7557b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -364,7 +364,7 @@ case class SortMergeJoinExec( } private lazy val ((streamedPlan, streamedKeys), (bufferedPlan, bufferedKeys)) = joinType match { - case _: InnerLike | LeftOuter | LeftSemi => ((left, leftKeys), (right, rightKeys)) + case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => ((left, leftKeys), (right, rightKeys)) case RightOuter => ((right, rightKeys), (left, leftKeys)) case x => throw new IllegalArgumentException( @@ -375,7 +375,7 @@ case class SortMergeJoinExec( private lazy val bufferedOutput = bufferedPlan.output override def supportCodegen: Boolean = joinType match { - case _: InnerLike | LeftOuter | RightOuter | LeftSemi => true + case _: InnerLike | LeftOuter | RightOuter | LeftSemi | LeftAnti => true case _ => false } @@ -453,7 +453,7 @@ case class SortMergeJoinExec( |$streamedRow = null; |continue; """.stripMargin - case LeftOuter | RightOuter => + case LeftOuter | RightOuter | LeftAnti => // Eagerly return streamed row. Only call `matches.clear()` when `matches.isEmpty()` is // false, to reduce unnecessary computation. s""" @@ -472,7 +472,7 @@ case class SortMergeJoinExec( case _: InnerLike | LeftSemi => // Skip streamed row. s"$streamedRow = null;" - case LeftOuter | RightOuter => + case LeftOuter | RightOuter | LeftAnti => // Eagerly return with streamed row. "return false;" case x => @@ -509,8 +509,8 @@ case class SortMergeJoinExec( // 1. Inner and Left Semi join: skip the row. `matches` will be cleared later when // hitting the next `streamedRow` with non-null join // keys. - // 2. Left/Right Outer join: clear the previous `matches` if needed, keep the row, - // and return false. + // 2. Left/Right Outer and Left Anti join: clear the previous `matches` if needed, + // keep the row, and return false. // // - Step 2: Find the `matches` from buffered side having same join keys with `streamedRow`. // Clear `matches` if we hit a new `streamedRow`, as we need to find new matches. @@ -518,8 +518,8 @@ case class SortMergeJoinExec( // `matches` (`addRowToBuffer`). Return true when getting all matched rows. // For `streamedRow` without `matches` (`handleStreamedWithoutMatch`): // 1. Inner and Left Semi join: skip the row. - // 2. Left/Right Outer join: keep the row and return false (with `matches` being - // empty). + // 2. Left/Right Outer and Left Anti join: keep the row and return false (with + // `matches` being empty). val findNextJoinRowsFuncName = ctx.freshName("findNextJoinRows") ctx.addNewFunction(findNextJoinRowsFuncName, s""" @@ -664,14 +664,14 @@ case class SortMergeJoinExec( streamedVars ++ bufferedVars case RightOuter => bufferedVars ++ streamedVars - case LeftSemi => + case LeftSemi | LeftAnti => streamedVars case x => throw new IllegalArgumentException( s"SortMergeJoin.doProduce should not take $x as the JoinType") } - val (streamedBeforeLoop, condCheck) = if (condition.isDefined) { + val (streamedBeforeLoop, condCheck, loadStreamed) = if (condition.isDefined) { // Split the code of creating variables based on whether it's used by condition or not. val loaded = ctx.freshName("loaded") val (streamedBefore, streamedAfter) = splitVarsByCondition(streamedOutput, streamedVars) @@ -680,13 +680,36 @@ case class SortMergeJoinExec( ctx.currentVars = streamedVars ++ bufferedVars val cond = BindReferences.bindReference( condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx) - // evaluate the columns those used by condition before loop + // Evaluate the columns those used by condition before loop val before = s""" |boolean $loaded = false; |$streamedBefore """.stripMargin + val loadStreamed = + s""" + |if (!$loaded) { + | $loaded = true; + | $streamedAfter + |} + """.stripMargin + + val loadStreamedAfterCondition = joinType match { + case LeftAnti => + // No need to evaluate columns not used by condition from streamed side, as for Left Anti + // join, streamed row with match is not outputted. + "" + case _ => loadStreamed + } + + val loadBufferedAfterCondition = joinType match { + case LeftSemi | LeftAnti => + // No need to evaluate columns not used by condition from buffered side + "" + case _ => bufferedAfter + } + val checking = s""" |$bufferedBefore @@ -696,15 +719,12 @@ case class SortMergeJoinExec( | continue; | } |} - |if (!$loaded) { - | $loaded = true; - | $streamedAfter - |} - |$bufferedAfter + |$loadStreamedAfterCondition + |$loadBufferedAfterCondition """.stripMargin - (before, checking.trim) + (before, checking.trim, loadStreamed) } else { - (evaluateVariables(streamedVars), "") + (evaluateVariables(streamedVars), "", "") } val beforeLoop = @@ -732,6 +752,9 @@ case class SortMergeJoinExec( case LeftSemi => codegenSemi(findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck, ctx.freshName("hasOutputRow"), outputRow, eagerCleanup) + case LeftAnti => + codegenAnti(streamedInput, findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck, + loadStreamed, ctx.freshName("hasMatchedRow"), outputRow, eagerCleanup) case x => throw new IllegalArgumentException( s"SortMergeJoin.doProduce should not take $x as the JoinType") @@ -825,6 +848,44 @@ case class SortMergeJoinExec( """.stripMargin } + /** + * Generates the code for Left Anti join. + */ + private def codegenAnti( + streamedInput: String, + findNextJoinRows: String, + beforeLoop: String, + matchIterator: String, + bufferedRow: String, + conditionCheck: String, + loadStreamed: String, + hasMatchedRow: String, + outputRow: String, + eagerCleanup: String): String = { + s""" + |while ($streamedInput.hasNext()) { + | $findNextJoinRows; + | $beforeLoop + | boolean $hasMatchedRow = false; + | + | while (!$hasMatchedRow && $matchIterator.hasNext()) { + | InternalRow $bufferedRow = (InternalRow) $matchIterator.next(); + | $conditionCheck + | $hasMatchedRow = true; + | } + | + | if (!$hasMatchedRow) { + | // load all values of streamed row, because the values not in join condition are not + | // loaded yet. + | $loadStreamed + | $outputRow + | } + | if (shouldStop()) return; + |} + |$eagerCleanup + """.stripMargin + } + override protected def withNewChildrenInternal( newLeft: SparkPlan, newRight: SparkPlan): SortMergeJoinExec = copy(left = newLeft, right = newRight) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt index f9ab964739273..cf5d48e8b2c75 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [cr_order_number#14] Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [cs_order_number#5] Right keys [1]: [cr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt index 489200f5201eb..6d647d341f785 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [cs_call_center_sk,cc_call_center_sk] Project [cs_ship_date_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk] - InputAdapter - SortMergeJoin [cs_order_number,cr_order_number] + SortMergeJoin [cs_order_number,cr_order_number] + InputAdapter WholeStageCodegen (5) Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [cr_order_number] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt index 647824d3a9d75..cf6b94ad6897c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [cr_order_number#14] Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [cs_order_number#5] Right keys [1]: [cr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt index c7ead9a46797a..f1f544b05318b 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk] Project [cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] BroadcastHashJoin [cs_ship_date_sk,d_date_sk] - InputAdapter - SortMergeJoin [cs_order_number,cr_order_number] + SortMergeJoin [cs_order_number,cr_order_number] + InputAdapter WholeStageCodegen (5) Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [cr_order_number] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt index 2cb4a1f42eb1d..a2b59a845a4ae 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt @@ -9,8 +9,8 @@ TakeOrderedAndProject (51) : +- * Project (41) : +- * BroadcastHashJoin Inner BuildRight (40) : :- * Project (34) - : : +- SortMergeJoin LeftAnti (33) - : : :- SortMergeJoin LeftAnti (25) + : : +- * SortMergeJoin LeftAnti (33) + : : :- * SortMergeJoin LeftAnti (25) : : : :- * SortMergeJoin LeftSemi (17) : : : : :- * Sort (5) : : : : : +- Exchange (4) @@ -158,7 +158,7 @@ Arguments: hashpartitioning(ws_bill_customer_sk#13, 5), ENSURE_REQUIREMENTS, [id Input [1]: [ws_bill_customer_sk#13] Arguments: [ws_bill_customer_sk#13 ASC NULLS FIRST], false, 0 -(25) SortMergeJoin +(25) SortMergeJoin [codegen id : 10] Left keys [1]: [c_customer_sk#1] Right keys [1]: [ws_bill_customer_sk#13] Join condition: None @@ -170,18 +170,18 @@ Location: InMemoryFileIndex [] PartitionFilters: [isnotnull(cs_sold_date_sk#18), dynamicpruningexpression(cs_sold_date_sk#18 IN dynamicpruning#7)] ReadSchema: struct -(27) ColumnarToRow [codegen id : 11] +(27) ColumnarToRow [codegen id : 12] Input [2]: [cs_ship_customer_sk#17, cs_sold_date_sk#18] (28) ReusedExchange [Reuses operator id: 12] Output [1]: [d_date_sk#19] -(29) BroadcastHashJoin [codegen id : 11] +(29) BroadcastHashJoin [codegen id : 12] Left keys [1]: [cs_sold_date_sk#18] Right keys [1]: [d_date_sk#19] Join condition: None -(30) Project [codegen id : 11] +(30) Project [codegen id : 12] Output [1]: [cs_ship_customer_sk#17] Input [3]: [cs_ship_customer_sk#17, cs_sold_date_sk#18, d_date_sk#19] @@ -189,16 +189,16 @@ Input [3]: [cs_ship_customer_sk#17, cs_sold_date_sk#18, d_date_sk#19] Input [1]: [cs_ship_customer_sk#17] Arguments: hashpartitioning(cs_ship_customer_sk#17, 5), ENSURE_REQUIREMENTS, [id=#20] -(32) Sort [codegen id : 12] +(32) Sort [codegen id : 13] Input [1]: [cs_ship_customer_sk#17] Arguments: [cs_ship_customer_sk#17 ASC NULLS FIRST], false, 0 -(33) SortMergeJoin +(33) SortMergeJoin [codegen id : 15] Left keys [1]: [c_customer_sk#1] Right keys [1]: [cs_ship_customer_sk#17] Join condition: None -(34) Project [codegen id : 14] +(34) Project [codegen id : 15] Output [2]: [c_current_cdemo_sk#2, c_current_addr_sk#3] Input [3]: [c_customer_sk#1, c_current_cdemo_sk#2, c_current_addr_sk#3] @@ -209,14 +209,14 @@ Location [not included in comparison]/{warehouse_dir}/customer_address] PushedFilters: [In(ca_state, [KY,GA,NM]), IsNotNull(ca_address_sk)] ReadSchema: struct -(36) ColumnarToRow [codegen id : 13] +(36) ColumnarToRow [codegen id : 14] Input [2]: [ca_address_sk#21, ca_state#22] -(37) Filter [codegen id : 13] +(37) Filter [codegen id : 14] Input [2]: [ca_address_sk#21, ca_state#22] Condition : (ca_state#22 IN (KY,GA,NM) AND isnotnull(ca_address_sk#21)) -(38) Project [codegen id : 13] +(38) Project [codegen id : 14] Output [1]: [ca_address_sk#21] Input [2]: [ca_address_sk#21, ca_state#22] @@ -224,12 +224,12 @@ Input [2]: [ca_address_sk#21, ca_state#22] Input [1]: [ca_address_sk#21] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#23] -(40) BroadcastHashJoin [codegen id : 14] +(40) BroadcastHashJoin [codegen id : 15] Left keys [1]: [c_current_addr_sk#3] Right keys [1]: [ca_address_sk#21] Join condition: None -(41) Project [codegen id : 14] +(41) Project [codegen id : 15] Output [1]: [c_current_cdemo_sk#2] Input [3]: [c_current_cdemo_sk#2, c_current_addr_sk#3, ca_address_sk#21] @@ -251,16 +251,16 @@ Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_stat Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Condition : isnotnull(cd_demo_sk#25) -(46) BroadcastHashJoin [codegen id : 15] +(46) BroadcastHashJoin [codegen id : 16] Left keys [1]: [c_current_cdemo_sk#2] Right keys [1]: [cd_demo_sk#25] Join condition: None -(47) Project [codegen id : 15] +(47) Project [codegen id : 16] Output [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Input [7]: [c_current_cdemo_sk#2, cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] -(48) HashAggregate [codegen id : 15] +(48) HashAggregate [codegen id : 16] Input [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Functions [1]: [partial_count(1)] @@ -271,7 +271,7 @@ Results [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_pur Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32] Arguments: hashpartitioning(cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, 5), ENSURE_REQUIREMENTS, [id=#33] -(50) HashAggregate [codegen id : 16] +(50) HashAggregate [codegen id : 17] Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32] Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Functions [1]: [count(1)] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt index bdbf95bd10721..f0d188e5f8637 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt @@ -1,72 +1,76 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,cnt1,cnt2,cnt3] - WholeStageCodegen (16) + WholeStageCodegen (17) HashAggregate [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,count] [count(1),cnt1,cnt2,cnt3,count] InputAdapter Exchange [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating] #1 - WholeStageCodegen (15) + WholeStageCodegen (16) HashAggregate [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating] [count,count] Project [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating] BroadcastHashJoin [c_current_cdemo_sk,cd_demo_sk] InputAdapter BroadcastExchange #2 - WholeStageCodegen (14) + WholeStageCodegen (15) Project [c_current_cdemo_sk] BroadcastHashJoin [c_current_addr_sk,ca_address_sk] Project [c_current_cdemo_sk,c_current_addr_sk] - InputAdapter - SortMergeJoin [c_customer_sk,cs_ship_customer_sk] - SortMergeJoin [c_customer_sk,ws_bill_customer_sk] - WholeStageCodegen (6) - SortMergeJoin [c_customer_sk,ss_customer_sk] - InputAdapter - WholeStageCodegen (2) - Sort [c_customer_sk] - InputAdapter - Exchange [c_customer_sk] #3 - WholeStageCodegen (1) - Filter [c_current_addr_sk,c_current_cdemo_sk] + SortMergeJoin [c_customer_sk,cs_ship_customer_sk] + InputAdapter + WholeStageCodegen (10) + SortMergeJoin [c_customer_sk,ws_bill_customer_sk] + InputAdapter + WholeStageCodegen (6) + SortMergeJoin [c_customer_sk,ss_customer_sk] + InputAdapter + WholeStageCodegen (2) + Sort [c_customer_sk] + InputAdapter + Exchange [c_customer_sk] #3 + WholeStageCodegen (1) + Filter [c_current_addr_sk,c_current_cdemo_sk] + ColumnarToRow + InputAdapter + Scan parquet default.customer [c_customer_sk,c_current_cdemo_sk,c_current_addr_sk] + InputAdapter + WholeStageCodegen (5) + Sort [ss_customer_sk] + InputAdapter + Exchange [ss_customer_sk] #4 + WholeStageCodegen (4) + Project [ss_customer_sk] + BroadcastHashJoin [ss_sold_date_sk,d_date_sk] + ColumnarToRow + InputAdapter + Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] + SubqueryBroadcast [d_date_sk] #1 + ReusedExchange [d_date_sk] #5 + InputAdapter + BroadcastExchange #5 + WholeStageCodegen (3) + Project [d_date_sk] + Filter [d_year,d_moy,d_date_sk] + ColumnarToRow + InputAdapter + Scan parquet default.date_dim [d_date_sk,d_year,d_moy] + InputAdapter + WholeStageCodegen (9) + Sort [ws_bill_customer_sk] + InputAdapter + Exchange [ws_bill_customer_sk] #6 + WholeStageCodegen (8) + Project [ws_bill_customer_sk] + BroadcastHashJoin [ws_sold_date_sk,d_date_sk] ColumnarToRow InputAdapter - Scan parquet default.customer [c_customer_sk,c_current_cdemo_sk,c_current_addr_sk] - InputAdapter - WholeStageCodegen (5) - Sort [ss_customer_sk] - InputAdapter - Exchange [ss_customer_sk] #4 - WholeStageCodegen (4) - Project [ss_customer_sk] - BroadcastHashJoin [ss_sold_date_sk,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] - SubqueryBroadcast [d_date_sk] #1 - ReusedExchange [d_date_sk] #5 - InputAdapter - BroadcastExchange #5 - WholeStageCodegen (3) - Project [d_date_sk] - Filter [d_year,d_moy,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.date_dim [d_date_sk,d_year,d_moy] - WholeStageCodegen (9) - Sort [ws_bill_customer_sk] - InputAdapter - Exchange [ws_bill_customer_sk] #6 - WholeStageCodegen (8) - Project [ws_bill_customer_sk] - BroadcastHashJoin [ws_sold_date_sk,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.web_sales [ws_bill_customer_sk,ws_sold_date_sk] - ReusedSubquery [d_date_sk] #1 - InputAdapter - ReusedExchange [d_date_sk] #5 - WholeStageCodegen (12) + Scan parquet default.web_sales [ws_bill_customer_sk,ws_sold_date_sk] + ReusedSubquery [d_date_sk] #1 + InputAdapter + ReusedExchange [d_date_sk] #5 + InputAdapter + WholeStageCodegen (13) Sort [cs_ship_customer_sk] InputAdapter Exchange [cs_ship_customer_sk] #7 - WholeStageCodegen (11) + WholeStageCodegen (12) Project [cs_ship_customer_sk] BroadcastHashJoin [cs_sold_date_sk,d_date_sk] ColumnarToRow @@ -77,7 +81,7 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha ReusedExchange [d_date_sk] #5 InputAdapter BroadcastExchange #8 - WholeStageCodegen (13) + WholeStageCodegen (14) Project [ca_address_sk] Filter [ca_state,ca_address_sk] ColumnarToRow diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt index 92895cb566fd2..0bb08c7191367 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt @@ -5,13 +5,13 @@ +- * HashAggregate (68) +- Exchange (67) +- * HashAggregate (66) - +- SortMergeJoin LeftAnti (65) + +- * SortMergeJoin LeftAnti (65) :- * Sort (47) : +- Exchange (46) : +- * HashAggregate (45) : +- Exchange (44) : +- * HashAggregate (43) - : +- SortMergeJoin LeftAnti (42) + : +- * SortMergeJoin LeftAnti (42) : :- * Sort (24) : : +- Exchange (23) : : +- * HashAggregate (22) @@ -260,7 +260,7 @@ Arguments: hashpartitioning(coalesce(c_last_name#22, ), isnull(c_last_name#22), Input [3]: [c_last_name#22, c_first_name#21, d_date#18] Arguments: [coalesce(c_last_name#22, ) ASC NULLS FIRST, isnull(c_last_name#22) ASC NULLS FIRST, coalesce(c_first_name#21, ) ASC NULLS FIRST, isnull(c_first_name#21) ASC NULLS FIRST, coalesce(d_date#18, 1970-01-01) ASC NULLS FIRST, isnull(d_date#18) ASC NULLS FIRST], false, 0 -(42) SortMergeJoin +(42) SortMergeJoin [codegen id : 17] Left keys [6]: [coalesce(c_last_name#11, ), isnull(c_last_name#11), coalesce(c_first_name#10, ), isnull(c_first_name#10), coalesce(d_date#5, 1970-01-01), isnull(d_date#5)] Right keys [6]: [coalesce(c_last_name#22, ), isnull(c_last_name#22), coalesce(c_first_name#21, ), isnull(c_first_name#21), coalesce(d_date#18, 1970-01-01), isnull(d_date#18)] Join condition: None @@ -368,7 +368,7 @@ Arguments: hashpartitioning(coalesce(c_last_name#34, ), isnull(c_last_name#34), Input [3]: [c_last_name#34, c_first_name#33, d_date#30] Arguments: [coalesce(c_last_name#34, ) ASC NULLS FIRST, isnull(c_last_name#34) ASC NULLS FIRST, coalesce(c_first_name#33, ) ASC NULLS FIRST, isnull(c_first_name#33) ASC NULLS FIRST, coalesce(d_date#30, 1970-01-01) ASC NULLS FIRST, isnull(d_date#30) ASC NULLS FIRST], false, 0 -(65) SortMergeJoin +(65) SortMergeJoin [codegen id : 28] Left keys [6]: [coalesce(c_last_name#11, ), isnull(c_last_name#11), coalesce(c_first_name#10, ), isnull(c_first_name#10), coalesce(d_date#5, 1970-01-01), isnull(d_date#5)] Right keys [6]: [coalesce(c_last_name#34, ), isnull(c_last_name#34), coalesce(c_first_name#33, ), isnull(c_first_name#33), coalesce(d_date#30, 1970-01-01), isnull(d_date#30)] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt index 2978f51532d83..421027136f3c0 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt @@ -9,8 +9,8 @@ WholeStageCodegen (30) Exchange [c_last_name,c_first_name,d_date] #2 WholeStageCodegen (28) HashAggregate [c_last_name,c_first_name,d_date] - InputAdapter - SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + InputAdapter WholeStageCodegen (19) Sort [c_last_name,c_first_name,d_date] InputAdapter @@ -21,8 +21,8 @@ WholeStageCodegen (30) Exchange [c_last_name,c_first_name,d_date] #4 WholeStageCodegen (17) HashAggregate [c_last_name,c_first_name,d_date] - InputAdapter - SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + InputAdapter WholeStageCodegen (8) Sort [c_last_name,c_first_name,d_date] InputAdapter @@ -67,6 +67,7 @@ WholeStageCodegen (30) ColumnarToRow InputAdapter Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name] + InputAdapter WholeStageCodegen (16) Sort [c_last_name,c_first_name,d_date] InputAdapter @@ -99,6 +100,7 @@ WholeStageCodegen (30) Sort [c_customer_sk] InputAdapter ReusedExchange [c_customer_sk,c_first_name,c_last_name] #9 + InputAdapter WholeStageCodegen (27) Sort [c_last_name,c_first_name,d_date] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt index e06516011e4ec..0d5b963ba7d3d 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(wr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [wr_order_number#14] Arguments: [wr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [ws_order_number#5] Right keys [1]: [wr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt index b3c313fb5ded6..073ccc328045c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [ws_web_site_sk,web_site_sk] Project [ws_ship_date_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] BroadcastHashJoin [ws_ship_addr_sk,ca_address_sk] - InputAdapter - SortMergeJoin [ws_order_number,wr_order_number] + SortMergeJoin [ws_order_number,wr_order_number] + InputAdapter WholeStageCodegen (5) Project [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] SortMergeJoin [ws_order_number,ws_order_number,ws_warehouse_sk,ws_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.web_sales [ws_warehouse_sk,ws_order_number,ws_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [wr_order_number] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt index 2d8c00cc4c936..7f668607961fd 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(wr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [wr_order_number#14] Arguments: [wr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [ws_order_number#5] Right keys [1]: [wr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt index cecad61df0774..7b29ce9b1923f 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [ws_ship_addr_sk,ca_address_sk] Project [ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] BroadcastHashJoin [ws_ship_date_sk,d_date_sk] - InputAdapter - SortMergeJoin [ws_order_number,wr_order_number] + SortMergeJoin [ws_order_number,wr_order_number] + InputAdapter WholeStageCodegen (5) Project [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] SortMergeJoin [ws_order_number,ws_order_number,ws_warehouse_sk,ws_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.web_sales [ws_warehouse_sk,ws_order_number,ws_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [wr_order_number] InputAdapter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index f019e34b60118..6cc6e33dd688a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -225,6 +225,28 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3))) } + test("Left Anti SortMergeJoin should be included in WholeStageCodegen") { + val df1 = spark.range(10).select($"id".as("k1")) + val df2 = spark.range(4).select($"id".as("k2")) + val df3 = spark.range(6).select($"id".as("k3")) + + // test one left anti sort merge join + val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") + assert(oneJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) => true + }.size === 1) + checkAnswer(oneJoinDF, Seq(Row(4), Row(5), Row(6), Row(7), Row(8), Row(9))) + + // test two left anti sort merge joins + val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") + .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti") + assert(twoJoinsDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) | + WholeStageCodegenExec(_ : SortMergeJoinExec) => true + }.size === 2) + checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9))) + } + test("Inner/Cross BroadcastNestedLoopJoinExec should be included in WholeStageCodegen") { val df1 = spark.range(4).select($"id".as("k1")) val df2 = spark.range(3).select($"id".as("k2")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 1e7032ab07425..ac9edc4d23e3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -517,10 +517,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withTempView("antiData") { anti.createOrReplaceTempView("antiData") val query = "SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = antiData.a" - Seq(false, true).foreach { enableWholeStage => + Seq((0L, false), (1L, true)).foreach { case (nodeId, enableWholeStage) => val df = spark.sql(query) testSparkPlanMetrics(df, 1, Map( - 0L -> (("SortMergeJoin", Map("number of output rows" -> 4L)))), + nodeId -> (("SortMergeJoin", Map("number of output rows" -> 4L)))), enableWholeStage ) }