[WIP][SPARK-33625][SQL] Subexpression elimination for whole-stage codegen in Filter #30565

Closed · wants to merge 7 commits

@viirya (Member) commented Dec 2, 2020

What changes were proposed in this pull request?

This patch proposes to enable whole-stage subexpression elimination for Filter.

Why are the changes needed?

We already enabled subexpression elimination for whole-stage codegen in ProjectExec. Another operator that frequently runs into common subexpressions is Filter, so we should support whole-stage codegen subexpression elimination in FilterExec too.
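
A minimal sketch of the kind of query this targets (the column names and data are illustrative, not taken from the PR): the subexpression a + b appears in two conjuncts of the same filter, so with whole-stage subexpression elimination the generated code would evaluate it once per row instead of twice.

    import org.apache.spark.sql.functions._

    // Both conjuncts reference a + b; with subexpression elimination the
    // generated code can compute it once per row.
    val df = spark.range(10)
      .select(col("id").as("a"), (col("id") * 2).as("b"))
      .filter(col("a") + col("b") > 5 && col("a") + col("b") < 15)
    df.explain()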

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

@viirya viirya marked this pull request as draft December 2, 2020 06:42
@github-actions github-actions bot added the SQL label Dec 2, 2020
@SparkQA commented Dec 2, 2020

Test build #132024 has finished for PR 30565 at commit 368236f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bart-samwel commented:

Do we pull out common subexpressions to just before the place where they are used, or all the way in front of the entire filter? If we pull them out farther, we will get more exceptions and potentially slow things down.

Longer version:

This is a lot scarier than for Project. The reason is that in Project, you can be pretty certain that all expressions will be evaluated, unless they are inside conditionals. But filters typically contain conjunctions, and in conjunctions, short-circuit evaluation will hide potential exceptions. If you then pull some of those expressions ahead of the entire filter, you may get errors where you didn't get errors before. This is especially scary when the ANSI dialect is enabled, which makes a lot of expressions return errors.

E.g., in a filter like WHERE d <> 0 AND a/d > 3 AND a/d < 23, it likely works today only because d <> 0 short-circuits the other expressions when d = 0. But if you pull out the common subexpression a/d and evaluate it before the rest of the filter, you'll get exceptions where you wouldn't currently get them.

Another thing to think about is expensive expressions. What if you have a filter WHERE selective_filter AND f(expensive_expression) AND g(expensive_expression)? If you pull the expensive expression in front, then you evaluate it many more times than if you don't.

But if we pull the expression to just before the conjunct that uses it first, then we're good.
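
To make the hazard concrete, here is a minimal sketch (the data and table name are illustrative) relying on ANSI mode's documented behavior that division by zero raises an error instead of returning null:

    import spark.implicits._

    spark.conf.set("spark.sql.ansi.enabled", "true")
    Seq((6, 2), (10, 0)).toDF("a", "d").createOrReplaceTempView("t")
    // This works today because d <> 0 short-circuits the divisions for the
    // row with d = 0; hoisting a/d ahead of the whole filter would make the
    // query throw a divide-by-zero error under ANSI mode.
    spark.sql("SELECT * FROM t WHERE d <> 0 AND a/d > 3 AND a/d < 23").show()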

@viirya (Member, Author) commented Dec 2, 2020

@bart-samwel Thanks for the comment.

Yeah, I also noticed this point after creating the PR. It would be ideal if we could pull up each subexpression just in front of the conjunct that uses it. An easier but sub-optimal approach is to only pull up subexpressions that are shared by all predicates in the Filter.

@bart-samwel commented:

> @bart-samwel Thanks for the comment.
>
> Yeah, I also noticed this point after creating the PR. It would be ideal if we could pull up each subexpression just in front of the conjunct that uses it. An easier but sub-optimal approach is to only pull up subexpressions that are shared by all predicates in the Filter.

That's also unstable from a user's perspective, because it could break due to non-local changes, e.g. filters that get pushed down. It's not a great user experience if adding a small predicate causes a large performance cliff.

@viirya (Member, Author) commented Dec 3, 2020

> That's also unstable from a user's perspective, because it could break due to non-local changes, e.g. filters that get pushed down. It's not a great user experience if adding a small predicate causes a large performance cliff.

That's a good point, although it is not new. For example, Spark already falls back between codegen and interpreted execution, and between vectorized and non-vectorized readers. All of these fallbacks can cause performance cliffs.

Anyway, as I said above, the ideal case is to pull each subexpression up just in front of the conjunct that uses it. As this is WIP work, I will work in that direction.
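
As a rough sketch of that direction (the object name is illustrative, not the PR's actual code): Catalyst's existing PredicateHelper.splitConjunctivePredicates can break a filter condition into its conjuncts, after which each common subexpression can be attached to the first conjunct that references it, so it is never evaluated earlier than short-circuiting allows.

    import org.apache.spark.sql.catalyst.expressions._

    // Illustrative helper: splits a condition such as
    // `d <> 0 AND a/d > 3 AND a/d < 23` into its three conjuncts,
    // preserving evaluation order.
    object ConjunctUtil extends PredicateHelper {
      def conjuncts(condition: Expression): Seq[Expression] =
        splitConjunctivePredicates(condition)
    }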

@github-actions (bot) commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@viirya (Member, Author) commented May 17, 2021

I'll find some time to continue on this.

@SparkQA commented May 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43138/

@SparkQA commented May 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43138/

@SparkQA commented May 17, 2021

Test build #138617 has finished for PR 30565 at commit 368236f.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@github-actions github-actions bot closed this May 18, 2021
@maropu maropu removed the Stale label May 18, 2021
@maropu maropu reopened this May 18, 2021
@SparkQA commented May 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43174/

@viirya (Member, Author) commented May 18, 2021

Thanks @maropu! I forgot to remove the Stale label.

@SparkQA commented May 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43174/

@viirya (Member, Author) commented Jul 4, 2021

Okay, now each subexpression is pulled up just before the individual predicate that uses it.

@SparkQA commented Jul 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45132/

@SparkQA commented Jul 4, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45132/

@SparkQA commented Jul 4, 2021

Test build #140619 has finished for PR 30565 at commit ba41720.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Jul 6, 2021

Oh, it seems there is still a problem: the local inputs for the subexpressions of each predicate.

@cfmcgrady (Contributor) commented:

Hi @viirya,

Is it possible to eliminate subexpressions at whole-stage scope? Currently, subexpression elimination is done separately for Project and Filter.

For example (copied from JIRA ticket SPARK-35882):

    @tailrec
    def fib(index: Int, prev: Int, current: Int): Int = {
      if (index == 0) {
        current
      } else {
        fib(index - 1, prev = prev + current, current = prev)
      }
    }
    val getFibonacci: Int => Seq[Int] = (index: Int) => {
      Seq(fib(index, prev = 1, current = 0),
        fib(index + 1, prev = 1, current = 0),
        fib(index + 2, prev = 1, current = 0))
    }

    import org.apache.spark.sql.functions._
    val udfFib = udf(getFibonacci)

    val df = spark.range(1, 5)
      .withColumn("fib", udfFib(col("id")))
      .withColumn("fib2", explode(col("fib")))
    df.explain(true)

== Parsed Logical Plan ==
'Project [id#2L, fib#4, explode('fib) AS fib2#13]
+- Project [id#2L, UDF(cast(id#2L as int)) AS fib#4]
   +- Range (1, 5, step=1, splits=Some(2))

== Analyzed Logical Plan ==
id: bigint, fib: array<int>, fib2: int
Project [id#2L, fib#4, fib2#14]
+- Generate explode(fib#4), false, [fib2#14]
   +- Project [id#2L, UDF(cast(id#2L as int)) AS fib#4]
      +- Range (1, 5, step=1, splits=Some(2))

== Optimized Logical Plan ==
Generate explode(fib#4), false, [fib2#14]
+- Project [id#2L, UDF(cast(id#2L as int)) AS fib#4]
   +- Filter ((size(UDF(cast(id#2L as int)), true) > 0) AND isnotnull(UDF(cast(id#2L as int))))
      +- Range (1, 5, step=1, splits=Some(2))

== Physical Plan ==
*(1) Generate explode(fib#4), [id#2L, fib#4], false, [fib2#14]
+- *(1) Project [id#2L, UDF(cast(id#2L as int)) AS fib#4]
   +- *(1) Filter ((size(UDF(cast(id#2L as int)), true) > 0) AND isnotnull(UDF(cast(id#2L as int))))
      +- *(1) Range (1, 5, step=1, splits=2)

/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 035 */
/* 036 */   private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
/* 037 */     // common sub-expressions
/* 038 */
/* 039 */     boolean project_isNull_3 = false;
/* 040 */     int project_value_3 = -1;
/* 041 */     if (!false) {
/* 042 */       project_value_3 = (int) project_expr_0_0;
/* 043 */     }
/* 044 */
/* 045 */     Integer project_conv_0 = project_value_3;
/* 046 */     Object project_arg_0 = project_isNull_3 ? null : project_conv_0;
/* 047 */
/* 048 */     ArrayData project_result_0 = null;
/* 049 */     try {
/* 050 */       project_result_0 = (ArrayData)((scala.Function1[]) references[4] /* converters */)[1].apply(((scala.Function1) references[5] /* udf */).apply(project_arg_0));
/* 051 */     } catch (Throwable e) {
/* 052 */       throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
/* 053 */         "SimpleSuite$$Lambda$839/2056566350", "int", "array<int>", e);
/* 054 */     }
/* 055 */
/* 056 */     boolean project_isNull_2 = project_result_0 == null;
/* 057 */     ArrayData project_value_2 = null;
/* 058 */     if (!project_isNull_2) {
/* 059 */       project_value_2 = project_result_0;
/* 060 */     }
/* 061 */
/* 062 */     generate_doConsume_0(project_expr_0_0, project_value_2, project_isNull_2);
/* 063 */
/* 064 */   }
/* 065 */
/* 110 */   protected void processNext() throws java.io.IOException {
/* 111 */     // initialize Range
/* 112 */     if (!range_initRange_0) {
/* 113 */       range_initRange_0 = true;
/* 114 */       initRange(partitionIndex);
/* 115 */     }
/* 116 */
/* 117 */     while (true) {
/* 118 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 119 */         long range_nextBatchTodo_0;
/* 120 */         if (range_numElementsTodo_0 > 1000L) {
/* 121 */           range_nextBatchTodo_0 = 1000L;
/* 122 */           range_numElementsTodo_0 -= 1000L;
/* 123 */         } else {
/* 124 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 125 */           range_numElementsTodo_0 = 0;
/* 126 */           if (range_nextBatchTodo_0 == 0) break;
/* 127 */         }
/* 128 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 129 */       }
/* 130 */
/* 131 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 132 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 133 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 134 */
/* 135 */         do {
/* 136 */           // common subexpressions
/* 137 */
/* 138 */           boolean filter_isNull_1 = false;
/* 139 */           int filter_value_1 = -1;
/* 140 */           if (!false) {
/* 141 */             filter_value_1 = (int) range_value_0;
/* 142 */           }
/* 143 */
/* 144 */           Integer filter_conv_0 = filter_value_1;
/* 145 */           Object filter_arg_0 = filter_isNull_1 ? null : filter_conv_0;
/* 146 */
/* 147 */           ArrayData filter_result_0 = null;
/* 148 */           try {
/* 149 */             filter_result_0 = (ArrayData)((scala.Function1[]) references[2] /* converters */)[1].apply(((scala.Function1) references[3] /* udf */).apply(filter_arg_0));
/* 150 */           } catch (Throwable e) {
/* 151 */             throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
/* 152 */               "SimpleSuite$$Lambda$839/2056566350", "int", "array<int>", e);
/* 153 */           }
/* 154 */
/* 155 */           boolean filter_isNull_0 = filter_result_0 == null;
/* 156 */           ArrayData filter_value_0 = null;
/* 157 */           if (!filter_isNull_0) {
/* 158 */             filter_value_0 = filter_result_0;
/* 159 */           }
/* 160 */
/* 161 */           // end of common subexpressions
/* 162 */
/* 163 */           // RequiredVariables
/* 164 */
/* 165 */           // end of RequiredVariables
/* 166 */           // ev code
/* 167 */           boolean filter_isNull_4 = false;
/* 168 */
/* 169 */           int filter_value_4 = filter_isNull_0 ? -1 :
/* 170 */             (filter_value_0).numElements();
/* 171 */
/* 172 */           boolean filter_value_3 = false;
/* 173 */           filter_value_3 = filter_value_4 > 0;
/* 174 */           // end of ev code
/* 175 */           if (!filter_value_3) continue;
/* 176 */           // common subexpressions
/* 177 */
/* 178 */           // end of common subexpressions
/* 179 */
/* 180 */           // RequiredVariables
/* 181 */
/* 182 */           // end of RequiredVariables
/* 183 */           // ev code
/* 184 */           boolean filter_value_7 = !filter_isNull_0;
/* 185 */           // end of ev code
/* 186 */           if (!filter_value_7) continue;
/* 187 */
/* 188 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
/* 189 */
/* 190 */           project_doConsume_0(range_value_0);
/* 191 */
/* 192 */         } while(false);
/* 193 */
/* 194 */         if (shouldStop()) {
/* 195 */           range_nextIndex_0 = range_value_0 + 1L;
/* 196 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 197 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 198 */           return;
/* 199 */         }
/* 200 */
/* 201 */       }
/* 202 */       range_nextIndex_0 = range_batchEnd_0;
/* 203 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 204 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 205 */       range_taskContext_0.killTaskIfInterrupted();
/* 206 */     }
/* 207 */   }
/* 208 */
/* 284 */ }

From the generated code, we can see that fib executes twice (generated-code lines 050 and 149). If fib executed only once in the whole-stage codegen, it would improve performance when fib is an expensive UDF.

@viirya (Member, Author) commented Jul 20, 2021

I think it is much easier to solve this at query optimization time (i.e., in the optimizer) instead of at codegen. It also looks like a query optimization problem rather than a codegen one.
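
As a side note on the example above, a possible API-level mitigation (not part of this PR, and a trade-off rather than a fix) is to mark the UDF non-deterministic, which should keep the optimizer from duplicating it into the inferred filter so that fib runs once per row:

    import org.apache.spark.sql.functions._

    // asNondeterministic() discourages the optimizer from cloning the UDF
    // into inferred predicates, at the cost of disabling some optimizations.
    val udfFibOnce = udf(getFibonacci).asNondeterministic()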

@github-actions (bot) commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 29, 2021
@viirya viirya removed the Stale label Oct 29, 2021
@github-actions (bot) commented Feb 6, 2022

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 6, 2022
@github-actions github-actions bot closed this Feb 7, 2022
@viirya viirya deleted the SPARK-33625 branch December 27, 2023 18:27