[SPARK-26448][SQL] retain the difference between 0.0 and -0.0 #23388
Changes from all commits
0eb1781
ee5a1f0
d3c5992
74934da
fdc9988
8dafc64
3e8c171
@@ -25,7 +25,7 @@ displayTitle: Spark SQL Upgrading Guide
- In Spark version 2.4 and earlier, `Dataset.groupByKey` results in a grouped dataset with the key attribute wrongly named "value" if the key is of non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute "key". The old behaviour is preserved under a newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a default value of `false`.
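For illustration, a minimal sketch of the renaming described above. It assumes a running SparkSession named `spark` with `spark.implicits._` imported; the column names follow the guide's wording.

```scala
// Assumes a SparkSession `spark`; only the schema difference matters here.
import spark.implicits._

val ds = Seq("a", "b", "a").toDS()
val counted = ds.groupByKey(identity).count()
counted.printSchema()
// Spark 2.4 and earlier: the grouping column is named "value" (schema roughly `(value, count)`)
// Spark 3.0: the grouping column is named "key", unless
//   spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue is set to true
```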
- In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more.
- In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier.
Review comments:
- I keep this migration guide entry because this bug is not very intuitive: literally, -0.0 is not 0.0.
- Is it better to explicitly state that outputs still distinguish 0.0 and -0.0? For example, ...
- I think we only need to mention the difference between new and old versions.
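For illustration, a minimal sketch of the grouping behavior change described in the new entry, again assuming a SparkSession `spark` with implicits imported.

```scala
// Assumes a SparkSession `spark`.
import spark.implicits._

val df = Seq(-0.0d, 0.0d).toDF("d")
df.groupBy("d").count().show()
// Spark 2.4 and earlier: two rows, (0.0, 1) and (-0.0, 1)
// Spark 3.0: one row, (0.0, 2)
```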
- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with a last-wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet); in that case the behavior is undefined.
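For illustration, a sketch of the last-wins de-duplication described above, assuming a SparkSession `spark`; the expected results are read off the entry itself, not verified against a particular build.

```scala
// The map literal below contains a duplicated key.
spark.sql("SELECT map(1, 'a', 1, 'b') AS m").show(false)
// Spark 2.4 and earlier: the behavior with the duplicated key is undefined
// Spark 3.0, per the entry above: last wins, i.e. the map becomes {1 -> "b"}
```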
@@ -198,46 +198,11 @@ protected final void writeLong(long offset, long value) {
    Platform.putLong(getBuffer(), offset, value);
  }
  // We need to take care of NaN and -0.0 in several places:

Review comment: The comments are moved to the new rule.
  // 1. When comparing values, different NaNs should be treated as same, `-0.0` and `0.0` should be
  //    treated as same.
  // 2. In GROUP BY, different NaNs should belong to the same group, -0.0 and 0.0 should belong
  //    to the same group.
  // 3. As join keys, different NaNs should be treated as same, `-0.0` and `0.0` should be
  //    treated as same.
  // 4. As window partition keys, different NaNs should be treated as same, `-0.0` and `0.0`
  //    should be treated as same.
  //
  // Case 1 is fine, as we handle NaN and -0.0 well during comparison. For complex types, we
  // recursively compare the fields/elements, so it's also fine.
  //
  // Case 2, 3 and 4 are problematic, as they compare `UnsafeRow` binary directly, and different
  // NaNs have different binary representation, and the same thing happens for -0.0 and 0.0.
  //
  // Here we normalize NaN and -0.0, so that `UnsafeProjection` will normalize them when writing
  // float/double columns and nested fields to `UnsafeRow`.
  //
  // Note that, we must do this for all the `UnsafeProjection`s, not only the ones that extract
  // join/grouping/window partition keys. `UnsafeProjection` copies unsafe data directly for complex
  // types, so nested float/double may not be normalized. We need to make sure that all the unsafe
  // data (`UnsafeRow`, `UnsafeArrayData`, `UnsafeMapData`) will have float/double normalized during
  // creation.
  protected final void writeFloat(long offset, float value) {
    if (Float.isNaN(value)) {
      value = Float.NaN;
    } else if (value == -0.0f) {
      value = 0.0f;
    }
    Platform.putFloat(getBuffer(), offset, value);
  }
  // See comments for `writeFloat`.
  protected final void writeDouble(long offset, double value) {
    if (Double.isNaN(value)) {
      value = Double.NaN;
    } else if (value == -0.0d) {
      value = 0.0d;
    }
    Platform.putDouble(getBuffer(), offset, value);
  }
}
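The removed comment above explains that grouping/join/window keys are compared as binary `UnsafeRow` data, which distinguishes values that `==` treats as equal. A small standalone sketch (plain Scala, no Spark APIs) of why the bit patterns differ:

```scala
// 0.0f and -0.0f compare equal, but their raw bit patterns (what UnsafeRow stores) differ.
println(0.0f == -0.0f)                              // true
println(java.lang.Float.floatToRawIntBits(0.0f))    // 0           (0x00000000)
println(java.lang.Float.floatToRawIntBits(-0.0f))   // -2147483648 (0x80000000)

// Two NaNs with different payloads: both are NaN, but the raw bits differ.
val nan1 = java.lang.Float.intBitsToFloat(0x7fc00000)
val nan2 = java.lang.Float.intBitsToFloat(0x7fc00001)
println(nan1.isNaN && nan2.isNaN)                   // true
println(java.lang.Float.floatToRawIntBits(nan1) ==
        java.lang.Float.floatToRawIntBits(nan2))    // false
```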
@@ -0,0 +1,198 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CreateArray, CreateMap, CreateNamedStruct, CreateNamedStructUnsafe, CreateStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, LambdaFunction, NamedLambdaVariable, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery, Window}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._
/**
 * We need to take care of special floating numbers (NaN and -0.0) in several places:
 * 1. When comparing values, different NaNs should be treated as same, `-0.0` and `0.0` should be
 *    treated as same.
 * 2. In aggregate grouping keys, different NaNs should belong to the same group, -0.0 and 0.0
 *    should belong to the same group.
 * 3. In join keys, different NaNs should be treated as same, `-0.0` and `0.0` should be
 *    treated as same.

Review comments:
- NaNs are never equal to anything, including other NaNs, so there is no reason to normalize them for join keys. It is fine to do it anyway for simplicity, but it should be made clear in the comments that this is not because we have to, but just because it is easier.
- That's a good point. In Spark SQL, the EQUAL operator treats 0.0 and -0.0 as the same, so we have to follow it in join keys. I'm not sure how the SQL standard defines it, but it's another topic if we want to change the equality semantics of Spark SQL. You are right that we don't have to do it for joins in general; we only need to do normalization for certain types of join that do binary comparison.
- Still remove "different NaNs should be treated as same" here?
 * 4. In window partition keys, different NaNs should belong to the same partition, -0.0 and 0.0
 *    should belong to the same partition.
 *
 * Case 1 is fine, as we handle NaN and -0.0 well during comparison. For complex types, we
 * recursively compare the fields/elements, so it's also fine.
 *
 * Case 2, 3 and 4 are problematic, as Spark SQL turns grouping/join/window partition keys into
 * binary `UnsafeRow` and compares the binary data directly. Different NaNs have different binary
 * representation, and the same thing happens for -0.0 and 0.0.
 *
 * This rule normalizes NaN and -0.0 in window partition keys, join keys and aggregate grouping
 * keys.
 *
 * Ideally we should do the normalization in the physical operators that compare the
 * binary `UnsafeRow` directly. We don't need this normalization if the Spark SQL execution engine
 * is not optimized to run on binary data. This rule is created to simplify the implementation, so
 * that we have a single place to do normalization, which is more maintainable.
 *
 * Note that this rule must be executed at the end of the optimizer, because the optimizer may
 * create new joins (the subquery rewrite) and new join conditions (the join reorder).
 */
object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
Review comments:
- Technically an optimizer rule should not change the result of a query. This rule does exactly that. Perhaps we should add a little bit of documentation for this.
- The major reason is we create ...
- Also add it to nonExcludableRules?
- Ah, good catch!
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // A subquery will be rewritten into join later, and will go through this rule
    // eventually. Here we skip subquery, as we only need to run this rule once.
    case _: Subquery => plan

Review comments:
- This rule is called after the "RewriteSubquery" batch, right? Why do we still need this?
- This is the same as ...
    case _ => plan transform {
      case w: Window if w.partitionSpec.exists(p => needNormalize(p.dataType)) =>
        // Although the `windowExpressions` may refer to `partitionSpec` expressions, we don't need
        // to normalize the `windowExpressions`, as they are executed per input row and should take
        // the input row as it is.
        w.copy(partitionSpec = w.partitionSpec.map(normalize))
Review comments:
- Nit: all the window expressions in the project list also refer to the partitionSpec. Should we also normalize these?
- Assume the query is ... Since the project list is evaluated for each input row, I think the ...
- Then make this clear by writing it up in a comment, please? If the answer to this question is not obvious to the reviewer, then it may also not be obvious to a later reader of the code, so in general it is advisable to answer misguided reviewer questions by adding comments. :)
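A hypothetical query shape for this discussion; the table `t` and column `d` are illustrative. Only the PARTITION BY key goes through normalization; the projected `d` is evaluated per input row and left untouched.

```scala
// Hypothetical: table `t` has a double column `d` that may contain -0.0 or NaN values.
val windowed = spark.sql("SELECT d, count(*) OVER (PARTITION BY d) AS cnt FROM t")
// The rule rewrites only the partition spec, roughly:
//   PARTITION BY NormalizeNaNAndZero(d)
// while the projected `d` in the select list stays as-is.
```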
      // Only hash join and sort merge join need the normalization. Here we catch all Joins with
      // join keys, assuming Joins with join keys are always planned as hash join or sort merge
      // join. It's very unlikely that we will break this assumption in the near future.
      case j @ ExtractEquiJoinKeys(_, leftKeys, rightKeys, condition, _, _, _)
          // The analyzer guarantees left and right join keys are of the same data type. Here we
          // only need to check join keys of one side.
          if leftKeys.exists(k => needNormalize(k.dataType)) =>
        val newLeftJoinKeys = leftKeys.map(normalize)
        val newRightJoinKeys = rightKeys.map(normalize)
        val newConditions = newLeftJoinKeys.zip(newRightJoinKeys).map {
          case (l, r) => EqualTo(l, r)
        } ++ condition
        j.copy(condition = Some(newConditions.reduce(And)))
      // TODO: ideally Aggregate should also be handled here, but its grouping expressions are
      // mixed in its aggregate expressions. It's unreliable to change the grouping expressions
      // here. For now we normalize grouping expressions in `AggUtils` during planning.
    }
  }

Review comments:
- At some point we should consider changing Aggregate into something where it is easier and safer to manipulate either the grouping expressions or the aggregate expressions. SPARK-25914 is another example of this.
- Opened a JIRA: https://issues.apache.org/jira/browse/SPARK-26582
  private def needNormalize(dt: DataType): Boolean = dt match {
    case FloatType | DoubleType => true
    case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
    case ArrayType(et, _) => needNormalize(et)
    // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
    case _: MapType =>
      throw new IllegalStateException("grouping/join/window partition keys cannot be map type.")
    case _ => false
  }
  private[sql] def normalize(expr: Expression): Expression = expr match {
    case _ if expr.dataType == FloatType || expr.dataType == DoubleType =>
      NormalizeNaNAndZero(expr)

    case CreateNamedStruct(children) =>
      CreateNamedStruct(children.map(normalize))

    case CreateNamedStructUnsafe(children) =>
      CreateNamedStructUnsafe(children.map(normalize))

    case CreateArray(children) =>
      CreateArray(children.map(normalize))

    case CreateMap(children) =>
      CreateMap(children.map(normalize))

    case a: Alias if needNormalize(a.dataType) =>
      a.withNewChildren(Seq(normalize(a.child)))

    case _ if expr.dataType.isInstanceOf[StructType] && needNormalize(expr.dataType) =>
      val fields = expr.dataType.asInstanceOf[StructType].fields.indices.map { i =>
        normalize(GetStructField(expr, i))
      }
      CreateStruct(fields)

    case _ if expr.dataType.isInstanceOf[ArrayType] && needNormalize(expr.dataType) =>
      val ArrayType(et, containsNull) = expr.dataType
      val lv = NamedLambdaVariable("arg", et, containsNull)
      val function = normalize(lv)
      ArrayTransform(expr, LambdaFunction(function, Seq(lv)))

    case _ => expr
  }
}
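To make the cases above concrete, here is roughly what `normalize` returns for a few input shapes. The attribute names are hypothetical; the rewrites are read directly off the match cases above.

```scala
// Hypothetical attributes: d: double, s: struct<x: double, y: string>, arr: array<double>
// normalize(d)   => NormalizeNaNAndZero(d)
// normalize(s)   => CreateStruct of (NormalizeNaNAndZero(GetStructField(s, 0)), GetStructField(s, 1))
// normalize(arr) => ArrayTransform(arr, LambdaFunction(NormalizeNaNAndZero(arg), Seq(arg)))
//                   where `arg` is a NamedLambdaVariable over the array's element type
```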
case class NormalizeNaNAndZero(child: Expression) extends UnaryExpression with ExpectsInputTypes {

Review comments:
- If ...
- It doesn't, but I do it for safety. IIUC the test framework will throw an exception if a plan becomes unresolved after a rule.
  override def dataType: DataType = child.dataType

  override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(FloatType, DoubleType))

  private lazy val normalizer: Any => Any = child.dataType match {
    case FloatType => (input: Any) => {
      val f = input.asInstanceOf[Float]
      if (f.isNaN) {
        Float.NaN
      } else if (f == -0.0f) {
        0.0f
      } else {
        f
      }
    }

    case DoubleType => (input: Any) => {
      val d = input.asInstanceOf[Double]
      if (d.isNaN) {
        Double.NaN
      } else if (d == -0.0d) {
        0.0d
      } else {
        d
      }
    }
  }

  override def nullSafeEval(input: Any): Any = {
    normalizer(input)
  }

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val codeToNormalize = child.dataType match {
      case FloatType => (f: String) => {
        s"""
           |if (Float.isNaN($f)) {
           |  ${ev.value} = Float.NaN;
           |} else if ($f == -0.0f) {
           |  ${ev.value} = 0.0f;
           |} else {
           |  ${ev.value} = $f;
           |}
         """.stripMargin
      }

      case DoubleType => (d: String) => {
        s"""
           |if (Double.isNaN($d)) {
           |  ${ev.value} = Double.NaN;
           |} else if ($d == -0.0d) {
           |  ${ev.value} = 0.0d;
           |} else {
           |  ${ev.value} = $d;
           |}
         """.stripMargin
      }
    }

    nullSafeCodeGen(ctx, ev, codeToNormalize)
  }
}
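A plain-Scala mirror of what the interpreted and codegen paths above compute; a sketch for reasoning about the expression, not Spark API.

```scala
// Collapse every NaN payload to the canonical NaN and map -0.0 to +0.0, like the expression above.
def normalizeDouble(d: Double): Double =
  if (d.isNaN) Double.NaN            // all NaN payloads become the canonical NaN
  else if (d == -0.0d) 0.0d          // both 0.0 and -0.0 satisfy this; both map to +0.0
  else d

println(normalizeDouble(-0.0d))      // 0.0
println(normalizeDouble(Double.NaN)) // NaN
println(normalizeDouble(1.5d))       // 1.5
```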
Review comments:
- The motivation of this fix is to avoid this behavior change?
- yea
- But for join keys and GROUP BY groups, the previous difference between 0.0 and -0.0 is treated as a bug, so we don't need to mention it in the migration guide?
- Check out the test case: "distinguish -0.0" is not about agg or join.
- Aren't 0.0 and -0.0 treated as distinct groups for agg before the recent fix?
- Yes, and it's a bug. But if -0.0 is not used in grouping keys (and other similar places), users should still be able to distinguish it.
- Ah, I see what you mean. Are you saying we should add a migration guide entry for the behavior changes of grouping key/window partition key?
- Yes, sorry for the confusion. I'm not sure if a migration guide entry is needed, because it is a bug.
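For illustration of the distinction discussed in this thread (assuming a SparkSession `spark` with implicits imported): outputs such as `show` still print -0.0, while grouping now puts it in the same group as 0.0.

```scala
import spark.implicits._

Seq(-0.0d).toDF("d").show()
// still prints a row containing -0.0, so outputs keep the sign

Seq(-0.0d, 0.0d).toDF("d").groupBy("d").count().show()
// after this change: a single row (0.0, 2) instead of two separate groups
```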