diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala index 7fc87c22089c8..70fb9b08d62bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala @@ -60,12 +60,20 @@ case class Aggregate( def output = aggregateExpressions.map(_.toAttribute) + /** + * An aggregate that needs to be computed for each row in a group. + * + * @param unbound Unbound version of this aggregate, used for result substitution. + * @param aggregate A bound copy of this aggregate used to create a new aggregation buffer. + * @param resultAttribute An attribute used to refer to the result of this aggregate in the final + * output. + */ case class ComputedAggregate( - unbound: AggregateExpression, // Unbound aggregate used for result substitution - aggregate: AggregateExpression, // A bound copy of this aggregate used to create a buffer - resultAttribute: AttributeReference) // An attribute used to refer to the result of this agg + unbound: AggregateExpression, + aggregate: AggregateExpression, + resultAttribute: AttributeReference) - // A list of aggregates that need to be computed for each group. + /** A list of aggregates that need to be computed for each group. */ @transient lazy val computedAggregates = aggregateExpressions.flatMap { agg => agg.collect { @@ -77,11 +85,11 @@ case class Aggregate( } }.toArray - // The schema of the result of all aggregate evaluations + /** The schema of the result of all aggregate evaluations */ @transient lazy val computedSchema = computedAggregates.map(_.resultAttribute) - // Creates a new aggregate buffer for a group. + /** Creates a new aggregate buffer for a group. */ def newAggregateBuffer(): Array[AggregateFunction] = { val buffer = new Array[AggregateFunction](computedAggregates.length) var i = 0 @@ -92,21 +100,25 @@ case class Aggregate( buffer } - // Named attributes used to substitute grouping attributes into the final result. + /** Named attributes used to substitute grouping attributes into the final result. */ @transient lazy val namedGroups = groupingExpressions.map { case ne: NamedExpression => ne -> ne.toAttribute case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute } - // A map of substitutions that are used to insert the aggregate expressions and grouping - // expression into the final result expression. + /** + * A map of substitutions that are used to insert the aggregate expressions and grouping + * expression into the final result expression. + */ @transient lazy val resultMap = (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap - // Substituted version of aggregateExpressions expressions which are used to compute final - // output rows given a group and the result of all aggregate computations. + /** + * Substituted version of aggregateExpressions expressions which are used to compute final + * output rows given a group and the result of all aggregate computations. + */ @transient lazy val resultExpressions = aggregateExpressions.map { agg => agg.transform {