Skip to content

Commit

Permalink
fix mutable buffer len.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Sep 23, 2024
1 parent 8a078de commit 11230ac
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,17 +947,20 @@ impl GroupedHashAggregateStream {
let batch_size = group_cols.first().unwrap().len();
self.group_indices_buffer.clear();
self.group_indices_buffer.resize(batch_size, 0);
for (&row_idx, &group_idx) in self.current_row_indices.iter().flatten().zip(self.current_group_indices.iter().flatten()) {
for (&row_idx, &group_idx) in self
.current_row_indices
.iter()
.flatten()
.zip(self.current_group_indices.iter().flatten())
{
self.group_indices_buffer[row_idx] = group_idx;
}

// 2.update the arrays in each partition to their accumulators
// - Reorder the arrays to make them sorted by partitions
// - Collect the `offsets`, and we can get arrays in partition through `slice`
// Update the accumulators of each partition
let iter = self
.current_row_indices
.iter();
let iter = self.current_row_indices.iter();
for (part_idx, row_indices) in iter.enumerate() {
// Gather the inputs to call the actual accumulator
let iter = self.accumulators[part_idx]
Expand Down Expand Up @@ -1454,7 +1457,9 @@ impl PartitionFilterBuffer {
let sliced_values_buf = values_buf.sliced();
drop(values_buf);
let sliced_values_buf = sliced_values_buf.into_mutable().unwrap();
let sliced_values_buf = sliced_values_buf.with_bitset(values_len, false);
let sliced_values_buf_len = sliced_values_buf.len();
let sliced_values_buf =
sliced_values_buf.with_bitset(sliced_values_buf_len, false);
let mut values_buf_builder =
BooleanBufferBuilder::new_from_buffer(sliced_values_buf, values_len);
if values_len < batch_size {
Expand All @@ -1468,7 +1473,9 @@ impl PartitionFilterBuffer {
let sliced_nulls_buf = nulls_inner.sliced();
drop(nulls);
let sliced_nulls_buf = sliced_nulls_buf.into_mutable().unwrap();
let sliced_nulls_buf = sliced_nulls_buf.with_bitset(nulls_len, true);
let sliced_nulls_buf_len = sliced_nulls_buf.len();
let sliced_nulls_buf =
sliced_nulls_buf.with_bitset(sliced_nulls_buf_len, true);
let mut nulls_buf_builder =
BooleanBufferBuilder::new_from_buffer(sliced_nulls_buf, nulls_len);
if nulls_len < batch_size {
Expand Down

0 comments on commit 11230ac

Please sign in to comment.