Skip to content

Commit

Permalink
Update code
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Jul 11, 2023
1 parent a859832 commit ebc2bdd
Showing 1 changed file with 12 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,23 @@ class WindowGroupLimitEvaluatorFactory(
extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
rankLikeFunction match {
case _: RowNumber if partitionSpec.isEmpty =>
new WindowGroupLimitPartitionEvaluator(
input => SimpleLimitIterator(input, limit, numOutputRows))
val limitFunc = rankLikeFunction match {
case _: RowNumber =>
new WindowGroupLimitPartitionEvaluator(
input => new GroupedLimitIterator(input, childOutput, partitionSpec,
(input: Iterator[InternalRow]) => SimpleLimitIterator(input, limit, numOutputRows)))
case _: Rank if partitionSpec.isEmpty =>
new WindowGroupLimitPartitionEvaluator(
input => RankLimitIterator(childOutput, input, orderSpec, limit, numOutputRows))
(iter: Iterator[InternalRow]) => SimpleLimitIterator(iter, limit, numOutputRows)
case _: Rank =>
new WindowGroupLimitPartitionEvaluator(
input => new GroupedLimitIterator(input, childOutput, partitionSpec,
(input: Iterator[InternalRow]) =>
RankLimitIterator(childOutput, input, orderSpec, limit, numOutputRows)))
case _: DenseRank if partitionSpec.isEmpty =>
new WindowGroupLimitPartitionEvaluator(
input => DenseRankLimitIterator(childOutput, input, orderSpec, limit, numOutputRows))
(iter: Iterator[InternalRow]) =>
RankLimitIterator(childOutput, iter, orderSpec, limit, numOutputRows)
case _: DenseRank =>
new WindowGroupLimitPartitionEvaluator(
input => new GroupedLimitIterator(input, childOutput, partitionSpec,
(input: Iterator[InternalRow]) =>
DenseRankLimitIterator(childOutput, input, orderSpec, limit, numOutputRows)))
(iter: Iterator[InternalRow]) =>
DenseRankLimitIterator(childOutput, iter, orderSpec, limit, numOutputRows)
}

if (partitionSpec.isEmpty) {
new WindowGroupLimitPartitionEvaluator(limitFunc)
} else {
new WindowGroupLimitPartitionEvaluator(
input => new GroupedLimitIterator(input, childOutput, partitionSpec, limitFunc))
}
}

class WindowGroupLimitPartitionEvaluator(f: Iterator[InternalRow] => Iterator[InternalRow])
Expand Down

0 comments on commit ebc2bdd

Please sign in to comment.