forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GitHub Actions test #6 #23
Closed
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
HyukjinKwon
pushed a commit
that referenced
this pull request
Nov 11, 2020
### What changes were proposed in this pull request? Push down filter through expand. For case below: ``` create table t1(pid int, uid int, sid int, dt date, suid int) using parquet; create table t2(pid int, vs int, uid int, csid int) using parquet; SELECT years, appversion, SUM(uusers) AS users FROM (SELECT Date_trunc('year', dt) AS years, CASE WHEN h.pid = 3 THEN 'iOS' WHEN h.pid = 4 THEN 'Android' ELSE 'Other' END AS viewport, h.vs AS appversion, Count(DISTINCT u.uid) AS uusers ,Count(DISTINCT u.suid) AS srcusers FROM t1 u join t2 h ON h.uid = u.uid GROUP BY 1, 2, 3) AS a WHERE viewport = 'iOS' GROUP BY 1, 2 ``` Plan. before this pr: ``` == Physical Plan == *(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)]) +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=apache#251] +- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)]) +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=apache#246] +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), true, [id=apache#241] +- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS) +- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44] +- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12] +- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight :- *(2) Project [uid#7, dt#9, suid#10] : +- *(2) Filter isnotnull(uid#7) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=apache#233] +- *(1) Project [pid#11, vs#12, uid#13] +- *(1) Filter isnotnull(uid#13) +- *(1) ColumnarToRow +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` Plan. after. this pr. : ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[years#0, appversion#2], functions=[sum(uusers#3L)], output=[years#0, appversion#2, users#5L]) +- Exchange hashpartitioning(years#0, appversion#2, 5), true, [id=apache#71] +- HashAggregate(keys=[years#0, appversion#2], functions=[partial_sum(uusers#3L)], output=[years#0, appversion#2, sum#22L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[count(distinct uid#7)], output=[years#0, appversion#2, uusers#3L]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, 5), true, [id=apache#67] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[partial_count(distinct uid#7)], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, count#27L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7, 5), true, [id=apache#63] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles)) AS date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END AS CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Project [uid#7, dt#9, pid#11, vs#12] +- BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight, false :- Filter isnotnull(uid#7) : +- FileScan parquet default.t1[uid#7,dt#9] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=apache#58] +- Filter ((CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS) AND isnotnull(uid#13)) +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [(CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS), isnotnull..., Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` ### Why are the changes needed? Improve performance, filter more data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes apache#30278 from AngersZhuuuu/SPARK-33302. Authored-by: angerszhu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
HyukjinKwon
pushed a commit
that referenced
this pull request
Mar 22, 2021
…join (build right side) ### What changes were proposed in this pull request? This PR is to add code-gen support for left semi / left anti BroadcastNestedLoopJoin (build side is right side). The execution code path for build left side cannot fit into whole stage code-gen framework, so only add the code-gen for build right side here. Reference: the iterator (non-code-gen) code path is `BroadcastNestedLoopJoinExec.leftExistenceJoin()` with `BuildRight`. ### Why are the changes needed? Improve query CPU performance. Tested with a simple query: ``` val N = 20 << 20 val M = 1 << 4 val dim = broadcast(spark.range(M).selectExpr("id as k2")) codegenBenchmark("left semi broadcast nested loop join", N) { park.range(N).selectExpr(s"id as k1").join( dim, col("k1") + 1 <= col("k2"), "left_semi") } ``` Seeing 5x run time improvement: ``` Running benchmark: left semi broadcast nested loop join Running case: left semi broadcast nested loop join codegen off Stopped after 2 iterations, 6958 ms Running case: left semi broadcast nested loop join codegen on Stopped after 5 iterations, 3383 ms Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.7 Intel(R) Core(TM) i9-9980HK CPU 2.40GHz left semi broadcast nested loop join: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- left semi broadcast nested loop join codegen off 3434 3479 65 6.1 163.7 1.0X left semi broadcast nested loop join codegen on 672 677 5 31.2 32.1 5.1X ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Changed existing unit test in `ExistenceJoinSuite.scala` to cover all code paths: * left semi/anti + empty right side + empty condition * left semi/anti + non-empty right side + empty condition * left semi/anti + right side + non-empty condition Added unit test in `WholeStageCodegenSuite.scala` to make sure code-gen for broadcast nested loop join is taking effect, and test for multiple join case as well. Example query: ``` val df1 = spark.range(4).select($"id".as("k1")) val df2 = spark.range(3).select($"id".as("k2")) df1.join(df2, $"k1" + 1 <= $"k2", "left_semi").explain("codegen") ``` Example generated code (`bnlj_doConsume_0` method): This is for left semi join. The generated code for left anti join is mostly to be same as here, except L55 to be `if (bnlj_findMatchedRow_0 == false) {`. ``` == Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:203(0.31% used); numInnerClasses:0) == *(2) Project [id#0L AS k1#2L] +- *(2) BroadcastNestedLoopJoin BuildRight, LeftSemi, ((id#0L + 1) <= k2#6L) :- *(2) Range (0, 4, step=1, splits=2) +- BroadcastExchange IdentityBroadcastMode, [id=#23] +- *(1) Project [id#4L AS k2#6L] +- *(1) Range (0, 3, step=1, splits=2) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage2(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=2 /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean range_initRange_0; /* 010 */ private long range_nextIndex_0; /* 011 */ private TaskContext range_taskContext_0; /* 012 */ private InputMetrics range_inputMetrics_0; /* 013 */ private long range_batchEnd_0; /* 014 */ private long range_numElementsTodo_0; /* 015 */ private InternalRow[] bnlj_buildRowArray_0; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4]; /* 017 */ /* 018 */ public GeneratedIteratorForCodegenStage2(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ /* 026 */ range_taskContext_0 = TaskContext.get(); /* 027 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics(); /* 028 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 029 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 030 */ bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value(); /* 031 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 032 */ range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 033 */ /* 034 */ } /* 035 */ /* 036 */ private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException { /* 037 */ boolean bnlj_findMatchedRow_0 = false; /* 038 */ for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) { /* 039 */ UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0]; /* 040 */ /* 041 */ long bnlj_value_1 = bnlj_buildRow_0.getLong(0); /* 042 */ /* 043 */ long bnlj_value_3 = -1L; /* 044 */ /* 045 */ bnlj_value_3 = bnlj_expr_0_0 + 1L; /* 046 */ /* 047 */ boolean bnlj_value_2 = false; /* 048 */ bnlj_value_2 = bnlj_value_3 <= bnlj_value_1; /* 049 */ if (!(false || !bnlj_value_2)) /* 050 */ { /* 051 */ bnlj_findMatchedRow_0 = true; /* 052 */ break; /* 053 */ } /* 054 */ } /* 055 */ if (bnlj_findMatchedRow_0 == true) { /* 056 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1); /* 057 */ /* 058 */ // common sub-expressions /* 059 */ /* 060 */ range_mutableStateArray_0[3].reset(); /* 061 */ /* 062 */ range_mutableStateArray_0[3].write(0, bnlj_expr_0_0); /* 063 */ append((range_mutableStateArray_0[3].getRow()).copy()); /* 064 */ /* 065 */ } /* 066 */ /* 067 */ } /* 068 */ /* 069 */ private void initRange(int idx) { /* 070 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 071 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L); /* 072 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L); /* 073 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 074 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 075 */ long partitionEnd; /* 076 */ /* 077 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 078 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 079 */ range_nextIndex_0 = Long.MAX_VALUE; /* 080 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 081 */ range_nextIndex_0 = Long.MIN_VALUE; /* 082 */ } else { /* 083 */ range_nextIndex_0 = st.longValue(); /* 084 */ } /* 085 */ range_batchEnd_0 = range_nextIndex_0; /* 086 */ /* 087 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 088 */ .multiply(step).add(start); /* 089 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 090 */ partitionEnd = Long.MAX_VALUE; /* 091 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 092 */ partitionEnd = Long.MIN_VALUE; /* 093 */ } else { /* 094 */ partitionEnd = end.longValue(); /* 095 */ } /* 096 */ /* 097 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 098 */ java.math.BigInteger.valueOf(range_nextIndex_0)); /* 099 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue(); /* 100 */ if (range_numElementsTodo_0 < 0) { /* 101 */ range_numElementsTodo_0 = 0; /* 102 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 103 */ range_numElementsTodo_0++; /* 104 */ } /* 105 */ } /* 106 */ /* 107 */ protected void processNext() throws java.io.IOException { /* 108 */ // initialize Range /* 109 */ if (!range_initRange_0) { /* 110 */ range_initRange_0 = true; /* 111 */ initRange(partitionIndex); /* 112 */ } /* 113 */ /* 114 */ while (true) { /* 115 */ if (range_nextIndex_0 == range_batchEnd_0) { /* 116 */ long range_nextBatchTodo_0; /* 117 */ if (range_numElementsTodo_0 > 1000L) { /* 118 */ range_nextBatchTodo_0 = 1000L; /* 119 */ range_numElementsTodo_0 -= 1000L; /* 120 */ } else { /* 121 */ range_nextBatchTodo_0 = range_numElementsTodo_0; /* 122 */ range_numElementsTodo_0 = 0; /* 123 */ if (range_nextBatchTodo_0 == 0) break; /* 124 */ } /* 125 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L; /* 126 */ } /* 127 */ /* 128 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L); /* 129 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) { /* 130 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0; /* 131 */ /* 132 */ bnlj_doConsume_0(range_value_0); /* 133 */ /* 134 */ if (shouldStop()) { /* 135 */ range_nextIndex_0 = range_value_0 + 1L; /* 136 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1); /* 137 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1); /* 138 */ return; /* 139 */ } /* 140 */ /* 141 */ } /* 142 */ range_nextIndex_0 = range_batchEnd_0; /* 143 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0); /* 144 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0); /* 145 */ range_taskContext_0.killTaskIfInterrupted(); /* 146 */ } /* 147 */ } /* 148 */ /* 149 */ } ``` Closes apache#31874 from c21/code-semi-anti. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
No description provided.