[BUG] In ANSI mode we can fail in cases Spark would not due to conditionals #3849

Closed
revans2 opened this issue Oct 18, 2021 · 7 comments · Fixed by #4383
Labels
bug Something isn't working P1 Nice to have for release

Comments

@revans2
Collaborator

revans2 commented Oct 18, 2021

Describe the bug
When Spark evaluates an if or a case/when expression, it evaluates the children lazily, so the else clause of an if/else is only evaluated when the condition is false. The cudf version evaluates both sides no matter what. This is fine when an operation has no side effects, but in ANSI mode many operations have a significant side effect: they can fail the query. This can also be true for UDFs and for assert_true/raise_error which we do not support yet in non-ANSI mode.

Steps/Code to reproduce bug

spark.conf.set("spark.sql.ansi.enabled", "true")
val df = Seq((true, 0L), (false, Long.MaxValue)).toDF("cond", "val")
df.repartition(1).selectExpr("IF(cond, val + 1, null) as my_div").show
+------+
|my_div|
+------+
|     1|
|  null|
+------+

The above query works just fine on the CPU because the if/else avoids executing the overflow condition. On the GPU it fails.
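
To make the difference concrete, here is a minimal CPU-only sketch in plain Scala (no Spark or cudf involved). The names `LazyVsEager`, `lazyIf`, and `eagerIf` are made up for illustration, and `Math.addExact` stands in for an ANSI-mode add that throws on overflow.

```scala
object LazyVsEager {
  // Row-at-a-time, lazy: the add only runs for rows where cond is true.
  def lazyIf(cond: Boolean, v: Long): Option[Long] =
    if (cond) Some(Math.addExact(v, 1L)) else None

  // Columnar, eager: the add runs for every row before the select happens,
  // so a row that would never be selected can still fail the whole batch.
  def eagerIf(conds: Seq[Boolean], vals: Seq[Long]): Seq[Option[Long]] = {
    val added = vals.map(v => Math.addExact(v, 1L)) // throws ArithmeticException on overflow
    conds.zip(added).map { case (c, a) => if (c) Some(a) else None }
  }
}
```

With the rows from the repro, `lazyIf` never touches `Long.MaxValue + 1`, while `eagerIf` throws before it ever looks at the condition.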

Expected behavior
It should behave the same way as the CPU

I think in the short term we can just document this, and possibly fall back to the CPU if ANSI is enabled and we encounter any conditional statements like this. Longer term I think we need to come up with a plan on how to address this.

@revans2 revans2 added bug Something isn't working ? - Needs Triage Need team to review and classify labels Oct 18, 2021
@Salonijain27 Salonijain27 added P0 Must have for release and removed ? - Needs Triage Need team to review and classify labels Oct 19, 2021
@Salonijain27 Salonijain27 added P1 Nice to have for release and removed P0 Must have for release labels Oct 19, 2021
@revans2
Collaborator Author

revans2 commented Oct 20, 2021

This is related to #3855 because it could happen for them too.

@revans2
Collaborator Author

revans2 commented Oct 20, 2021

Talking to @sameerz and @tgravescs I think the best way to try and fix this would be to have an optional boolean column passed with the batch when doing a project. It would be a mask of which rows are actually being executed, and which ones are not. That way in ANSI mode we could look at this mask and not throw exceptions if the row was not intended to be taken. And in the case of #3855 we could just skip execution on that path and insert a null or some other default value instead as the result.

@firestarman
Collaborator

firestarman commented Oct 21, 2021

> This is related to #3855 because it could happen for them too.

Hi @revans2, I am working on issue #3855, but I don't fully get what you mean here. Could you share a simple example?

@revans2
Collaborator Author

revans2 commented Oct 21, 2021

@jlowe and @tgravescs would love feedback if there is a better way you can think of to fix this.

Okay, let's take an example like the one above:

scala> val df = Seq(0L, Long.MaxValue).toDF("a")
scala> df.repartition(1).selectExpr("if(a > 100, null, 1 + a) as r").show()

So in this case we end up with an expression tree like

  *Expression <Alias> if ((a#51L > 100)) null else (1 + a#51L) AS r#76L will run on GPU
    *Expression <If> if ((a#51L > 100)) null else (1 + a#51L) will run on GPU
      *Expression <GreaterThan> (a#51L > 100) will run on GPU
      *Expression <Add> (1 + a#51L) will run on GPU

Right now what happens with GpuIf is

protected def computeIfElse(
    batch: ColumnarBatch,
    predExpr: Expression,
    trueExpr: Expression,
    falseValue: Any): GpuColumnVector = {
  withResourceIfAllowed(falseValue) { falseRet =>
    withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { pred =>
      withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet =>
        val finalRet = (trueRet, falseRet) match {
          case (t: GpuColumnVector, f: GpuColumnVector) =>
            pred.getBase.ifElse(t.getBase, f.getBase)
          case (t: GpuScalar, f: GpuColumnVector) =>
            pred.getBase.ifElse(t.getBase, f.getBase)
          case (t: GpuColumnVector, f: GpuScalar) =>
            pred.getBase.ifElse(t.getBase, f.getBase)
          case (t: GpuScalar, f: GpuScalar) =>
            pred.getBase.ifElse(t.getBase, f.getBase)
          case (t, f) =>
            throw new IllegalStateException(s"Unexpected inputs" +
              s" ($t: ${t.getClass}, $f: ${f.getClass})")
        }
        GpuColumnVector.from(finalRet, dataType)
      }
    }
  }
}

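The false value has already been evaluated by the time computeIfElse is called. Inside it, the predExpr gets evaluated first (the columnarEvalToColumn(predExpr, batch) call) and the trueExpr gets evaluated next (the trueExpr.columnarEval(batch) call). So an exception is going to be thrown as soon as we evaluate the falseExpr, no matter what the predicate says, which is the issue.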

I think the proposal would be to have an API like columnarEval(batch: ColumnarBatch, condMask: Option[GpuColumnVector]): Any instead of the existing columnarEval API. This would let us evaluate the condition first and pass that result down to other expressions, which could use it to decide whether a side effect should take place. In this case GpuAdd, instead of just checking for overflow, would combine the result of the overflow check with this condMask and only throw an exception if there was an issue for a row that is included in the final result.
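
As a rough illustration of "combine the overflow check with the condMask", here is a CPU-only sketch with plain arrays standing in for columns. `MaskedOverflowCheck` and `addWithMask` are hypothetical names, not anything in the plugin, and a real version would do this with columnar operations rather than a row loop.

```scala
object MaskedOverflowCheck {
  // Hypothetical stand-in for an ANSI add that honors an optional row mask.
  // condMask(i) == true means row i is actually wanted in the final result.
  def addWithMask(
      lhs: Array[Long],
      rhs: Array[Long],
      condMask: Option[Array[Boolean]]): Array[Long] = {
    require(lhs.length == rhs.length, "inputs must have the same row count")
    val out = new Array[Long](lhs.length)
    for (i <- lhs.indices) {
      val sum = lhs(i) + rhs(i) // wrapping add, never throws
      // Signed overflow happened iff both operands differ in sign from the result.
      val overflowed = ((lhs(i) ^ sum) & (rhs(i) ^ sum)) < 0
      // With no mask every row counts; with a mask only the live rows do.
      val rowIsLive = condMask.forall(mask => mask(i))
      if (overflowed && rowIsLive) {
        throw new ArithmeticException(s"long overflow at row $i")
      }
      out(i) = sum
    }
    out
  }
}
```

Rows that are masked out still get a (garbage) value computed, but it is never selected into the output, so it is safe to ignore the overflow there.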

This is going to be problematic for a number of reasons. The ones I can think of off the top of my head are

  1. We will have to change all GpuExpressions to at least pass the mask on, and handle reference counting, etc.
  2. Conditional expressions like CaseWhen, If, and Coalesce will end up being a lot more complicated. For CaseWhen we will need to rethink how we do it efficiently, because right now for memory reasons we go from the last branch to the first, and with a mask we could no longer do that.
  3. Operations like ArrayTransform would now require us to explode the condMask, not just the regular data, so they might end up being a lot more memory intensive too.

To work around some of the higher memory usage, I would like us to have a new method in GpuExpression like hasSideEffects, which would return true if the expression itself knows that it has side effects or if any of its children report that they have side effects. This way we don't need to pass around a condMask if ANSI mode is disabled, or even if ANSI mode is enabled but none of the operators in a sub-tree care about it. For example, if we rewrote the example query to be:

scala> val df = Seq(0L, Long.MaxValue).toDF("a")
scala> df.repartition(1).selectExpr("1 + if(a > 100, null, a) as r").show()

Then there is nothing under the if that has side effects so no condMask would need to be passed on by GpuIf.
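
A minimal sketch of how such a check could work, using a toy expression tree rather than the real GpuExpression hierarchy. Everything here (`SideEffectSketch`, `selfHasSideEffects`, `needsCondMask`, the case classes) is hypothetical and only meant to show the recursion.

```scala
object SideEffectSketch {
  sealed trait Expr {
    def children: Seq[Expr]
    // True if this node on its own can fail the query (e.g. ANSI overflow).
    protected def selfHasSideEffects: Boolean = false
    // True if this node or anything below it can fail the query.
    final def hasSideEffects: Boolean =
      selfHasSideEffects || children.exists(_.hasSideEffects)
  }

  final case class Col(name: String) extends Expr {
    def children: Seq[Expr] = Nil
  }
  final case class Lit(value: Any) extends Expr {
    def children: Seq[Expr] = Nil
  }
  final case class GreaterThan(l: Expr, r: Expr) extends Expr {
    def children: Seq[Expr] = Seq(l, r)
  }
  final case class Add(l: Expr, r: Expr, ansiEnabled: Boolean) extends Expr {
    def children: Seq[Expr] = Seq(l, r)
    override protected def selfHasSideEffects: Boolean = ansiEnabled
  }
  final case class If(pred: Expr, t: Expr, f: Expr) extends Expr {
    def children: Seq[Expr] = Seq(pred, t, f)
    // Only the branches matter: the predicate is always evaluated for every row.
    def needsCondMask: Boolean = t.hasSideEffects || f.hasSideEffects
  }
}
```

For `if(a > 100, null, 1 + a)` the false branch contains an ANSI add, so `needsCondMask` is true. For `1 + if(a > 100, null, a)` the add sits above the if, so the if itself can use the existing eager path.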

@jlowe
Member

jlowe commented Oct 22, 2021

When I first saw this issue go by, I wasn't thinking we would change GpuExpression's APIs to fix this or track side effect stuff, but rather only update GpuIf to fix it, mostly to avoid the problematic issues enumerated above.

This isn't completely thought out, but I was thinking of doing something like the following in GpuIf instead:

  1. Use the boolean condition column to generate two gather maps, one to gather the input for evaluating the true expression and one to gather the input for evaluating the false expression. This could be done by using the boolean condition column as a filter mask over a simple integer sequence column to get the true gather map (and using its complement to get the false gather map).
  2. Use the true gather map to gather the input batch and evaluate the resulting batch using the conditional expression corresponding to the true condition.
  3. Use the false gather map to gather the input batch and evaluate the resulting batch using the conditional expression corresponding to the false condition.
  4. At this point we need to combine the true and false results back into their corresponding positions relative to the original input rows. This is the tricky part I haven't completely thought through and may be best handled by a new CUDA kernel. The kernel could run on the boolean condition column, one thread per row, examining whether the row is a true or false entry. Based on that result it binary searches the corresponding gather map generated in the first step for its own row number, which gives the position in the gather map where that input row was found. It then uses that position to fetch the corresponding value from the appropriate result column. For example, thread 15 examines conditional boolean column row 15 and sees a true value. It then binary searches the true gather map looking for the value 15. It finds it at row 8, so it then fetches the value in row 8 from the true conditional expression result column as the value that will appear in the output at row 15.
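
The four steps above can be sketched on the CPU with plain Scala arrays in place of cudf columns. This is only an illustration of the data flow, not a proposed implementation: `SplitIfElse` and `ifElse` are made-up names, the per-row loop in step 4 stands in for the one-thread-per-row kernel, and the condition is assumed to contain no nulls.

```scala
import java.util.Arrays

object SplitIfElse {
  def ifElse(
      cond: Array[Boolean],
      input: Array[Long],
      trueExpr: Long => Long,
      falseExpr: Long => Long): Array[Long] = {
    require(cond.length == input.length, "cond and input must have the same row count")
    val rows: Array[Int] = cond.indices.toArray

    // Step 1: filter a 0..n-1 sequence by the condition and by its complement.
    // Both maps come out sorted, which is what makes the binary search valid.
    val trueMap: Array[Int] = rows.filter(i => cond(i))
    val falseMap: Array[Int] = rows.filter(i => !cond(i))

    // Steps 2 and 3: each branch only ever sees the rows that selected it.
    val trueRes: Array[Long] = trueMap.map(i => trueExpr(input(i)))
    val falseRes: Array[Long] = falseMap.map(i => falseExpr(input(i)))

    // Step 4: each output row finds its slot in the matching gather map.
    Array.tabulate(cond.length) { row =>
      if (cond(row)) trueRes(Arrays.binarySearch(trueMap, row))
      else falseRes(Arrays.binarySearch(falseMap, row))
    }
  }
}
```

Calling this with `trueExpr = v => Math.addExact(v, 1L)` on the rows from the original repro does not throw, because the overflowing row is never handed to the true branch.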

Having a hasSideEffects API would be a bonus so we could use the existing GpuIf algorithm when conditional expressions do not have side effects, assuming the original algorithm is faster in that case. However, I don't see it as necessary up front unless the performance of the algorithm described above is particularly poor.

@jlowe
Member

jlowe commented Oct 22, 2021

The algorithm I described above could prove to be more efficient in practice even without side-effects if it's particularly expensive to evaluate one or both of the conditional expressions, as it only evaluates each conditional expression on the minimal set of input values for that expression. The current GpuIf implementation wastes the computation and memory on half of the results produced from evaluating both conditional expressions on the entire original input.

@revans2
Collaborator Author

revans2 commented Oct 22, 2021

@jlowe I like your proposal because it would have less impact on existing code. I am concerned about the memory impact it might have. For each if/else in the code we are going to make up to a full copy of the data, so if an expression has lots of nested conditions in it, the amount of memory needed is going to grow proportionally. Possibly not the end of the world though, and in the future we might even be able to make it spillable. Because of the memory issues I would say that hasSideEffects is not a bonus, but a requirement. We would also need to support it for all conditional operations (GpuIf, GpuCaseWhen and GpuCoalesce).

I think for steps 2 and 3 we would just filter the batch and not bother with a gather map. For step 4 a custom kernel would be best. I think we could make a version that would work without a custom kernel if needed. We could do a scan on the conditional and count the true values. This would give us a gather map for the trueExpr result, but with repeated keys. We then set all of the false values to -1 so we get a null in the gathered result. The null in the gathered result would only be needed for nested types to avoid using too much memory with repeated values.

val toCount = condition.ifElse(1, 0)
val partialMap = toCount.scan(sum)
val finalMap = condition.ifElse(partialMap, -1)

Then we do the same thing for the false side, but with the inverse of the condition. Finally we can use copy if else to put it all back together into a final result.

Not great, but should be doable without anything custom.
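
Here is a small CPU-only sketch of that scan-based merge, with plain Scala `Seq`s in place of columns and `Option` in place of nulls. `ScanMerge`, `gatherMap`, `gather`, and `merge` are names invented for the sketch. One assumption worth calling out: the sketch uses an exclusive prefix sum so that the resulting indices are zero-based; with an inclusive scan you would subtract one instead.

```scala
object ScanMerge {
  // For each input row: its index into the filtered branch result, or -1 if the
  // row did not take this branch.
  def gatherMap(cond: Seq[Boolean]): Seq[Int] = {
    val toCount = cond.map(c => if (c) 1 else 0)
    // Exclusive prefix sum: number of selected rows strictly before each row.
    val partialMap = toCount.scanLeft(0)(_ + _).init
    cond.zip(partialMap).map { case (c, idx) => if (c) idx else -1 }
  }

  // Gather where an out-of-bounds (-1) index yields a null, modeled as None.
  def gather[T](values: Seq[T], map: Seq[Int]): Seq[Option[T]] =
    map.map(i => if (i < 0) None else Some(values(i)))

  // trueRes / falseRes hold results only for the rows that took each branch.
  def merge[T](cond: Seq[Boolean], trueRes: Seq[T], falseRes: Seq[T]): Seq[Option[T]] = {
    val expandedTrue = gather(trueRes, gatherMap(cond))
    val expandedFalse = gather(falseRes, gatherMap(cond.map(!_)))
    // Final select between the two full-length columns.
    cond.indices.map(i => if (cond(i)) expandedTrue(i) else expandedFalse(i))
  }
}
```

For `cond = Seq(true, false, true)` the true-side gather map is `Seq(0, -1, 1)` and the false-side map is `Seq(-1, 0, -1)`, so each branch result is expanded back to full length before the final select.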


6 participants