Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[minor] Use Vec instead of primitive builders #12121

Merged
merged 4 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod nulls;
pub mod prim_op;

use arrow::{
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, UInt32Builder},
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
compute,
datatypes::UInt32Type,
};
Expand Down Expand Up @@ -170,7 +170,7 @@ impl GroupsAccumulatorAdapter {
let mut groups_with_rows = vec![];

// batch_indices holds indices into values, each group is contiguous
let mut batch_indices = UInt32Builder::with_capacity(0);
let mut batch_indices = vec![];

// offsets[i] is the index into batch_indices where the rows for
// group_index i start
Expand All @@ -184,11 +184,11 @@ impl GroupsAccumulatorAdapter {
}

groups_with_rows.push(group_index);
batch_indices.append_slice(indices);
batch_indices.extend_from_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
let batch_indices = batch_indices.into();

// reorder the values and opt_filter by batch_indices so that
// all values for each group are contiguous, then invoke the
Expand Down
25 changes: 9 additions & 16 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ use crate::{
};

use arrow::array::{
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
UInt64Array,
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array,
};
use arrow::compute::kernels::cmp::{eq, not_distinct};
use arrow::compute::{and, concat_batches, take, FilterBuilder};
Expand Down Expand Up @@ -1204,13 +1203,11 @@ fn lookup_join_hashmap(
})
.collect::<Result<Vec<_>>>()?;

let (mut probe_builder, mut build_builder, next_offset) = build_hashmap
let (probe_indices, build_indices, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);

let build_indices: UInt64Array =
PrimitiveArray::new(build_builder.finish().into(), None);
let probe_indices: UInt32Array =
PrimitiveArray::new(probe_builder.finish().into(), None);
let build_indices: UInt64Array = build_indices.into();
let probe_indices: UInt32Array = probe_indices.into();

let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
Expand Down Expand Up @@ -1566,7 +1563,7 @@ mod tests {
test::build_table_i32, test::exec::MockExec,
};

use arrow::array::{Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::array::{Date32Array, Int32Array};
use arrow::datatypes::{DataType, Field};
use arrow_array::StructArray;
use arrow_buffer::NullBuffer;
Expand Down Expand Up @@ -3169,17 +3166,13 @@ mod tests {
(0, None),
)?;

let mut left_ids = UInt64Builder::with_capacity(0);
left_ids.append_value(0);
left_ids.append_value(1);
let left_ids: UInt64Array = vec![0, 1].into();

let mut right_ids = UInt32Builder::with_capacity(0);
right_ids.append_value(0);
right_ids.append_value(1);
let right_ids: UInt32Array = vec![0, 1].into();

assert_eq!(left_ids.finish(), l);
assert_eq!(left_ids, l);

assert_eq!(right_ids.finish(), r);
assert_eq!(right_ids, r);

Ok(())
}
Expand Down
25 changes: 11 additions & 14 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ use crate::{
RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{
BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
};
use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::{Schema, SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::PrimitiveArray;
use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -573,23 +572,21 @@ fn join_left_and_right_batch(
)
})?;

let mut left_indices_builder = UInt64Builder::new();
let mut right_indices_builder = UInt32Builder::new();
let mut left_indices_builder: Vec<u64> = vec![];
let mut right_indices_builder: Vec<u32> = vec![];
for (left_side, right_side) in indices {
left_indices_builder
.append_values(left_side.values(), &vec![true; left_side.len()]);
right_indices_builder
.append_values(right_side.values(), &vec![true; right_side.len()]);
left_indices_builder.extend(left_side.values());
right_indices_builder.extend(right_side.values());
}

let left_side = left_indices_builder.finish();
let right_side = right_indices_builder.finish();
let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into();
let right_side = right_indices_builder.into();
// set the left bitmap
// and only a full join needs the left bitmap
if need_produce_result_in_final(join_type) {
let mut bitmap = visited_left_side.lock();
left_side.iter().flatten().for_each(|x| {
bitmap.set_bit(x as usize, true);
left_side.values().iter().for_each(|x| {
bitmap.set_bit(*x as usize, true);
});
}
// adjust the two side indices based on the join type
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,13 +929,11 @@ fn lookup_join_hashmap(
let (mut matched_probe, mut matched_build) = build_hashmap
.get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset);

matched_probe.as_slice_mut().reverse();
matched_build.as_slice_mut().reverse();
matched_probe.reverse();
matched_build.reverse();

let build_indices: UInt64Array =
PrimitiveArray::new(matched_build.finish().into(), None);
let probe_indices: UInt32Array =
PrimitiveArray::new(matched_probe.finish().into(), None);
let build_indices: UInt64Array = matched_build.into();
let probe_indices: UInt32Array = matched_probe.into();

let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
Expand Down
26 changes: 11 additions & 15 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{

use arrow::array::{
downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
UInt32Builder, UInt64Array,
};
use arrow::compute;
use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
Expand Down Expand Up @@ -163,8 +163,8 @@ macro_rules! chain_traverse {
} else {
i
};
$match_indices.append(match_row_idx);
$input_indices.append($input_idx as u32);
$match_indices.push(match_row_idx);
$input_indices.push($input_idx as u32);
$remaining_output -= 1;
// Follow the chain to get the next index value
let next = $next_chain[match_row_idx as usize];
Expand Down Expand Up @@ -238,9 +238,9 @@ pub trait JoinHashMapType {
&self,
iter: impl Iterator<Item = (usize, &'a u64)>,
deleted_offset: Option<usize>,
) -> (UInt32BufferBuilder, UInt64BufferBuilder) {
let mut input_indices = UInt32BufferBuilder::new(0);
let mut match_indices = UInt64BufferBuilder::new(0);
) -> (Vec<u32>, Vec<u64>) {
let mut input_indices = vec![];
let mut match_indices = vec![];

let hash_map = self.get_map();
let next_chain = self.get_list();
Expand All @@ -261,8 +261,8 @@ pub trait JoinHashMapType {
} else {
i
};
match_indices.append(match_row_idx);
input_indices.append(row_idx as u32);
match_indices.push(match_row_idx);
input_indices.push(row_idx as u32);
// Follow the chain to get the next index value
let next = next_chain[match_row_idx as usize];
if next == 0 {
Expand All @@ -289,13 +289,9 @@ pub trait JoinHashMapType {
deleted_offset: Option<usize>,
limit: usize,
offset: JoinHashMapOffset,
) -> (
UInt32BufferBuilder,
UInt64BufferBuilder,
Option<JoinHashMapOffset>,
) {
let mut input_indices = UInt32BufferBuilder::new(0);
let mut match_indices = UInt64BufferBuilder::new(0);
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
let mut input_indices = vec![];
let mut match_indices = vec![];

let mut remaining_output = limit;

Expand Down
14 changes: 7 additions & 7 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ use crate::sorts::streaming_merge;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::array::ArrayRef;
use arrow::datatypes::{SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
use arrow_array::PrimitiveArray;
use datafusion_common::utils::transpose;
use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
Expand Down Expand Up @@ -275,12 +276,11 @@ impl BatchPartitioner {
create_hashes(&arrays, random_state, hash_buffer)?;

let mut indices: Vec<_> = (0..*partitions)
.map(|_| UInt64Builder::with_capacity(batch.num_rows()))
.map(|_| Vec::with_capacity(batch.num_rows()))
.collect();

for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash % *partitions as u64) as usize]
.append_value(index as u64);
indices[(*hash % *partitions as u64) as usize].push(index as u64);
}

// Finished building index-arrays for output partitions
Expand All @@ -291,8 +291,8 @@ impl BatchPartitioner {
let it = indices
.into_iter()
.enumerate()
.filter_map(|(partition, mut indices)| {
let indices = indices.finish();
.filter_map(|(partition, indices)| {
let indices: PrimitiveArray<UInt64Type> = indices.into();
(!indices.is_empty()).then_some((partition, indices))
})
.map(move |(partition, indices)| {
Expand Down