[SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values

### What changes were proposed in this pull request?

This PR ensures that the grouped partitions returned by `DataSourceV2ScanExec#groupPartitions` are sorted by partition value. Previously, in apache#42757, we assumed that Scala would preserve the input ordering when grouping, but that turns out not to be the case.
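As a rough illustration of the ordering problem (not code from this PR), the sketch below groups a pre-sorted sequence with the standard library's `groupBy`. The object name, the integer keys, and the `split-…` strings are hypothetical stand-ins for Spark's partition values and input partitions. `groupBy` returns a `Map`, whose iteration order is unspecified, so the keys have to be sorted again after grouping.

```scala
object GroupByOrderDemo {
  // Hypothetical stand-in data: (partition key, split name) pairs, already sorted by key.
  val sortedPairs: Seq[(Int, String)] =
    (1 to 10).flatMap(k => Seq((k, s"split-$k-a"), (k, s"split-$k-b")))

  // groupBy returns an immutable Map; its iteration order is unspecified,
  // so the keys may come back in a different order than the input.
  val groupedKeys: Seq[Int] = sortedPairs.groupBy(_._1).toSeq.map(_._1)

  // Sorting after grouping restores a deterministic key order.
  val regroupedKeys: Seq[Int] = groupedKeys.sorted

  def main(args: Array[String]): Unit = {
    println(s"after groupBy:        $groupedKeys")
    println(s"after groupBy + sort: $regroupedKeys")
  }
}
```

Whether the first printed sequence happens to look sorted depends on the `Map` implementation and the number of keys, which is why the second sort is needed for a guarantee.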

### Why are the changes needed?

See apache#42757 (comment) for the diagnosis. Partition ordering is a fundamental property of storage-partitioned join (SPJ) and must therefore be guaranteed.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

We have tests in `KeyGroupedPartitioningSuite` to cover this.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#42839 from sunchao/SPARK-45036-followup.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
sunchao committed Feb 7, 2024
1 parent 579663a commit f0691c4
Showing 1 changed file with 5 additions and 6 deletions.
@@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
       // also sort the input partitions according to their partition key order. This ensures
       // a canonical order from both sides of a bucketed join, for example.
       val partitionDataTypes = expressions.map(_.dataType)
-      val partitionOrdering: Ordering[(InternalRow, InputPartition)] = {
-        RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1)
-      }
-      val sortedKeyToPartitions = results.sorted(partitionOrdering)
-      val groupedPartitions = sortedKeyToPartitions
+      val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
+      val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+      val sortedGroupedPartitions = sortedKeyToPartitions
         .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
         .groupBy(_._1)
         .toSeq
         .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) }
+        .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))

-      Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2)))
+      Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2)))
     }
   }
 }
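The shape of the patched logic can be sketched outside Spark with plain Scala collections. This is a simplified model under stated assumptions, not the actual implementation: `Key`, `Split`, `Grouped`, `keyOrdering`, and `groupPartitions` are hypothetical stand-ins for `InternalRow`, `InputPartition`, `KeyGroupedPartition`, the `RowOrdering` built in the diff, and the patched method body. It reuses one key ordering twice via `Ordering#on`, once before grouping and once after.

```scala
// Hypothetical stand-ins for Spark's InternalRow, InputPartition and KeyGroupedPartition.
final case class Key(bucket: Int, day: Int)
final case class Split(name: String)
final case class Grouped(value: Key, parts: Seq[Split])

object SortedGroupingSketch {
  // Ascending ordering on the key fields, standing in for the RowOrdering in the diff.
  val keyOrdering: Ordering[Key] =
    Ordering.by[Key, (Int, Int)](k => (k.bucket, k.day))

  def groupPartitions(results: Seq[(Key, Split)]): Seq[Grouped] = {
    // First sort: orders the individual (key, split) pairs by key.
    val sortedKeyToPartitions =
      results.sorted(keyOrdering.on((t: (Key, Split)) => t._1))

    sortedKeyToPartitions
      .groupBy(_._1) // yields a Map, so key order is no longer guaranteed
      .toSeq
      .map { case (key, s) => Grouped(key, s.map(_._2)) }
      // Second sort: restores key order on the grouped result.
      .sorted(keyOrdering.on((g: Grouped) => g.value))
  }
}
```

For example, calling `SortedGroupingSketch.groupPartitions` on pairs supplied in arbitrary key order returns one `Grouped` per distinct key, in ascending key order regardless of how the intermediate `Map` iterates.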