[SPARK-26448][SQL] retain the difference between 0.0 and -0.0 #23388

Closed · wants to merge 7 commits
2 changes: 1 addition & 1 deletion docs/sql-migration-guide-upgrade.md
@@ -25,7 +25,7 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, `Dataset.groupByKey` results to a grouped dataset with key attribute wrongly named as "value", if the key is non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute to "key". The old behaviour is preserved under a newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a default value of `false`.

- In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more.
Member: The motivation of this fix is to avoid this behavior change?

Contributor Author: yea

Member: But for join keys and GROUP BY groups, the previous difference between 0.0 and -0.0 is treated as a bug, so we don't need to mention it in the migration guide?

Contributor Author: Check out the test case; "distinguish -0.0" is not about agg or join.

Member: Aren't 0.0 and -0.0 treated as distinct groups for agg before the recent fix?

Contributor Author: Yes, and it's a bug. But if -0.0 is not used in grouping keys (and other similar places), users should still be able to distinguish it.

Contributor Author: Ah, I see what you mean. Are you saying we should add a migration guide entry for the behavior changes of grouping keys/window partition keys?

Member: Yes, sorry for the confusion. I'm not sure whether a migration guide is needed, because it is a bug.

- In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier.
Contributor Author: I keep this migration guide entry because this bug is not very intuitive: literally, -0.0 is not 0.0.

Member: Is it better to explicitly state that outputs still distinguish 0.0 and -0.0? For example, `Seq(-0.0d).toDS().show()` returns -0.0 in any version.

Contributor Author: I think we only need to mention the difference between new and old versions.
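To make the two sides of this thread concrete, here is a minimal sketch of the behavior the migration entry describes (assuming a SparkSession named `spark` with its implicits imported; the annotated results are taken from the migration entry above, not from running this PR):

```scala
import spark.implicits._

val df = Seq(-0.0d, 0.0d).toDF("d")

// Outputs keep distinguishing the two values in any version:
df.show()                         // shows -0.0 and 0.0 as written

// Grouping does not, since Spark 3.0:
df.groupBy("d").count().show()
// Spark 3.0:             [(0.0, 2)]
// Spark 2.4 and earlier: [(0.0, 1), (-0.0, 1)]
```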


- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be undefined.

@@ -198,46 +198,11 @@ protected final void writeLong(long offset, long value) {
Platform.putLong(getBuffer(), offset, value);
}

// We need to take care of NaN and -0.0 in several places:
Contributor Author: The comments are moved to the new rule.

// 1. When compare values, different NaNs should be treated as same, `-0.0` and `0.0` should be
// treated as same.
// 2. In GROUP BY, different NaNs should belong to the same group, -0.0 and 0.0 should belong
// to the same group.
// 3. As join keys, different NaNs should be treated as same, `-0.0` and `0.0` should be
// treated as same.
// 4. As window partition keys, different NaNs should be treated as same, `-0.0` and `0.0`
// should be treated as same.
//
// Case 1 is fine, as we handle NaN and -0.0 well during comparison. For complex types, we
// recursively compare the fields/elements, so it's also fine.
//
// Case 2, 3 and 4 are problematic, as they compare `UnsafeRow` binary directly, and different
// NaNs have different binary representation, and the same thing happens for -0.0 and 0.0.
//
// Here we normalize NaN and -0.0, so that `UnsafeProjection` will normalize them when writing
// float/double columns and nested fields to `UnsafeRow`.
//
// Note that, we must do this for all the `UnsafeProjection`s, not only the ones that extract
// join/grouping/window partition keys. `UnsafeProjection` copies unsafe data directly for complex
// types, so nested float/double may not be normalized. We need to make sure that all the unsafe
// data(`UnsafeRow`, `UnsafeArrayData`, `UnsafeMapData`) will have flat/double normalized during
// creation.
protected final void writeFloat(long offset, float value) {
if (Float.isNaN(value)) {
value = Float.NaN;
} else if (value == -0.0f) {
value = 0.0f;
}
Platform.putFloat(getBuffer(), offset, value);
}

// See comments for `writeFloat`.
protected final void writeDouble(long offset, double value) {
if (Double.isNaN(value)) {
value = Double.NaN;
} else if (value == -0.0d) {
value = 0.0d;
}
Platform.putDouble(getBuffer(), offset, value);
}
}
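The removed comment above rests on the fact that -0.0 equals 0.0 as a value and that NaN comes in many bit patterns, so a byte-wise UnsafeRow comparison sees differences that value comparison does not. A small standalone Scala sketch of that fact (the NaN bit patterns are the ones used in the test removed further below; intBitsToFloat is assumed to preserve them, as it does on common JVMs):

```scala
// -0.0 and 0.0 compare equal as IEEE 754 values...
assert(-0.0f == 0.0f && -0.0d == 0.0d)
// ...but their bit patterns differ, so comparing UnsafeRow bytes directly
// would put them into different groups/partitions/join buckets.
assert(java.lang.Float.floatToRawIntBits(-0.0f) !=
  java.lang.Float.floatToRawIntBits(0.0f))

// Similarly, NaN has many possible bit patterns:
val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
assert(nan1.isNaN && nan2.isNaN)
assert(java.lang.Float.floatToRawIntBits(nan1) !=
  java.lang.Float.floatToRawIntBits(nan2))
```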
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CreateArray, CreateMap, CreateNamedStruct, CreateNamedStructUnsafe, CreateStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, LambdaFunction, NamedLambdaVariable, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery, Window}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._

/**
* We need to take care of special floating numbers (NaN and -0.0) in several places:
* 1. When compare values, different NaNs should be treated as same, `-0.0` and `0.0` should be
* treated as same.
* 2. In aggregate grouping keys, different NaNs should belong to the same group, -0.0 and 0.0
* should belong to the same group.
* 3. In join keys, different NaNs should be treated as same, `-0.0` and `0.0` should be

NaNs are never equal to anything, including other NaNs, so there is no reason to normalize them for join keys. It is fine to do it anyway for simplicity, but it should be made clear in the comments that this is not because we have to, but just because it is easier.

Contributor Author: That's a good point. In Spark SQL, the EQUAL operator treats 0.0 and -0.0 as the same, so we have to follow that for join keys. I'm not sure how the SQL standard defines it, but changing the equality semantics of Spark SQL is another topic.

But you are right that we don't have to do it for all joins; we only need normalization for the join types that do binary comparison.
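For reference, the language-level semantics this exchange relies on, as a tiny sketch:

```scala
// NaN is never equal to anything, including itself, so value-based equality
// alone would never match NaN join keys...
assert(Double.NaN != Double.NaN)
// ...while -0.0 and 0.0 do compare equal, so join keys must keep treating
// them as the same value; normalization preserves that under binary comparison.
assert(-0.0d == 0.0d)
```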

Still remove "different NaNs should be treated as same" here?

* treated as same.
* 4. In window partition keys, different NaNs should belong to the same partition, -0.0 and 0.0
* should belong to the same partition.
*
* Case 1 is fine, as we handle NaN and -0.0 well during comparison. For complex types, we
* recursively compare the fields/elements, so it's also fine.
*
* Case 2, 3 and 4 are problematic, as Spark SQL turns grouping/join/window partition keys into
* binary `UnsafeRow` and compare the binary data directly. Different NaNs have different binary
* representation, and the same thing happens for -0.0 and 0.0.
*
* This rule normalizes NaN and -0.0 in window partition keys, join keys and aggregate grouping
* keys.
*
* Ideally we should do the normalization in the physical operators that compare the
* binary `UnsafeRow` directly. We don't need this normalization if the Spark SQL execution engine
* is not optimized to run on binary data. This rule is created to simplify the implementation, so
* that we have a single place to do normalization, which is more maintainable.
*
* Note that, this rule must be executed at the end of optimizer, because the optimizer may create
* new joins(the subquery rewrite) and new join conditions(the join reorder).
*/
object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
Contributor: Technically an optimizer rule should not change the result of a query. This rule does exactly that. Perhaps we should add a little bit of documentation for this.

Contributor Author: The major reason is that we create Joins during optimization (for subqueries), and I'm also worried that join reorder may break it. I'll add a comment for it.

Member: Also add it to nonExcludableRules?

Contributor Author: Ah, good catch!
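As an illustration of why the rule belongs in nonExcludableRules (a hedged sketch; the config key is the standard optimizer exclusion setting, and the fully-qualified rule name is assumed from the package declaration above):

```scala
// Even if a user tries to exclude the rule, rules listed in nonExcludableRules
// are still applied, so query correctness does not depend on user configuration.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers")
// NormalizeFloatingNumbers still runs during optimization.
```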


def apply(plan: LogicalPlan): LogicalPlan = plan match {
// A subquery will be rewritten into join later, and will go through this rule
Member (@gatorsmile, Dec 31, 2018): This rule is called after the batch "RewriteSubquery", right? Why do we still need this?

Contributor Author: OptimizeSubqueries will apply the entire optimizer and trigger this rule.

Contributor Author: This is the same as ExtractPythonUDFs.

// eventually. Here we skip subquery, as we only need to run this rule once.
case _: Subquery => plan

case _ => plan transform {
case w: Window if w.partitionSpec.exists(p => needNormalize(p.dataType)) =>
// Although the `windowExpressions` may refer to `partitionSpec` expressions, we don't need
// to normalize the `windowExpressions`, as they are executed per input row and should take
// the input row as it is.
w.copy(partitionSpec = w.partitionSpec.map(normalize))
Contributor: Nit. All the window expressions in the project list also refer to the partitionSpec. Should we also normalize these?

Contributor Author: Assume the query is `select a, a + sum(a) over (partition by a) ...`.

Since the project list is evaluated for each input row, I think the `a` in the project list should retain the difference of -0.0. Thus I think only partitionSpec needs to be normalized.

Then make this clear by writing it up in a comment please? If the answer to this question is not obvious to the reviewer then it may also not be obvious to a later reader of the code, so in general it is advisable to answer misguided reviewer questions by adding comments. :)
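A minimal sketch of the example discussed above, using the DataFrame API (assuming a SparkSession named `spark`; the column name and literal values are illustrative, and the expected rows follow from per-row evaluation of the project list versus the normalized partition keys):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._

val df = Seq(-0.0d, 0.0d).toDF("a")
val w = Window.partitionBy("a")

// The projected `a` is evaluated per input row and keeps showing -0.0,
// while both rows land in the same (normalized) window partition,
// so the window sum sees both of them.
df.select($"a", sum($"a").over(w).as("s")).show()
// expected (illustrative): (-0.0, 0.0) and (0.0, 0.0)
```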


// Only hash join and sort merge join need the normalization. Here we catch all Joins with
// join keys, assuming Joins with join keys are always planned as hash join or sort merge
// join. It's very unlikely that we will break this assumption in the near future.
case j @ ExtractEquiJoinKeys(_, leftKeys, rightKeys, condition, _, _, _)
// The analyzer guarantees left and right joins keys are of the same data type. Here we
// only need to check join keys of one side.
if leftKeys.exists(k => needNormalize(k.dataType)) =>
val newLeftJoinKeys = leftKeys.map(normalize)
val newRightJoinKeys = rightKeys.map(normalize)
val newConditions = newLeftJoinKeys.zip(newRightJoinKeys).map {
case (l, r) => EqualTo(l, r)
} ++ condition
j.copy(condition = Some(newConditions.reduce(And)))

// TODO: ideally Aggregate should also be handled here, but its grouping expressions are
Contributor (@hvanhovell, Dec 28, 2018): At some point we should consider changing Aggregate into something where it is easier and safer to manipulate either the grouping expressions or the aggregate expressions. SPARK-25914 is another example of this.


// mixed in its aggregate expressions. It's unreliable to change the grouping expressions
// here. For now we normalize grouping expressions in `AggUtils` during planning.
}
}

private def needNormalize(dt: DataType): Boolean = dt match {
case FloatType | DoubleType => true
case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
case ArrayType(et, _) => needNormalize(et)
// Currently MapType is not comparable and analyzer should fail earlier if this case happens.
case _: MapType =>
throw new IllegalStateException("grouping/join/window partition keys cannot be map type.")
case _ => false
}

private[sql] def normalize(expr: Expression): Expression = expr match {
case _ if expr.dataType == FloatType || expr.dataType == DoubleType =>
NormalizeNaNAndZero(expr)

case CreateNamedStruct(children) =>
CreateNamedStruct(children.map(normalize))

case CreateNamedStructUnsafe(children) =>
CreateNamedStructUnsafe(children.map(normalize))

case CreateArray(children) =>
CreateArray(children.map(normalize))

case CreateMap(children) =>
CreateMap(children.map(normalize))

case a: Alias if needNormalize(a.dataType) =>
a.withNewChildren(Seq(normalize(a.child)))

case _ if expr.dataType.isInstanceOf[StructType] && needNormalize(expr.dataType) =>
val fields = expr.dataType.asInstanceOf[StructType].fields.indices.map { i =>
normalize(GetStructField(expr, i))
}
CreateStruct(fields)

case _ if expr.dataType.isInstanceOf[ArrayType] && needNormalize(expr.dataType) =>
val ArrayType(et, containsNull) = expr.dataType
val lv = NamedLambdaVariable("arg", et, containsNull)
val function = normalize(lv)
ArrayTransform(expr, LambdaFunction(function, Seq(lv)))

case _ => expr
}
}

case class NormalizeNaNAndZero(child: Expression) extends UnaryExpression with ExpectsInputTypes {
Member: If NormalizeFloatingNumbers is an optimizer rule, NormalizeNaNAndZero should only go through the Optimizer, so does it need to extend ExpectsInputTypes?

Contributor Author: It doesn't, but I do it for safety. IIUC the test framework will throw an exception if a plan becomes unresolved after a rule.


override def dataType: DataType = child.dataType

override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(FloatType, DoubleType))

private lazy val normalizer: Any => Any = child.dataType match {
case FloatType => (input: Any) => {
val f = input.asInstanceOf[Float]
if (f.isNaN) {
Float.NaN
} else if (f == -0.0f) {
0.0f
} else {
f
}
}

case DoubleType => (input: Any) => {
val d = input.asInstanceOf[Double]
if (d.isNaN) {
Double.NaN
} else if (d == -0.0d) {
0.0d
} else {
d
}
}
}

override def nullSafeEval(input: Any): Any = {
normalizer(input)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val codeToNormalize = child.dataType match {
case FloatType => (f: String) => {
s"""
|if (Float.isNaN($f)) {
| ${ev.value} = Float.NaN;
|} else if ($f == -0.0f) {
| ${ev.value} = 0.0f;
|} else {
| ${ev.value} = $f;
|}
""".stripMargin
}

case DoubleType => (d: String) => {
s"""
|if (Double.isNaN($d)) {
| ${ev.value} = Double.NaN;
|} else if ($d == -0.0d) {
| ${ev.value} = 0.0d;
|} else {
| ${ev.value} = $d;
|}
""".stripMargin
}
}

nullSafeCodeGen(ctx, ev, codeToNormalize)
}
}
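A small usage sketch of the expression defined above, evaluated directly against literals (Catalyst-internal API; illustrative only):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero

// -0.0 is normalized to 0.0, and any NaN to the canonical Double.NaN.
assert(NormalizeNaNAndZero(Literal(-0.0d)).eval() == 0.0d)
assert(NormalizeNaNAndZero(Literal(Double.NaN)).eval().asInstanceOf[Double].isNaN)
```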
@@ -180,7 +180,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
CollapseProject,
RemoveNoopOperators) :+
Batch("UpdateAttributeReferences", Once,
UpdateNullabilityInAttributeReferences)
UpdateNullabilityInAttributeReferences) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
}

/**
@@ -210,7 +212,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
PullupCorrelatedPredicates.ruleName ::
RewriteCorrelatedScalarSubquery.ruleName ::
RewritePredicateSubquery.ruleName ::
PullOutPythonUDFInJoinCondition.ruleName :: Nil
PullOutPythonUDFInJoinCondition.ruleName ::
NormalizeFloatingNumbers.ruleName :: Nil

/**
* Optimize all the subqueries inside expression.
@@ -105,8 +105,8 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
(JoinType, Seq[Expression], Seq[Expression],
Option[Expression], LogicalPlan, LogicalPlan, JoinHint)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
case join @ Join(left, right, joinType, condition, hint) =>
def unapply(join: Join): Option[ReturnType] = join match {
case Join(left, right, joinType, condition, hint) =>
logDebug(s"Considering join on: $condition")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
@@ -140,7 +140,6 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
} else {
None
}
case _ => None
}
}

@@ -246,22 +246,6 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB
// assert(setToNullAfterCreation.get(11) === rowWithNoNullColumns.get(11))
}

testBothCodegenAndInterpreted("NaN canonicalization") {
val factory = UnsafeProjection
val fieldTypes: Array[DataType] = Array(FloatType, DoubleType)

val row1 = new SpecificInternalRow(fieldTypes)
row1.setFloat(0, java.lang.Float.intBitsToFloat(0x7f800001))
row1.setDouble(1, java.lang.Double.longBitsToDouble(0x7ff0000000000001L))

val row2 = new SpecificInternalRow(fieldTypes)
row2.setFloat(0, java.lang.Float.intBitsToFloat(0x7fffffff))
row2.setDouble(1, java.lang.Double.longBitsToDouble(0x7fffffffffffffffL))

val converter = factory.create(fieldTypes)
assert(converter.apply(row1).getBytes === converter.apply(row2).getBytes)
}

testBothCodegenAndInterpreted("basic conversion with struct type") {
val factory = UnsafeProjection
val fieldTypes: Array[DataType] = Array(
@@ -49,25 +49,4 @@ class UnsafeRowWriterSuite extends SparkFunSuite {
// The two rows should be the equal
assert(res1 == res2)
}

test("SPARK-26021: normalize float/double NaN and -0.0") {
val unsafeRowWriter1 = new UnsafeRowWriter(4)
unsafeRowWriter1.resetRowWriter()
unsafeRowWriter1.write(0, Float.NaN)
unsafeRowWriter1.write(1, Double.NaN)
unsafeRowWriter1.write(2, 0.0f)
unsafeRowWriter1.write(3, 0.0)
val res1 = unsafeRowWriter1.getRow

val unsafeRowWriter2 = new UnsafeRowWriter(4)
unsafeRowWriter2.resetRowWriter()
unsafeRowWriter2.write(0, 0.0f/0.0f)
unsafeRowWriter2.write(1, 0.0/0.0)
unsafeRowWriter2.write(2, -0.0f)
unsafeRowWriter2.write(3, -0.0)
val res2 = unsafeRowWriter2.getRow

// The two rows should be the equal
assert(res1 == res2)
}
}
@@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.aggregate

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StateStoreSaveExec}
import org.apache.spark.sql.internal.SQLConf

/**
* Utility functions used by the query planner to convert our plan to new aggregation code path.
@@ -35,12 +35,20 @@ object AggUtils {
initialInputBufferOffset: Int = 0,
resultExpressions: Seq[NamedExpression] = Nil,
child: SparkPlan): SparkPlan = {
// Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because
// `groupingExpressions` is not extracted during logical phase.
val normalizedGroupingExpressions = groupingExpressions.map { e =>
NormalizeFloatingNumbers.normalize(e) match {
case n: NamedExpression => n
case other => Alias(other, e.name)(exprId = e.exprId)
}
}
val useHash = HashAggregateExec.supportsAggregate(
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
if (useHash) {
HashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
groupingExpressions = groupingExpressions,
groupingExpressions = normalizedGroupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
initialInputBufferOffset = initialInputBufferOffset,
@@ -53,7 +61,7 @@
if (objectHashEnabled && useObjectHash) {
ObjectHashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
groupingExpressions = groupingExpressions,
groupingExpressions = normalizedGroupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
initialInputBufferOffset = initialInputBufferOffset,
@@ -62,7 +70,7 @@
} else {
SortAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
groupingExpressions = groupingExpressions,
groupingExpressions = normalizedGroupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
initialInputBufferOffset = initialInputBufferOffset,