Skip to content

Commit

Permalink
[minor] Use Vec instead of primitive builders (#12121)
Browse files Browse the repository at this point in the history
* Use vec instead of builder

* Compile

* Use vec instead of builder

* Revert
  • Loading branch information
Dandandan authored Aug 23, 2024
1 parent 7806393 commit 3de50c8
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod nulls;
pub mod prim_op;

use arrow::{
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, UInt32Builder},
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
compute,
datatypes::UInt32Type,
};
Expand Down Expand Up @@ -170,7 +170,7 @@ impl GroupsAccumulatorAdapter {
let mut groups_with_rows = vec![];

// batch_indices holds indices into values, each group is contiguous
let mut batch_indices = UInt32Builder::with_capacity(0);
let mut batch_indices = vec![];

// offsets[i] is index into batch_indices where the rows for
// group_index i starts
Expand All @@ -184,11 +184,11 @@ impl GroupsAccumulatorAdapter {
}

groups_with_rows.push(group_index);
batch_indices.append_slice(indices);
batch_indices.extend_from_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
let batch_indices = batch_indices.into();

// reorder the values and opt_filter by batch_indices so that
// all values for each group are contiguous, then invoke the
Expand Down
25 changes: 9 additions & 16 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ use crate::{
};

use arrow::array::{
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
UInt64Array,
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array,
};
use arrow::compute::kernels::cmp::{eq, not_distinct};
use arrow::compute::{and, concat_batches, take, FilterBuilder};
Expand Down Expand Up @@ -1204,13 +1203,11 @@ fn lookup_join_hashmap(
})
.collect::<Result<Vec<_>>>()?;

let (mut probe_builder, mut build_builder, next_offset) = build_hashmap
let (probe_indices, build_indices, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);

let build_indices: UInt64Array =
PrimitiveArray::new(build_builder.finish().into(), None);
let probe_indices: UInt32Array =
PrimitiveArray::new(probe_builder.finish().into(), None);
let build_indices: UInt64Array = build_indices.into();
let probe_indices: UInt32Array = probe_indices.into();

let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
Expand Down Expand Up @@ -1566,7 +1563,7 @@ mod tests {
test::build_table_i32, test::exec::MockExec,
};

use arrow::array::{Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::array::{Date32Array, Int32Array};
use arrow::datatypes::{DataType, Field};
use arrow_array::StructArray;
use arrow_buffer::NullBuffer;
Expand Down Expand Up @@ -3169,17 +3166,13 @@ mod tests {
(0, None),
)?;

let mut left_ids = UInt64Builder::with_capacity(0);
left_ids.append_value(0);
left_ids.append_value(1);
let left_ids: UInt64Array = vec![0, 1].into();

let mut right_ids = UInt32Builder::with_capacity(0);
right_ids.append_value(0);
right_ids.append_value(1);
let right_ids: UInt32Array = vec![0, 1].into();

assert_eq!(left_ids.finish(), l);
assert_eq!(left_ids, l);

assert_eq!(right_ids.finish(), r);
assert_eq!(right_ids, r);

Ok(())
}
Expand Down
25 changes: 11 additions & 14 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ use crate::{
RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{
BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
};
use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::{Schema, SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::PrimitiveArray;
use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -573,23 +572,21 @@ fn join_left_and_right_batch(
)
})?;

let mut left_indices_builder = UInt64Builder::new();
let mut right_indices_builder = UInt32Builder::new();
let mut left_indices_builder: Vec<u64> = vec![];
let mut right_indices_builder: Vec<u32> = vec![];
for (left_side, right_side) in indices {
left_indices_builder
.append_values(left_side.values(), &vec![true; left_side.len()]);
right_indices_builder
.append_values(right_side.values(), &vec![true; right_side.len()]);
left_indices_builder.extend(left_side.values());
right_indices_builder.extend(right_side.values());
}

let left_side = left_indices_builder.finish();
let right_side = right_indices_builder.finish();
let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into();
let right_side = right_indices_builder.into();
// set the left bitmap
// and only full join need the left bitmap
if need_produce_result_in_final(join_type) {
let mut bitmap = visited_left_side.lock();
left_side.iter().flatten().for_each(|x| {
bitmap.set_bit(x as usize, true);
left_side.values().iter().for_each(|x| {
bitmap.set_bit(*x as usize, true);
});
}
// adjust the two side indices base on the join type
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,13 +929,11 @@ fn lookup_join_hashmap(
let (mut matched_probe, mut matched_build) = build_hashmap
.get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset);

matched_probe.as_slice_mut().reverse();
matched_build.as_slice_mut().reverse();
matched_probe.reverse();
matched_build.reverse();

let build_indices: UInt64Array =
PrimitiveArray::new(matched_build.finish().into(), None);
let probe_indices: UInt32Array =
PrimitiveArray::new(matched_probe.finish().into(), None);
let build_indices: UInt64Array = matched_build.into();
let probe_indices: UInt32Array = matched_probe.into();

let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
Expand Down
26 changes: 11 additions & 15 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{

use arrow::array::{
downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
UInt32Builder, UInt64Array,
};
use arrow::compute;
use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
Expand Down Expand Up @@ -163,8 +163,8 @@ macro_rules! chain_traverse {
} else {
i
};
$match_indices.append(match_row_idx);
$input_indices.append($input_idx as u32);
$match_indices.push(match_row_idx);
$input_indices.push($input_idx as u32);
$remaining_output -= 1;
// Follow the chain to get the next index value
let next = $next_chain[match_row_idx as usize];
Expand Down Expand Up @@ -238,9 +238,9 @@ pub trait JoinHashMapType {
&self,
iter: impl Iterator<Item = (usize, &'a u64)>,
deleted_offset: Option<usize>,
) -> (UInt32BufferBuilder, UInt64BufferBuilder) {
let mut input_indices = UInt32BufferBuilder::new(0);
let mut match_indices = UInt64BufferBuilder::new(0);
) -> (Vec<u32>, Vec<u64>) {
let mut input_indices = vec![];
let mut match_indices = vec![];

let hash_map = self.get_map();
let next_chain = self.get_list();
Expand All @@ -261,8 +261,8 @@ pub trait JoinHashMapType {
} else {
i
};
match_indices.append(match_row_idx);
input_indices.append(row_idx as u32);
match_indices.push(match_row_idx);
input_indices.push(row_idx as u32);
// Follow the chain to get the next index value
let next = next_chain[match_row_idx as usize];
if next == 0 {
Expand All @@ -289,13 +289,9 @@ pub trait JoinHashMapType {
deleted_offset: Option<usize>,
limit: usize,
offset: JoinHashMapOffset,
) -> (
UInt32BufferBuilder,
UInt64BufferBuilder,
Option<JoinHashMapOffset>,
) {
let mut input_indices = UInt32BufferBuilder::new(0);
let mut match_indices = UInt64BufferBuilder::new(0);
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
let mut input_indices = vec![];
let mut match_indices = vec![];

let mut remaining_output = limit;

Expand Down
14 changes: 7 additions & 7 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ use crate::sorts::streaming_merge;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::array::ArrayRef;
use arrow::datatypes::{SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
use arrow_array::PrimitiveArray;
use datafusion_common::utils::transpose;
use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
Expand Down Expand Up @@ -275,12 +276,11 @@ impl BatchPartitioner {
create_hashes(&arrays, random_state, hash_buffer)?;

let mut indices: Vec<_> = (0..*partitions)
.map(|_| UInt64Builder::with_capacity(batch.num_rows()))
.map(|_| Vec::with_capacity(batch.num_rows()))
.collect();

for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash % *partitions as u64) as usize]
.append_value(index as u64);
indices[(*hash % *partitions as u64) as usize].push(index as u64);
}

// Finished building index-arrays for output partitions
Expand All @@ -291,8 +291,8 @@ impl BatchPartitioner {
let it = indices
.into_iter()
.enumerate()
.filter_map(|(partition, mut indices)| {
let indices = indices.finish();
.filter_map(|(partition, indices)| {
let indices: PrimitiveArray<UInt64Type> = indices.into();
(!indices.is_empty()).then_some((partition, indices))
})
.map(move |(partition, indices)| {
Expand Down

0 comments on commit 3de50c8

Please sign in to comment.