Skip to content

Commit

Permalink
[SPARK-35351][SQL] Add code-gen for left anti sort merge join
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

As the title says, this PR adds code-gen support for LEFT ANTI sort merge join. The main change is to extract `loadStreamed` in `SortMergeJoinExec.doProduce()`. `loadStreamed` loads all column values of the streamed row, and it is used when the streamed row has no matched row (i.e. when the streamed row itself is output).

Example query:

```
val df1 = spark.range(10).select($"id".as("k1"))
val df2 = spark.range(4).select($"id".as("k2"))
df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti")
```

Example generated code:

```
== Subtree 5 / 5 (maxMethodCodeSize:296; maxConstantPoolSize:156(0.24% used); numInnerClasses:0) ==
*(5) Project [id#0L AS k1#2L]
+- *(5) SortMergeJoin [id#0L], [k2#6L], LeftAnti
   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 5), ENSURE_REQUIREMENTS, [id=#27]
   :     +- *(1) Range (0, 10, step=1, splits=2)
   +- *(4) Sort [k2#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(k2#6L, 5), ENSURE_REQUIREMENTS, [id=#33]
         +- *(3) Project [id#4L AS k2#6L]
            +- *(3) Range (0, 4, step=1, splits=2)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage5(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=5
/* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator smj_streamedInput_0;
/* 010 */   private scala.collection.Iterator smj_bufferedInput_0;
/* 011 */   private InternalRow smj_streamedRow_0;
/* 012 */   private InternalRow smj_bufferedRow_0;
/* 013 */   private long smj_value_2;
/* 014 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches_0;
/* 015 */   private long smj_value_3;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage5(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     smj_streamedInput_0 = inputs[0];
/* 026 */     smj_bufferedInput_0 = inputs[1];
/* 027 */
/* 028 */     smj_matches_0 = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(1, 2147483647);
/* 029 */     smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     smj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */
/* 032 */   }
/* 033 */
/* 034 */   private boolean findNextJoinRows(
/* 035 */     scala.collection.Iterator streamedIter,
/* 036 */     scala.collection.Iterator bufferedIter) {
/* 037 */     smj_streamedRow_0 = null;
/* 038 */     int comp = 0;
/* 039 */     while (smj_streamedRow_0 == null) {
/* 040 */       if (!streamedIter.hasNext()) return false;
/* 041 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
/* 042 */       long smj_value_0 = smj_streamedRow_0.getLong(0);
/* 043 */       if (false) {
/* 044 */         if (!smj_matches_0.isEmpty()) {
/* 045 */           smj_matches_0.clear();
/* 046 */         }
/* 047 */         return false;
/* 048 */
/* 049 */       }
/* 050 */       if (!smj_matches_0.isEmpty()) {
/* 051 */         comp = 0;
/* 052 */         if (comp == 0) {
/* 053 */           comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? -1 : 0);
/* 054 */         }
/* 055 */
/* 056 */         if (comp == 0) {
/* 057 */           return true;
/* 058 */         }
/* 059 */         smj_matches_0.clear();
/* 060 */       }
/* 061 */
/* 062 */       do {
/* 063 */         if (smj_bufferedRow_0 == null) {
/* 064 */           if (!bufferedIter.hasNext()) {
/* 065 */             smj_value_3 = smj_value_0;
/* 066 */             return !smj_matches_0.isEmpty();
/* 067 */           }
/* 068 */           smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
/* 069 */           long smj_value_1 = smj_bufferedRow_0.getLong(0);
/* 070 */           if (false) {
/* 071 */             smj_bufferedRow_0 = null;
/* 072 */             continue;
/* 073 */           }
/* 074 */           smj_value_2 = smj_value_1;
/* 075 */         }
/* 076 */
/* 077 */         comp = 0;
/* 078 */         if (comp == 0) {
/* 079 */           comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0);
/* 080 */         }
/* 081 */
/* 082 */         if (comp > 0) {
/* 083 */           smj_bufferedRow_0 = null;
/* 084 */         } else if (comp < 0) {
/* 085 */           if (!smj_matches_0.isEmpty()) {
/* 086 */             smj_value_3 = smj_value_0;
/* 087 */             return true;
/* 088 */           } else {
/* 089 */             return false;
/* 090 */           }
/* 091 */         } else {
/* 092 */           if (smj_matches_0.isEmpty()) {
/* 093 */             smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
/* 094 */           }
/* 095 */
/* 096 */           smj_bufferedRow_0 = null;
/* 097 */         }
/* 098 */       } while (smj_streamedRow_0 != null);
/* 099 */     }
/* 100 */     return false; // unreachable
/* 101 */   }
/* 102 */
/* 103 */   protected void processNext() throws java.io.IOException {
/* 104 */     while (smj_streamedInput_0.hasNext()) {
/* 105 */       findNextJoinRows(smj_streamedInput_0, smj_bufferedInput_0);
/* 106 */
/* 107 */       long smj_value_4 = -1L;
/* 108 */       smj_value_4 = smj_streamedRow_0.getLong(0);
/* 109 */       scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
/* 110 */
/* 111 */       boolean wholestagecodegen_hasOutputRow_0 = false;
/* 112 */
/* 113 */       while (!wholestagecodegen_hasOutputRow_0 && smj_iterator_0.hasNext()) {
/* 114 */         InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next();
/* 115 */
/* 116 */         wholestagecodegen_hasOutputRow_0 = true;
/* 117 */       }
/* 118 */
/* 119 */       if (!wholestagecodegen_hasOutputRow_0) {
/* 120 */         // load all values of streamed row, because the values not in join condition are not
/* 121 */         // loaded yet.
/* 122 */
/* 123 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 124 */
/* 125 */         // common sub-expressions
/* 126 */
/* 127 */         smj_mutableStateArray_0[1].reset();
/* 128 */
/* 129 */         smj_mutableStateArray_0[1].write(0, smj_value_4);
/* 130 */         append((smj_mutableStateArray_0[1].getRow()).copy());
/* 131 */
/* 132 */       }
/* 133 */       if (shouldStop()) return;
/* 134 */     }
/* 135 */     ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources();
/* 136 */   }
/* 137 */
/* 138 */ }
```

### Why are the changes needed?

Improve the query CPU performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test in `WholeStageCodegenSuite.scala`, and relied on the existing unit tests in `ExistenceJoinSuite.scala`.

Closes apache#32547 from c21/smj-left-anti.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
  • Loading branch information
c21 authored and maropu committed May 18, 2021
1 parent 7b942d5 commit cce0048
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ case class SortMergeJoinExec(
}

private lazy val ((streamedPlan, streamedKeys), (bufferedPlan, bufferedKeys)) = joinType match {
case _: InnerLike | LeftOuter | LeftSemi => ((left, leftKeys), (right, rightKeys))
case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => ((left, leftKeys), (right, rightKeys))
case RightOuter => ((right, rightKeys), (left, leftKeys))
case x =>
throw new IllegalArgumentException(
Expand All @@ -375,7 +375,7 @@ case class SortMergeJoinExec(
private lazy val bufferedOutput = bufferedPlan.output

override def supportCodegen: Boolean = joinType match {
case _: InnerLike | LeftOuter | RightOuter | LeftSemi => true
case _: InnerLike | LeftOuter | RightOuter | LeftSemi | LeftAnti => true
case _ => false
}

Expand Down Expand Up @@ -453,7 +453,7 @@ case class SortMergeJoinExec(
|$streamedRow = null;
|continue;
""".stripMargin
case LeftOuter | RightOuter =>
case LeftOuter | RightOuter | LeftAnti =>
// Eagerly return streamed row. Only call `matches.clear()` when `matches.isEmpty()` is
// false, to reduce unnecessary computation.
s"""
Expand All @@ -472,7 +472,7 @@ case class SortMergeJoinExec(
case _: InnerLike | LeftSemi =>
// Skip streamed row.
s"$streamedRow = null;"
case LeftOuter | RightOuter =>
case LeftOuter | RightOuter | LeftAnti =>
// Eagerly return with streamed row.
"return false;"
case x =>
Expand Down Expand Up @@ -509,17 +509,17 @@ case class SortMergeJoinExec(
// 1. Inner and Left Semi join: skip the row. `matches` will be cleared later when
// hitting the next `streamedRow` with non-null join
// keys.
// 2. Left/Right Outer join: clear the previous `matches` if needed, keep the row,
// and return false.
// 2. Left/Right Outer and Left Anti join: clear the previous `matches` if needed,
// keep the row, and return false.
//
// - Step 2: Find the `matches` from buffered side having same join keys with `streamedRow`.
// Clear `matches` if we hit a new `streamedRow`, as we need to find new matches.
// Use `bufferedRow` to iterate buffered side to put all matched rows into
// `matches` (`addRowToBuffer`). Return true when getting all matched rows.
// For `streamedRow` without `matches` (`handleStreamedWithoutMatch`):
// 1. Inner and Left Semi join: skip the row.
// 2. Left/Right Outer join: keep the row and return false (with `matches` being
// empty).
// 2. Left/Right Outer and Left Anti join: keep the row and return false (with
// `matches` being empty).
val findNextJoinRowsFuncName = ctx.freshName("findNextJoinRows")
ctx.addNewFunction(findNextJoinRowsFuncName,
s"""
Expand Down Expand Up @@ -664,14 +664,14 @@ case class SortMergeJoinExec(
streamedVars ++ bufferedVars
case RightOuter =>
bufferedVars ++ streamedVars
case LeftSemi =>
case LeftSemi | LeftAnti =>
streamedVars
case x =>
throw new IllegalArgumentException(
s"SortMergeJoin.doProduce should not take $x as the JoinType")
}

val (streamedBeforeLoop, condCheck) = if (condition.isDefined) {
val (streamedBeforeLoop, condCheck, loadStreamed) = if (condition.isDefined) {
// Split the code of creating variables based on whether it's used by condition or not.
val loaded = ctx.freshName("loaded")
val (streamedBefore, streamedAfter) = splitVarsByCondition(streamedOutput, streamedVars)
Expand All @@ -680,13 +680,36 @@ case class SortMergeJoinExec(
ctx.currentVars = streamedVars ++ bufferedVars
val cond = BindReferences.bindReference(
condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx)
// evaluate the columns those used by condition before loop
// Evaluate the columns those used by condition before loop
val before =
s"""
|boolean $loaded = false;
|$streamedBefore
""".stripMargin

val loadStreamed =
s"""
|if (!$loaded) {
| $loaded = true;
| $streamedAfter
|}
""".stripMargin

val loadStreamedAfterCondition = joinType match {
case LeftAnti =>
// No need to evaluate columns not used by condition from streamed side, as for Left Anti
// join, streamed row with match is not outputted.
""
case _ => loadStreamed
}

val loadBufferedAfterCondition = joinType match {
case LeftSemi | LeftAnti =>
// No need to evaluate columns not used by condition from buffered side
""
case _ => bufferedAfter
}

val checking =
s"""
|$bufferedBefore
Expand All @@ -696,15 +719,12 @@ case class SortMergeJoinExec(
| continue;
| }
|}
|if (!$loaded) {
| $loaded = true;
| $streamedAfter
|}
|$bufferedAfter
|$loadStreamedAfterCondition
|$loadBufferedAfterCondition
""".stripMargin
(before, checking.trim)
(before, checking.trim, loadStreamed)
} else {
(evaluateVariables(streamedVars), "")
(evaluateVariables(streamedVars), "", "")
}

val beforeLoop =
Expand Down Expand Up @@ -732,6 +752,9 @@ case class SortMergeJoinExec(
case LeftSemi =>
codegenSemi(findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck,
ctx.freshName("hasOutputRow"), outputRow, eagerCleanup)
case LeftAnti =>
codegenAnti(streamedInput, findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck,
loadStreamed, ctx.freshName("hasMatchedRow"), outputRow, eagerCleanup)
case x =>
throw new IllegalArgumentException(
s"SortMergeJoin.doProduce should not take $x as the JoinType")
Expand Down Expand Up @@ -825,6 +848,44 @@ case class SortMergeJoinExec(
""".stripMargin
}

/**
 * Generates the code for Left Anti join.
 *
 * The returned template loops while the streamed input has rows. For each
 * iteration it invokes the find-next-join-rows code, emits `beforeLoop`, and
 * then scans the match iterator until one buffered row gets past
 * `conditionCheck`. The streamed row is output only when no such buffered row
 * was found; in that case `loadStreamed` is emitted before `outputRow`.
 * `eagerCleanup` is emitted once, after the outer loop.
 *
 * @param streamedInput name of the generated iterator variable for the streamed side;
 *                      its `hasNext()` drives the outer loop
 * @param findNextJoinRows generated code that advances to the next streamed row; it is
 *                         emitted as a statement and its result is not inspected here
 * @param beforeLoop code emitted once per streamed row, before the match loop
 * @param matchIterator name of the generated iterator variable over the matched rows
 * @param bufferedRow name of the local `InternalRow` variable declared for each matched row
 * @param conditionCheck code emitted inside the match loop before the row is counted as a
 *                       match (presumably it `continue`s on rows failing the join
 *                       condition, and is empty when there is no condition -- confirm
 *                       against the caller)
 * @param loadStreamed code that loads the streamed row's values that were not loaded
 *                     before the loop; emitted only on the no-match path
 * @param hasMatchedRow name of the generated boolean flag that records whether a match
 *                      was found; it also terminates the match loop early
 * @param outputRow code that outputs the streamed row
 * @param eagerCleanup code emitted after the outer loop finishes
 * @return the generated Java source for the Left Anti join loop
 */
private def codegenAnti(
streamedInput: String,
findNextJoinRows: String,
beforeLoop: String,
matchIterator: String,
bufferedRow: String,
conditionCheck: String,
loadStreamed: String,
hasMatchedRow: String,
outputRow: String,
eagerCleanup: String): String = {
s"""
|while ($streamedInput.hasNext()) {
| $findNextJoinRows;
| $beforeLoop
| boolean $hasMatchedRow = false;
|
| while (!$hasMatchedRow && $matchIterator.hasNext()) {
| InternalRow $bufferedRow = (InternalRow) $matchIterator.next();
| $conditionCheck
| $hasMatchedRow = true;
| }
|
| if (!$hasMatchedRow) {
| // load all values of streamed row, because the values not in join condition are not
| // loaded yet.
| $loadStreamed
| $outputRow
| }
| if (shouldStop()) return;
|}
|$eagerCleanup
""".stripMargin
}

override protected def withNewChildrenInternal(
newLeft: SparkPlan, newRight: SparkPlan): SortMergeJoinExec =
copy(left = newLeft, right = newRight)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
: +- * BroadcastHashJoin Inner BuildRight (32)
: :- * Project (26)
: : +- * BroadcastHashJoin Inner BuildRight (25)
: : :- SortMergeJoin LeftAnti (19)
: : :- * SortMergeJoin LeftAnti (19)
: : : :- * Project (13)
: : : : +- * SortMergeJoin LeftSemi (12)
: : : : :- * Sort (6)
Expand Down Expand Up @@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16
Input [1]: [cr_order_number#14]
Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0

(19) SortMergeJoin
(19) SortMergeJoin [codegen id : 11]
Left keys [1]: [cs_order_number#5]
Right keys [1]: [cr_order_number#14]
Join condition: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ WholeStageCodegen (12)
BroadcastHashJoin [cs_call_center_sk,cc_call_center_sk]
Project [cs_ship_date_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk]
InputAdapter
SortMergeJoin [cs_order_number,cr_order_number]
SortMergeJoin [cs_order_number,cr_order_number]
InputAdapter
WholeStageCodegen (5)
Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk]
Expand All @@ -39,6 +39,7 @@ WholeStageCodegen (12)
ColumnarToRow
InputAdapter
Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
InputAdapter
WholeStageCodegen (7)
Sort [cr_order_number]
InputAdapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
: +- * BroadcastHashJoin Inner BuildRight (32)
: :- * Project (26)
: : +- * BroadcastHashJoin Inner BuildRight (25)
: : :- SortMergeJoin LeftAnti (19)
: : :- * SortMergeJoin LeftAnti (19)
: : : :- * Project (13)
: : : : +- * SortMergeJoin LeftSemi (12)
: : : : :- * Sort (6)
Expand Down Expand Up @@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16
Input [1]: [cr_order_number#14]
Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0

(19) SortMergeJoin
(19) SortMergeJoin [codegen id : 11]
Left keys [1]: [cs_order_number#5]
Right keys [1]: [cr_order_number#14]
Join condition: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ WholeStageCodegen (12)
BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk]
Project [cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
BroadcastHashJoin [cs_ship_date_sk,d_date_sk]
InputAdapter
SortMergeJoin [cs_order_number,cr_order_number]
SortMergeJoin [cs_order_number,cr_order_number]
InputAdapter
WholeStageCodegen (5)
Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk]
Expand All @@ -39,6 +39,7 @@ WholeStageCodegen (12)
ColumnarToRow
InputAdapter
Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
InputAdapter
WholeStageCodegen (7)
Sort [cr_order_number]
InputAdapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ TakeOrderedAndProject (51)
: +- * Project (41)
: +- * BroadcastHashJoin Inner BuildRight (40)
: :- * Project (34)
: : +- SortMergeJoin LeftAnti (33)
: : :- SortMergeJoin LeftAnti (25)
: : +- * SortMergeJoin LeftAnti (33)
: : :- * SortMergeJoin LeftAnti (25)
: : : :- * SortMergeJoin LeftSemi (17)
: : : : :- * Sort (5)
: : : : : +- Exchange (4)
Expand Down Expand Up @@ -158,7 +158,7 @@ Arguments: hashpartitioning(ws_bill_customer_sk#13, 5), ENSURE_REQUIREMENTS, [id
Input [1]: [ws_bill_customer_sk#13]
Arguments: [ws_bill_customer_sk#13 ASC NULLS FIRST], false, 0

(25) SortMergeJoin
(25) SortMergeJoin [codegen id : 10]
Left keys [1]: [c_customer_sk#1]
Right keys [1]: [ws_bill_customer_sk#13]
Join condition: None
Expand All @@ -170,35 +170,35 @@ Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(cs_sold_date_sk#18), dynamicpruningexpression(cs_sold_date_sk#18 IN dynamicpruning#7)]
ReadSchema: struct<cs_ship_customer_sk:int>

(27) ColumnarToRow [codegen id : 11]
(27) ColumnarToRow [codegen id : 12]
Input [2]: [cs_ship_customer_sk#17, cs_sold_date_sk#18]

(28) ReusedExchange [Reuses operator id: 12]
Output [1]: [d_date_sk#19]

(29) BroadcastHashJoin [codegen id : 11]
(29) BroadcastHashJoin [codegen id : 12]
Left keys [1]: [cs_sold_date_sk#18]
Right keys [1]: [d_date_sk#19]
Join condition: None

(30) Project [codegen id : 11]
(30) Project [codegen id : 12]
Output [1]: [cs_ship_customer_sk#17]
Input [3]: [cs_ship_customer_sk#17, cs_sold_date_sk#18, d_date_sk#19]

(31) Exchange
Input [1]: [cs_ship_customer_sk#17]
Arguments: hashpartitioning(cs_ship_customer_sk#17, 5), ENSURE_REQUIREMENTS, [id=#20]

(32) Sort [codegen id : 12]
(32) Sort [codegen id : 13]
Input [1]: [cs_ship_customer_sk#17]
Arguments: [cs_ship_customer_sk#17 ASC NULLS FIRST], false, 0

(33) SortMergeJoin
(33) SortMergeJoin [codegen id : 15]
Left keys [1]: [c_customer_sk#1]
Right keys [1]: [cs_ship_customer_sk#17]
Join condition: None

(34) Project [codegen id : 14]
(34) Project [codegen id : 15]
Output [2]: [c_current_cdemo_sk#2, c_current_addr_sk#3]
Input [3]: [c_customer_sk#1, c_current_cdemo_sk#2, c_current_addr_sk#3]

Expand All @@ -209,27 +209,27 @@ Location [not included in comparison]/{warehouse_dir}/customer_address]
PushedFilters: [In(ca_state, [KY,GA,NM]), IsNotNull(ca_address_sk)]
ReadSchema: struct<ca_address_sk:int,ca_state:string>

(36) ColumnarToRow [codegen id : 13]
(36) ColumnarToRow [codegen id : 14]
Input [2]: [ca_address_sk#21, ca_state#22]

(37) Filter [codegen id : 13]
(37) Filter [codegen id : 14]
Input [2]: [ca_address_sk#21, ca_state#22]
Condition : (ca_state#22 IN (KY,GA,NM) AND isnotnull(ca_address_sk#21))

(38) Project [codegen id : 13]
(38) Project [codegen id : 14]
Output [1]: [ca_address_sk#21]
Input [2]: [ca_address_sk#21, ca_state#22]

(39) BroadcastExchange
Input [1]: [ca_address_sk#21]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#23]

(40) BroadcastHashJoin [codegen id : 14]
(40) BroadcastHashJoin [codegen id : 15]
Left keys [1]: [c_current_addr_sk#3]
Right keys [1]: [ca_address_sk#21]
Join condition: None

(41) Project [codegen id : 14]
(41) Project [codegen id : 15]
Output [1]: [c_current_cdemo_sk#2]
Input [3]: [c_current_cdemo_sk#2, c_current_addr_sk#3, ca_address_sk#21]

Expand All @@ -251,16 +251,16 @@ Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_stat
Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
Condition : isnotnull(cd_demo_sk#25)

(46) BroadcastHashJoin [codegen id : 15]
(46) BroadcastHashJoin [codegen id : 16]
Left keys [1]: [c_current_cdemo_sk#2]
Right keys [1]: [cd_demo_sk#25]
Join condition: None

(47) Project [codegen id : 15]
(47) Project [codegen id : 16]
Output [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
Input [7]: [c_current_cdemo_sk#2, cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]

(48) HashAggregate [codegen id : 15]
(48) HashAggregate [codegen id : 16]
Input [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
Functions [1]: [partial_count(1)]
Expand All @@ -271,7 +271,7 @@ Results [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_pur
Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32]
Arguments: hashpartitioning(cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, 5), ENSURE_REQUIREMENTS, [id=#33]

(50) HashAggregate [codegen id : 16]
(50) HashAggregate [codegen id : 17]
Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32]
Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
Functions [1]: [count(1)]
Expand Down
Loading

0 comments on commit cce0048

Please sign in to comment.