diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 41c2be827ca72..96d23ba7557b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -364,7 +364,7 @@ case class SortMergeJoinExec( } private lazy val ((streamedPlan, streamedKeys), (bufferedPlan, bufferedKeys)) = joinType match { - case _: InnerLike | LeftOuter | LeftSemi => ((left, leftKeys), (right, rightKeys)) + case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => ((left, leftKeys), (right, rightKeys)) case RightOuter => ((right, rightKeys), (left, leftKeys)) case x => throw new IllegalArgumentException( @@ -375,7 +375,7 @@ case class SortMergeJoinExec( private lazy val bufferedOutput = bufferedPlan.output override def supportCodegen: Boolean = joinType match { - case _: InnerLike | LeftOuter | RightOuter | LeftSemi => true + case _: InnerLike | LeftOuter | RightOuter | LeftSemi | LeftAnti => true case _ => false } @@ -453,7 +453,7 @@ case class SortMergeJoinExec( |$streamedRow = null; |continue; """.stripMargin - case LeftOuter | RightOuter => + case LeftOuter | RightOuter | LeftAnti => // Eagerly return streamed row. Only call `matches.clear()` when `matches.isEmpty()` is // false, to reduce unnecessary computation. s""" @@ -472,7 +472,7 @@ case class SortMergeJoinExec( case _: InnerLike | LeftSemi => // Skip streamed row. s"$streamedRow = null;" - case LeftOuter | RightOuter => + case LeftOuter | RightOuter | LeftAnti => // Eagerly return with streamed row. "return false;" case x => @@ -509,8 +509,8 @@ case class SortMergeJoinExec( // 1. Inner and Left Semi join: skip the row. `matches` will be cleared later when // hitting the next `streamedRow` with non-null join // keys. - // 2. Left/Right Outer join: clear the previous `matches` if needed, keep the row, - // and return false. + // 2. Left/Right Outer and Left Anti join: clear the previous `matches` if needed, + // keep the row, and return false. // // - Step 2: Find the `matches` from buffered side having same join keys with `streamedRow`. // Clear `matches` if we hit a new `streamedRow`, as we need to find new matches. @@ -518,8 +518,8 @@ case class SortMergeJoinExec( // `matches` (`addRowToBuffer`). Return true when getting all matched rows. // For `streamedRow` without `matches` (`handleStreamedWithoutMatch`): // 1. Inner and Left Semi join: skip the row. - // 2. Left/Right Outer join: keep the row and return false (with `matches` being - // empty). + // 2. Left/Right Outer and Left Anti join: keep the row and return false (with + // `matches` being empty). val findNextJoinRowsFuncName = ctx.freshName("findNextJoinRows") ctx.addNewFunction(findNextJoinRowsFuncName, s""" @@ -664,14 +664,14 @@ case class SortMergeJoinExec( streamedVars ++ bufferedVars case RightOuter => bufferedVars ++ streamedVars - case LeftSemi => + case LeftSemi | LeftAnti => streamedVars case x => throw new IllegalArgumentException( s"SortMergeJoin.doProduce should not take $x as the JoinType") } - val (streamedBeforeLoop, condCheck) = if (condition.isDefined) { + val (streamedBeforeLoop, condCheck, loadStreamed) = if (condition.isDefined) { // Split the code of creating variables based on whether it's used by condition or not. val loaded = ctx.freshName("loaded") val (streamedBefore, streamedAfter) = splitVarsByCondition(streamedOutput, streamedVars) @@ -680,13 +680,36 @@ case class SortMergeJoinExec( ctx.currentVars = streamedVars ++ bufferedVars val cond = BindReferences.bindReference( condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx) - // evaluate the columns those used by condition before loop + // Evaluate the columns those used by condition before loop val before = s""" |boolean $loaded = false; |$streamedBefore """.stripMargin + val loadStreamed = + s""" + |if (!$loaded) { + | $loaded = true; + | $streamedAfter + |} + """.stripMargin + + val loadStreamedAfterCondition = joinType match { + case LeftAnti => + // No need to evaluate columns not used by condition from streamed side, as for Left Anti + // join, streamed row with match is not outputted. + "" + case _ => loadStreamed + } + + val loadBufferedAfterCondition = joinType match { + case LeftSemi | LeftAnti => + // No need to evaluate columns not used by condition from buffered side + "" + case _ => bufferedAfter + } + val checking = s""" |$bufferedBefore @@ -696,15 +719,12 @@ case class SortMergeJoinExec( | continue; | } |} - |if (!$loaded) { - | $loaded = true; - | $streamedAfter - |} - |$bufferedAfter + |$loadStreamedAfterCondition + |$loadBufferedAfterCondition """.stripMargin - (before, checking.trim) + (before, checking.trim, loadStreamed) } else { - (evaluateVariables(streamedVars), "") + (evaluateVariables(streamedVars), "", "") } val beforeLoop = @@ -732,6 +752,9 @@ case class SortMergeJoinExec( case LeftSemi => codegenSemi(findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck, ctx.freshName("hasOutputRow"), outputRow, eagerCleanup) + case LeftAnti => + codegenAnti(streamedInput, findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck, + loadStreamed, ctx.freshName("hasMatchedRow"), outputRow, eagerCleanup) case x => throw new IllegalArgumentException( s"SortMergeJoin.doProduce should not take $x as the JoinType") @@ -825,6 +848,44 @@ case class SortMergeJoinExec( """.stripMargin } + /** + * Generates the code for Left Anti join. + */ + private def codegenAnti( + streamedInput: String, + findNextJoinRows: String, + beforeLoop: String, + matchIterator: String, + bufferedRow: String, + conditionCheck: String, + loadStreamed: String, + hasMatchedRow: String, + outputRow: String, + eagerCleanup: String): String = { + s""" + |while ($streamedInput.hasNext()) { + | $findNextJoinRows; + | $beforeLoop + | boolean $hasMatchedRow = false; + | + | while (!$hasMatchedRow && $matchIterator.hasNext()) { + | InternalRow $bufferedRow = (InternalRow) $matchIterator.next(); + | $conditionCheck + | $hasMatchedRow = true; + | } + | + | if (!$hasMatchedRow) { + | // load all values of streamed row, because the values not in join condition are not + | // loaded yet. + | $loadStreamed + | $outputRow + | } + | if (shouldStop()) return; + |} + |$eagerCleanup + """.stripMargin + } + override protected def withNewChildrenInternal( newLeft: SparkPlan, newRight: SparkPlan): SortMergeJoinExec = copy(left = newLeft, right = newRight) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt index f9ab964739273..cf5d48e8b2c75 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [cr_order_number#14] Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [cs_order_number#5] Right keys [1]: [cr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt index 489200f5201eb..6d647d341f785 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [cs_call_center_sk,cc_call_center_sk] Project [cs_ship_date_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk] - InputAdapter - SortMergeJoin [cs_order_number,cr_order_number] + SortMergeJoin [cs_order_number,cr_order_number] + InputAdapter WholeStageCodegen (5) Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [cr_order_number] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt index 647824d3a9d75..cf6b94ad6897c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [cr_order_number#14] Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [cs_order_number#5] Right keys [1]: [cr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt index c7ead9a46797a..f1f544b05318b 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk] Project [cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] BroadcastHashJoin [cs_ship_date_sk,d_date_sk] - InputAdapter - SortMergeJoin [cs_order_number,cr_order_number] + SortMergeJoin [cs_order_number,cr_order_number] + InputAdapter WholeStageCodegen (5) Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [cr_order_number] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt index 2cb4a1f42eb1d..a2b59a845a4ae 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt @@ -9,8 +9,8 @@ TakeOrderedAndProject (51) : +- * Project (41) : +- * BroadcastHashJoin Inner BuildRight (40) : :- * Project (34) - : : +- SortMergeJoin LeftAnti (33) - : : :- SortMergeJoin LeftAnti (25) + : : +- * SortMergeJoin LeftAnti (33) + : : :- * SortMergeJoin LeftAnti (25) : : : :- * SortMergeJoin LeftSemi (17) : : : : :- * Sort (5) : : : : : +- Exchange (4) @@ -158,7 +158,7 @@ Arguments: hashpartitioning(ws_bill_customer_sk#13, 5), ENSURE_REQUIREMENTS, [id Input [1]: [ws_bill_customer_sk#13] Arguments: [ws_bill_customer_sk#13 ASC NULLS FIRST], false, 0 -(25) SortMergeJoin +(25) SortMergeJoin [codegen id : 10] Left keys [1]: [c_customer_sk#1] Right keys [1]: [ws_bill_customer_sk#13] Join condition: None @@ -170,18 +170,18 @@ Location: InMemoryFileIndex [] PartitionFilters: [isnotnull(cs_sold_date_sk#18), dynamicpruningexpression(cs_sold_date_sk#18 IN dynamicpruning#7)] ReadSchema: struct -(27) ColumnarToRow [codegen id : 11] +(27) ColumnarToRow [codegen id : 12] Input [2]: [cs_ship_customer_sk#17, cs_sold_date_sk#18] (28) ReusedExchange [Reuses operator id: 12] Output [1]: [d_date_sk#19] -(29) BroadcastHashJoin [codegen id : 11] +(29) BroadcastHashJoin [codegen id : 12] Left keys [1]: [cs_sold_date_sk#18] Right keys [1]: [d_date_sk#19] Join condition: None -(30) Project [codegen id : 11] +(30) Project [codegen id : 12] Output [1]: [cs_ship_customer_sk#17] Input [3]: [cs_ship_customer_sk#17, cs_sold_date_sk#18, d_date_sk#19] @@ -189,16 +189,16 @@ Input [3]: [cs_ship_customer_sk#17, cs_sold_date_sk#18, d_date_sk#19] Input [1]: [cs_ship_customer_sk#17] Arguments: hashpartitioning(cs_ship_customer_sk#17, 5), ENSURE_REQUIREMENTS, [id=#20] -(32) Sort [codegen id : 12] +(32) Sort [codegen id : 13] Input [1]: [cs_ship_customer_sk#17] Arguments: [cs_ship_customer_sk#17 ASC NULLS FIRST], false, 0 -(33) SortMergeJoin +(33) SortMergeJoin [codegen id : 15] Left keys [1]: [c_customer_sk#1] Right keys [1]: [cs_ship_customer_sk#17] Join condition: None -(34) Project [codegen id : 14] +(34) Project [codegen id : 15] Output [2]: [c_current_cdemo_sk#2, c_current_addr_sk#3] Input [3]: [c_customer_sk#1, c_current_cdemo_sk#2, c_current_addr_sk#3] @@ -209,14 +209,14 @@ Location [not included in comparison]/{warehouse_dir}/customer_address] PushedFilters: [In(ca_state, [KY,GA,NM]), IsNotNull(ca_address_sk)] ReadSchema: struct -(36) ColumnarToRow [codegen id : 13] +(36) ColumnarToRow [codegen id : 14] Input [2]: [ca_address_sk#21, ca_state#22] -(37) Filter [codegen id : 13] +(37) Filter [codegen id : 14] Input [2]: [ca_address_sk#21, ca_state#22] Condition : (ca_state#22 IN (KY,GA,NM) AND isnotnull(ca_address_sk#21)) -(38) Project [codegen id : 13] +(38) Project [codegen id : 14] Output [1]: [ca_address_sk#21] Input [2]: [ca_address_sk#21, ca_state#22] @@ -224,12 +224,12 @@ Input [2]: [ca_address_sk#21, ca_state#22] Input [1]: [ca_address_sk#21] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#23] -(40) BroadcastHashJoin [codegen id : 14] +(40) BroadcastHashJoin [codegen id : 15] Left keys [1]: [c_current_addr_sk#3] Right keys [1]: [ca_address_sk#21] Join condition: None -(41) Project [codegen id : 14] +(41) Project [codegen id : 15] Output [1]: [c_current_cdemo_sk#2] Input [3]: [c_current_cdemo_sk#2, c_current_addr_sk#3, ca_address_sk#21] @@ -251,16 +251,16 @@ Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_stat Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Condition : isnotnull(cd_demo_sk#25) -(46) BroadcastHashJoin [codegen id : 15] +(46) BroadcastHashJoin [codegen id : 16] Left keys [1]: [c_current_cdemo_sk#2] Right keys [1]: [cd_demo_sk#25] Join condition: None -(47) Project [codegen id : 15] +(47) Project [codegen id : 16] Output [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Input [7]: [c_current_cdemo_sk#2, cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] -(48) HashAggregate [codegen id : 15] +(48) HashAggregate [codegen id : 16] Input [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Functions [1]: [partial_count(1)] @@ -271,7 +271,7 @@ Results [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_pur Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32] Arguments: hashpartitioning(cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, 5), ENSURE_REQUIREMENTS, [id=#33] -(50) HashAggregate [codegen id : 16] +(50) HashAggregate [codegen id : 17] Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32] Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30] Functions [1]: [count(1)] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt index bdbf95bd10721..f0d188e5f8637 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt @@ -1,72 +1,76 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,cnt1,cnt2,cnt3] - WholeStageCodegen (16) + WholeStageCodegen (17) HashAggregate [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,count] [count(1),cnt1,cnt2,cnt3,count] InputAdapter Exchange [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating] #1 - WholeStageCodegen (15) + WholeStageCodegen (16) HashAggregate [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating] [count,count] Project [cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating] BroadcastHashJoin [c_current_cdemo_sk,cd_demo_sk] InputAdapter BroadcastExchange #2 - WholeStageCodegen (14) + WholeStageCodegen (15) Project [c_current_cdemo_sk] BroadcastHashJoin [c_current_addr_sk,ca_address_sk] Project [c_current_cdemo_sk,c_current_addr_sk] - InputAdapter - SortMergeJoin [c_customer_sk,cs_ship_customer_sk] - SortMergeJoin [c_customer_sk,ws_bill_customer_sk] - WholeStageCodegen (6) - SortMergeJoin [c_customer_sk,ss_customer_sk] - InputAdapter - WholeStageCodegen (2) - Sort [c_customer_sk] - InputAdapter - Exchange [c_customer_sk] #3 - WholeStageCodegen (1) - Filter [c_current_addr_sk,c_current_cdemo_sk] + SortMergeJoin [c_customer_sk,cs_ship_customer_sk] + InputAdapter + WholeStageCodegen (10) + SortMergeJoin [c_customer_sk,ws_bill_customer_sk] + InputAdapter + WholeStageCodegen (6) + SortMergeJoin [c_customer_sk,ss_customer_sk] + InputAdapter + WholeStageCodegen (2) + Sort [c_customer_sk] + InputAdapter + Exchange [c_customer_sk] #3 + WholeStageCodegen (1) + Filter [c_current_addr_sk,c_current_cdemo_sk] + ColumnarToRow + InputAdapter + Scan parquet default.customer [c_customer_sk,c_current_cdemo_sk,c_current_addr_sk] + InputAdapter + WholeStageCodegen (5) + Sort [ss_customer_sk] + InputAdapter + Exchange [ss_customer_sk] #4 + WholeStageCodegen (4) + Project [ss_customer_sk] + BroadcastHashJoin [ss_sold_date_sk,d_date_sk] + ColumnarToRow + InputAdapter + Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] + SubqueryBroadcast [d_date_sk] #1 + ReusedExchange [d_date_sk] #5 + InputAdapter + BroadcastExchange #5 + WholeStageCodegen (3) + Project [d_date_sk] + Filter [d_year,d_moy,d_date_sk] + ColumnarToRow + InputAdapter + Scan parquet default.date_dim [d_date_sk,d_year,d_moy] + InputAdapter + WholeStageCodegen (9) + Sort [ws_bill_customer_sk] + InputAdapter + Exchange [ws_bill_customer_sk] #6 + WholeStageCodegen (8) + Project [ws_bill_customer_sk] + BroadcastHashJoin [ws_sold_date_sk,d_date_sk] ColumnarToRow InputAdapter - Scan parquet default.customer [c_customer_sk,c_current_cdemo_sk,c_current_addr_sk] - InputAdapter - WholeStageCodegen (5) - Sort [ss_customer_sk] - InputAdapter - Exchange [ss_customer_sk] #4 - WholeStageCodegen (4) - Project [ss_customer_sk] - BroadcastHashJoin [ss_sold_date_sk,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] - SubqueryBroadcast [d_date_sk] #1 - ReusedExchange [d_date_sk] #5 - InputAdapter - BroadcastExchange #5 - WholeStageCodegen (3) - Project [d_date_sk] - Filter [d_year,d_moy,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.date_dim [d_date_sk,d_year,d_moy] - WholeStageCodegen (9) - Sort [ws_bill_customer_sk] - InputAdapter - Exchange [ws_bill_customer_sk] #6 - WholeStageCodegen (8) - Project [ws_bill_customer_sk] - BroadcastHashJoin [ws_sold_date_sk,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.web_sales [ws_bill_customer_sk,ws_sold_date_sk] - ReusedSubquery [d_date_sk] #1 - InputAdapter - ReusedExchange [d_date_sk] #5 - WholeStageCodegen (12) + Scan parquet default.web_sales [ws_bill_customer_sk,ws_sold_date_sk] + ReusedSubquery [d_date_sk] #1 + InputAdapter + ReusedExchange [d_date_sk] #5 + InputAdapter + WholeStageCodegen (13) Sort [cs_ship_customer_sk] InputAdapter Exchange [cs_ship_customer_sk] #7 - WholeStageCodegen (11) + WholeStageCodegen (12) Project [cs_ship_customer_sk] BroadcastHashJoin [cs_sold_date_sk,d_date_sk] ColumnarToRow @@ -77,7 +81,7 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha ReusedExchange [d_date_sk] #5 InputAdapter BroadcastExchange #8 - WholeStageCodegen (13) + WholeStageCodegen (14) Project [ca_address_sk] Filter [ca_state,ca_address_sk] ColumnarToRow diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt index 92895cb566fd2..0bb08c7191367 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt @@ -5,13 +5,13 @@ +- * HashAggregate (68) +- Exchange (67) +- * HashAggregate (66) - +- SortMergeJoin LeftAnti (65) + +- * SortMergeJoin LeftAnti (65) :- * Sort (47) : +- Exchange (46) : +- * HashAggregate (45) : +- Exchange (44) : +- * HashAggregate (43) - : +- SortMergeJoin LeftAnti (42) + : +- * SortMergeJoin LeftAnti (42) : :- * Sort (24) : : +- Exchange (23) : : +- * HashAggregate (22) @@ -260,7 +260,7 @@ Arguments: hashpartitioning(coalesce(c_last_name#22, ), isnull(c_last_name#22), Input [3]: [c_last_name#22, c_first_name#21, d_date#18] Arguments: [coalesce(c_last_name#22, ) ASC NULLS FIRST, isnull(c_last_name#22) ASC NULLS FIRST, coalesce(c_first_name#21, ) ASC NULLS FIRST, isnull(c_first_name#21) ASC NULLS FIRST, coalesce(d_date#18, 1970-01-01) ASC NULLS FIRST, isnull(d_date#18) ASC NULLS FIRST], false, 0 -(42) SortMergeJoin +(42) SortMergeJoin [codegen id : 17] Left keys [6]: [coalesce(c_last_name#11, ), isnull(c_last_name#11), coalesce(c_first_name#10, ), isnull(c_first_name#10), coalesce(d_date#5, 1970-01-01), isnull(d_date#5)] Right keys [6]: [coalesce(c_last_name#22, ), isnull(c_last_name#22), coalesce(c_first_name#21, ), isnull(c_first_name#21), coalesce(d_date#18, 1970-01-01), isnull(d_date#18)] Join condition: None @@ -368,7 +368,7 @@ Arguments: hashpartitioning(coalesce(c_last_name#34, ), isnull(c_last_name#34), Input [3]: [c_last_name#34, c_first_name#33, d_date#30] Arguments: [coalesce(c_last_name#34, ) ASC NULLS FIRST, isnull(c_last_name#34) ASC NULLS FIRST, coalesce(c_first_name#33, ) ASC NULLS FIRST, isnull(c_first_name#33) ASC NULLS FIRST, coalesce(d_date#30, 1970-01-01) ASC NULLS FIRST, isnull(d_date#30) ASC NULLS FIRST], false, 0 -(65) SortMergeJoin +(65) SortMergeJoin [codegen id : 28] Left keys [6]: [coalesce(c_last_name#11, ), isnull(c_last_name#11), coalesce(c_first_name#10, ), isnull(c_first_name#10), coalesce(d_date#5, 1970-01-01), isnull(d_date#5)] Right keys [6]: [coalesce(c_last_name#34, ), isnull(c_last_name#34), coalesce(c_first_name#33, ), isnull(c_first_name#33), coalesce(d_date#30, 1970-01-01), isnull(d_date#30)] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt index 2978f51532d83..421027136f3c0 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt @@ -9,8 +9,8 @@ WholeStageCodegen (30) Exchange [c_last_name,c_first_name,d_date] #2 WholeStageCodegen (28) HashAggregate [c_last_name,c_first_name,d_date] - InputAdapter - SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + InputAdapter WholeStageCodegen (19) Sort [c_last_name,c_first_name,d_date] InputAdapter @@ -21,8 +21,8 @@ WholeStageCodegen (30) Exchange [c_last_name,c_first_name,d_date] #4 WholeStageCodegen (17) HashAggregate [c_last_name,c_first_name,d_date] - InputAdapter - SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date] + InputAdapter WholeStageCodegen (8) Sort [c_last_name,c_first_name,d_date] InputAdapter @@ -67,6 +67,7 @@ WholeStageCodegen (30) ColumnarToRow InputAdapter Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name] + InputAdapter WholeStageCodegen (16) Sort [c_last_name,c_first_name,d_date] InputAdapter @@ -99,6 +100,7 @@ WholeStageCodegen (30) Sort [c_customer_sk] InputAdapter ReusedExchange [c_customer_sk,c_first_name,c_last_name] #9 + InputAdapter WholeStageCodegen (27) Sort [c_last_name,c_first_name,d_date] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt index e06516011e4ec..0d5b963ba7d3d 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(wr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [wr_order_number#14] Arguments: [wr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [ws_order_number#5] Right keys [1]: [wr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt index b3c313fb5ded6..073ccc328045c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [ws_web_site_sk,web_site_sk] Project [ws_ship_date_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] BroadcastHashJoin [ws_ship_addr_sk,ca_address_sk] - InputAdapter - SortMergeJoin [ws_order_number,wr_order_number] + SortMergeJoin [ws_order_number,wr_order_number] + InputAdapter WholeStageCodegen (5) Project [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] SortMergeJoin [ws_order_number,ws_order_number,ws_warehouse_sk,ws_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.web_sales [ws_warehouse_sk,ws_order_number,ws_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [wr_order_number] InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt index 2d8c00cc4c936..7f668607961fd 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/explain.txt @@ -11,7 +11,7 @@ : +- * BroadcastHashJoin Inner BuildRight (32) : :- * Project (26) : : +- * BroadcastHashJoin Inner BuildRight (25) - : : :- SortMergeJoin LeftAnti (19) + : : :- * SortMergeJoin LeftAnti (19) : : : :- * Project (13) : : : : +- * SortMergeJoin LeftSemi (12) : : : : :- * Sort (6) @@ -124,7 +124,7 @@ Arguments: hashpartitioning(wr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16 Input [1]: [wr_order_number#14] Arguments: [wr_order_number#14 ASC NULLS FIRST], false, 0 -(19) SortMergeJoin +(19) SortMergeJoin [codegen id : 11] Left keys [1]: [ws_order_number#5] Right keys [1]: [wr_order_number#14] Join condition: None diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt index cecad61df0774..7b29ce9b1923f 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94/simplified.txt @@ -13,8 +13,8 @@ WholeStageCodegen (12) BroadcastHashJoin [ws_ship_addr_sk,ca_address_sk] Project [ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] BroadcastHashJoin [ws_ship_date_sk,d_date_sk] - InputAdapter - SortMergeJoin [ws_order_number,wr_order_number] + SortMergeJoin [ws_order_number,wr_order_number] + InputAdapter WholeStageCodegen (5) Project [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] SortMergeJoin [ws_order_number,ws_order_number,ws_warehouse_sk,ws_warehouse_sk] @@ -39,6 +39,7 @@ WholeStageCodegen (12) ColumnarToRow InputAdapter Scan parquet default.web_sales [ws_warehouse_sk,ws_order_number,ws_sold_date_sk] + InputAdapter WholeStageCodegen (7) Sort [wr_order_number] InputAdapter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index f019e34b60118..6cc6e33dd688a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -225,6 +225,28 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3))) } + test("Left Anti SortMergeJoin should be included in WholeStageCodegen") { + val df1 = spark.range(10).select($"id".as("k1")) + val df2 = spark.range(4).select($"id".as("k2")) + val df3 = spark.range(6).select($"id".as("k3")) + + // test one left anti sort merge join + val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") + assert(oneJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) => true + }.size === 1) + checkAnswer(oneJoinDF, Seq(Row(4), Row(5), Row(6), Row(7), Row(8), Row(9))) + + // test two left anti sort merge joins + val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") + .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti") + assert(twoJoinsDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) | + WholeStageCodegenExec(_ : SortMergeJoinExec) => true + }.size === 2) + checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9))) + } + test("Inner/Cross BroadcastNestedLoopJoinExec should be included in WholeStageCodegen") { val df1 = spark.range(4).select($"id".as("k1")) val df2 = spark.range(3).select($"id".as("k2")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 1e7032ab07425..ac9edc4d23e3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -517,10 +517,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withTempView("antiData") { anti.createOrReplaceTempView("antiData") val query = "SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = antiData.a" - Seq(false, true).foreach { enableWholeStage => + Seq((0L, false), (1L, true)).foreach { case (nodeId, enableWholeStage) => val df = spark.sql(query) testSparkPlanMetrics(df, 1, Map( - 0L -> (("SortMergeJoin", Map("number of output rows" -> 4L)))), + nodeId -> (("SortMergeJoin", Map("number of output rows" -> 4L)))), enableWholeStage ) }