Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary null buffer construction when converting arrays to a different type #6244

Merged
merged 10 commits into from
Aug 14, 2024
14 changes: 14 additions & 0 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,20 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
}
}

/// Creates a PrimitiveArray based on an iterator of values with provided nulls
pub fn from_iter_values_with_nulls<I: IntoIterator<Item = T::Native>>(
iter: I,
nulls: Option<NullBuffer>,
) -> Self {
let val_buf: Buffer = iter.into_iter().collect();
let len = val_buf.len() / std::mem::size_of::<T::Native>();
Self {
data_type: T::DATA_TYPE,
values: ScalarBuffer::new(val_buf, 0, len),
nulls,
}
}

/// Creates a PrimitiveArray based on a constant value with `count` elements
pub fn from_value(value: T::Native, count: usize) -> Self {
unsafe {
Expand Down
80 changes: 44 additions & 36 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
};
use arrow_buffer::{i256, Buffer, IntervalDayTime};
Expand Down Expand Up @@ -165,57 +165,65 @@ impl ArrayReader for FixedLenByteArrayReader {
// TODO: An improvement might be to do this conversion on read
let array: ArrayRef = match &self.data_type {
ArrowType::Decimal128(p, s) => {
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
let nulls = binary.nulls().cloned();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a comment explaining that the nulls have already been handled and avoiding re-creating the nulls will improve the performance

let decimal = binary.iter().map(|o| match o {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there may be some further room for improvement here and avoid the branch in the inner loop (just apply i128::from_be_bytes(sign_extend_be(b)), directly to the values rather than having to check each element

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe we could implement something like PrimitiveArray::unary_mut for FixedLengthByteArray to transform the bytes

https://docs.rs/arrow/latest/arrow/array/struct.PrimitiveArray.html#method.unary_mut

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I seem to have reinvented unary() 😦 (Except here F is Fn(Option<T::Native>) -> O::Native). Given we discard the array anyway, it would be nice to have something like unary that consumes self so we can just take the null buffer rather than cloning it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have something like unary that consumes self so we can just take the null buffer rather than cloning it.

You can split the array into null buffer and scalar buffer with array.into_parts() I think

I have often thought it would be nice to add a method like this to PrimiveArray to assist with null shenanigans

struct PrimitiveArray {

  /// Replace the null buffer of this array with the specified buffer
  pub fn with_nulls(mut self, nulls: Option<NullBuffer>) -> Self {
    ..
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. I did a quick test with Float16, and was able to double the performance again using into_parts() to deconstruct the FixedSizeBinaryArray. It's pretty ugly though. I think it would take some time to rework the array code to be able to do this more cleanly and robustly.

@alamb how do you feel about merging this as is and then working on the even faster version as a follow-on? Then we could get some win for 53.0 and more in 54.0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- this is an improvement as is and we can make it even better in follow on PRs

Some(b) => i128::from_be_bytes(sign_extend_be(b)),
None => i128::default(),
});
let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;

Arc::new(decimal)
}
ArrowType::Decimal256(p, s) => {
let decimal = binary
.iter()
.map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal256Array>()
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i256::from_be_bytes(sign_extend_be(b)),
None => i256::default(),
});
let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;

Arc::new(decimal)
}
ArrowType::Interval(unit) => {
let nulls = binary.nulls().cloned();
// An interval is stored as 3x 32-bit unsigned integers storing months, days,
// and milliseconds
match unit {
IntervalUnit::YearMonth => Arc::new(
binary
.iter()
.map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap())))
.collect::<IntervalYearMonthArray>(),
) as ArrayRef,
IntervalUnit::DayTime => Arc::new(
binary
.iter()
.map(|o| {
o.map(|b| {
IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
)
})
})
.collect::<IntervalDayTimeArray>(),
) as ArrayRef,
IntervalUnit::YearMonth => {
let iter = binary.iter().map(|o| match o {
Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()),
None => i32::default(),
});
let interval =
IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
}
IntervalUnit::DayTime => {
let iter = binary.iter().map(|o| match o {
Some(b) => IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
),
None => IntervalDayTime::default(),
});
let interval =
IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
}
}
}
ArrowType::Float16 => Arc::new(
binary
.iter()
.map(|o| o.map(|b| f16::from_le_bytes(b[..2].try_into().unwrap())))
.collect::<Float16Array>(),
) as ArrayRef,
ArrowType::Float16 => {
let nulls = binary.nulls().cloned();
let f16s = binary.iter().map(|o| match o {
Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()),
None => f16::default(),
});
let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls);
Arc::new(f16s) as ArrayRef
}
_ => Arc::new(binary) as ArrayRef,
};

Expand Down
78 changes: 50 additions & 28 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,22 +217,33 @@ where
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
let nulls = array.nulls().cloned();
let array = match array.data_type() {
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| v as i128))
.collect::<Decimal128Array>(),
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| v as i128))
.collect::<Decimal128Array>(),
ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand All @@ -245,22 +256,33 @@ where
Arc::new(array) as ArrayRef
}
ArrowType::Decimal256(p, s) => {
let nulls = array.nulls().cloned();
let array = match array.data_type() {
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| i256::from_i128(v as i128)))
.collect::<Decimal256Array>(),
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| v.map(|v| i256::from_i128(v as i128)))
.collect::<Decimal256Array>(),
ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand Down
Loading