
Commit

On wenchen's comment
clockfly committed Aug 24, 2016
1 parent 8c8bd9a · commit 7e7cb85
Showing 2 changed files with 2 additions and 6 deletions.
@@ -523,8 +523,6 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
   /**
    * In-place replaces the aggregation buffer object stored at buffer's index
    * `mutableAggBufferOffset`, with SparkSQL internally supported underlying storage format.
-   *
-   * The framework calls this method every time after updating/merging one group (group by key).
    */
   final def serializeAggregateBufferInPlace(buffer: MutableRow): Unit = {
     val bufferObject = getField[T](buffer, mutableAggBufferOffset)
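For readers outside the diff: serializeAggregateBufferInPlace overwrites the object stored at `mutableAggBufferOffset` with its binary form. Below is a minimal sketch (not from this commit) of the object-to-bytes round trip such an aggregate needs; the MaxValue buffer mirrors the helper used in the suite further down, and the 4-byte Int encoding is an assumption:

    import java.nio.ByteBuffer

    // Hypothetical buffer object, mirroring the MaxValue used by TypedMax below.
    case class MaxValue(var value: Int)

    // Assumed encoding: flatten the object to the binary storage format
    // (here a 4-byte Int) that serializeAggregateBufferInPlace writes back
    // into the row at mutableAggBufferOffset.
    def serialize(buffer: MaxValue): Array[Byte] =
      ByteBuffer.allocate(4).putInt(buffer.value).array()

    // Inverse direction: rebuild the buffer object from the stored bytes.
    def deserialize(bytes: Array[Byte]): MaxValue =
      MaxValue(ByteBuffer.wrap(bytes).getInt)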
@@ -73,11 +73,9 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
test("supports SpecificMutableRow as mutable row") {
val aggregationBufferSchema = Seq(IntegerType, LongType, BinaryType, IntegerType)
val aggBufferOffset = 2
val inputBufferObject = 1
val buffer = new SpecificMutableRow(aggregationBufferSchema)
val agg = new TypedMax(BoundReference(inputBufferObject, IntegerType, nullable = false))
val agg = new TypedMax(BoundReference(ordinal = 1, dataType = IntegerType, nullable = false))
.withNewMutableAggBufferOffset(aggBufferOffset)
.withNewInputAggBufferOffset(inputBufferObject)

agg.initialize(buffer)
data.foreach { kv =>
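The rewritten test line above binds TypedMax to input column 1 via BoundReference. As a hedged illustration (assumes a spark-catalyst dependency; not part of this commit), a bound reference simply reads the column at its ordinal from whatever row it is evaluated against:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.BoundReference
    import org.apache.spark.sql.types.IntegerType

    // A reference bound to column 1 of the input row.
    val ref = BoundReference(ordinal = 1, dataType = IntegerType, nullable = false)

    // Evaluating it against a two-column row yields the second column's value.
    val row = InternalRow(10, 42)
    assert(ref.eval(row) == 42)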
@@ -222,7 +220,7 @@ object TypedImperativeAggregateSuite {
           if (inputValue > buffer.value) {
             buffer.value = inputValue
           }
-        case null => buffer
+        case null => // skip
       }
     }

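For context on the last hunk: update mutates the buffer in place and returns Unit, so the null branch is now an explicit no-op comment rather than an expression that returns the buffer. A hedged reconstruction of the collapsed method, inferred from the visible context lines (treat the exact signature as an assumption):

    // Reconstructed shape of TypedMax.update in the suite; the surrounding
    // class is collapsed in the diff above.
    override def update(buffer: MaxValue, input: InternalRow): Unit = {
      child.eval(input) match {
        case inputValue: Int =>
          // Keep the running maximum in the mutable buffer object.
          if (inputValue > buffer.value) {
            buffer.value = inputValue
          }
        case null => // skip null inputs; the buffer is left untouched
      }
    }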

0 comments on commit 7e7cb85
