[SPARK-17187][SQL] Supports using arbitrary Java object as internal aggregation buffer object #14753
Conversation
Force-pushed from 6efddad to 2d84528.
Force-pushed from 2d84528 to 10861b2.
Test build #64211 has finished for PR 14753 at commit
Test build #64213 has finished for PR 14753 at commit
 * calls method `eval(buffer: T)` to generate the final output for this group.
 * 5. The framework moves on to next group, until all groups have been processed.
 */
abstract class TypedImperativeAggregate[T >: Null] extends ImperativeAggregate {
Does it work in Java?
I believe so, but I will double-check.
 * @param buffer The aggregation buffer object.
 * @param input an input row
 */
def update(buffer: T, input: InternalRow): Unit
This assumes the buffer object type T can do in-place updates, which is not always true, e.g. percentile_approx. How about `def update(buffer: T, input: InternalRow): T`?
@cloud-fan Users can define a wrapper to do the in-place update.
Seems `update` needs to evaluate the input. We need to document it.
Force-pushed from 7d88b20 to 0173d2c.
Force-pushed from 0173d2c to d3108ab.
def aggregationBufferClass: Class[T]

/** Serializes the aggregation buffer object T to Array[Byte] */
def serialize(buffer: T): Array[Byte]
Here we limit the serialization format to Array[Byte]. The reason is that SpecialMutableRow does a type check on atomic types for each update call of the aggregation buffer. If we declared the storage format to be IntegerType but actually stored an arbitrary object in the aggregation buffer, SpecialMutableRow would catch the mismatch and report an exception.
This detail deserves a comment in the code.
Test build #64255 has finished for PR 14753 at commit
Test build #64256 has finished for PR 14753 at commit
Test build #64264 has finished for PR 14753 at commit
Test build #64262 has finished for PR 14753 at commit
  }
}

private def field[U](input: InternalRow, fieldIndex: Int): U = {
Do you have a better name?
Do you think the name is not clear enough? Or maybe `getField`?
Test build #64316 has finished for PR 14753 at commit
Force-pushed from b843f2f to 7190eb0.
// For TypedImperativeAggregate with generic aggregation buffer object, we need to call
// serializeAggregateBufferInPlace(...) explicitly to convert the aggregation buffer object
// to Spark Sql internally supported serializable storage format.
private def serializeTypedAggregateBuffer(aggregationBuffer: MutableRow): Unit = {
Unused parameter `aggregationBuffer`. Or should the `sortBasedAggregationBuffer` below be replaced with `aggregationBuffer`?
Test build #64344 has finished for PR 14753 at commit
    if (inputValue > buffer.value) {
      buffer.value = inputValue
    }
  case null => buffer
nit: `case null =>`; we don't need to return anything here, the return type is `Unit`.
Force-pushed from 5086847 to 7e7cb85.
/**
 * In-place replaces the aggregation buffer object stored at buffer's index
 * `mutableAggBufferOffset`, with SparkSQL internally supported underlying storage format.
> with SparkSQL internally supported underlying storage format.

It can only be `BinaryType` now.
LGTM except one comment
Test build #64350 has finished for PR 14753 at commit
Test build #64351 has finished for PR 14753 at commit
final override def merge(buffer: MutableRow, inputBuffer: InternalRow): Unit = {
  val bufferObject = getField[T](buffer, mutableAggBufferOffset)
  // The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
  val inputObject = deserialize(getField[Array[Byte]](inputBuffer, inputAggBufferOffset))
nit: we should use `inputBuffer.getBinary(inputAggBufferOffset)` instead of `getField[Array[Byte]](inputBuffer, inputAggBufferOffset)`, as the data type is `BinaryType`, not `ObjectType(classOf[Any])`.
The inputBuffer is a safeRow in SortAggregateExec: `processRow(sortBasedAggregationBuffer, safeProj(currentRow))`. So `inputBuffer.getBinary(inputAggBufferOffset)` and `getField[Array[Byte]](inputBuffer, inputAggBufferOffset)` are equivalent.
Yes, it is better to use `inputBuffer.getBinary(inputAggBufferOffset)` directly.
 * ^
 * |
 * Aggregation buffer object for `TypedImperativeAggregate` aggregation function
 * }}}
Let's also add a normal agg buffer after the generic one, so readers will not assume that generic ones are always put at the end.
Test build #64381 has finished for PR 14753 at commit
Force-pushed from cfc22ed to ac8e36a.
// Serializes the generic object stored in aggregation buffer
var i = 0
while (i < typedImperativeAggregates.length) {
  i += 1
???
@clockfly can you also address https://github.com/apache/spark/pull/14753/files#r76154000 ?
Test build #64393 has finished for PR 14753 at commit
 * aggregation only support aggregation buffer of mutable types (like LongType, IntType that have
 * fixed length and can be mutated in place in UnsafeRow)
 */
abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
Isn't this the wrong way around? Isn't `ImperativeAggregate` the untyped version of a `TypedImperativeAggregate`? Much like `Dataset` and `DataFrame`?
I know this has been done for engineering purposes, but I still wonder if we shouldn't reverse the hierarchy here.
`ImperativeAggregate` only defines the interface. It does not specify which buffer types are accepted, right?
@clockfly is this supposed to work with window functions?
@hvanhovell This is supposed to work with window functions.
Test build #64426 has finished for PR 14753 at commit
Thanks. Overall looks good. I am merging this to master. Let me tweak the interface later.
What changes were proposed in this pull request?
This PR introduces an abstract class `TypedImperativeAggregate` so that an aggregation function of TypedImperativeAggregate can use an arbitrary user-defined Java object as its intermediate aggregation buffer object. This has advantages like:

- It becomes much easier to support functions like `percentile_approx`, which has a complex aggregation buffer definition.
- It avoids per-call serialization and deserialization in `update` or `merge` when converting the domain-specific aggregation object to the internal Spark SQL storage format.

Please see `org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMaxAggregate` for an example of how to define a `TypedImperativeAggregate` aggregation function (a trimmed sketch follows at the end of this description). Please see the Java doc of `TypedImperativeAggregate` and the JIRA ticket SPARK-17187 for more information.

How was this patch tested?
Unit tests.