From a3b70794cd84449491ab9a05b597c62c2224d567 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 12 Aug 2024 17:34:22 -0700 Subject: [PATCH 1/7] create primitive array from iter and nulls --- arrow-array/src/array/primitive_array.rs | 14 ++++ .../array_reader/fixed_len_byte_array.rs | 80 ++++++++++--------- 2 files changed, 58 insertions(+), 36 deletions(-) diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 70a8ceaef800..db14845b08d9 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -713,6 +713,20 @@ impl PrimitiveArray { } } + /// Creates a PrimitiveArray based on an iterator of values with provided nulls + pub fn from_iter_values_with_nulls>( + iter: I, + nulls: Option, + ) -> Self { + let val_buf: Buffer = iter.into_iter().collect(); + let len = val_buf.len() / std::mem::size_of::(); + Self { + data_type: T::DATA_TYPE, + values: ScalarBuffer::new(val_buf, 0, len), + nulls, + } + } + /// Creates a PrimitiveArray based on a constant value with `count` elements pub fn from_value(value: T::Native, count: usize) -> Self { unsafe { diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 8098f3240a3c..2509ea03fcd2 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow_array::{ - ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, + Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray, }; use arrow_buffer::{i256, Buffer, IntervalDayTime}; @@ -165,57 +165,65 @@ impl ArrayReader for FixedLenByteArrayReader { // TODO: An improvement might be to do this conversion on read let array: ArrayRef = match &self.data_type { ArrowType::Decimal128(p, s) => { - let decimal = binary - .iter() - .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?)))) - .collect::() + let nulls = binary.nulls().map(|nb| nb.clone()); + let decimal = binary.iter().map(|o| match o { + Some(b) => i128::from_be_bytes(sign_extend_be(b)), + None => i128::default(), + }); + let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls) .with_precision_and_scale(*p, *s)?; - Arc::new(decimal) } ArrowType::Decimal256(p, s) => { - let decimal = binary - .iter() - .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?)))) - .collect::() + let nulls = binary.nulls().map(|nb| nb.clone()); + let decimal = binary.iter().map(|o| match o { + Some(b) => i256::from_be_bytes(sign_extend_be(b)), + None => i256::default(), + }); + let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls) .with_precision_and_scale(*p, *s)?; - Arc::new(decimal) } ArrowType::Interval(unit) => { + let nulls = binary.nulls().map(|nb| nb.clone()); // An interval is stored as 3x 32-bit unsigned integers storing months, days, // and milliseconds match unit { - IntervalUnit::YearMonth => Arc::new( - binary - .iter() - .map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))) - .collect::(), - ) as ArrayRef, - IntervalUnit::DayTime => Arc::new( - binary - .iter() - .map(|o| { - o.map(|b| { - IntervalDayTime::new( - i32::from_le_bytes(b[4..8].try_into().unwrap()), - i32::from_le_bytes(b[8..12].try_into().unwrap()), - ) - }) - }) - .collect::(), - ) as ArrayRef, + IntervalUnit::YearMonth => { + let iter = binary.iter().map(|o| match o { + Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()), + None => i32::default(), + }); + let interval = + IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls); + Arc::new(interval) as ArrayRef + } + IntervalUnit::DayTime => { + let iter = binary.iter().map(|o| match o { + Some(b) => IntervalDayTime::new( + i32::from_le_bytes(b[4..8].try_into().unwrap()), + i32::from_le_bytes(b[8..12].try_into().unwrap()), + ), + None => IntervalDayTime::default(), + }); + let interval = + IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls); + Arc::new(interval) as ArrayRef + } IntervalUnit::MonthDayNano => { return Err(nyi_err!("MonthDayNano intervals not supported")); } } } - ArrowType::Float16 => Arc::new( - binary - .iter() - .map(|o| o.map(|b| f16::from_le_bytes(b[..2].try_into().unwrap()))) - .collect::(), - ) as ArrayRef, + ArrowType::Float16 => { + let nulls = binary.nulls().map(|nb| nb.clone()); + let f16s = binary.iter().map(|o| match o { + Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()), + None => f16::default(), + }); + let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls); + Arc::new(f16s) as ArrayRef + } _ => Arc::new(binary) as ArrayRef, }; From fde96ff2177f71a9f202a460e0892d86214e3052 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 12 Aug 2024 18:03:03 -0700 Subject: [PATCH 2/7] clippy --- parquet/src/arrow/array_reader/fixed_len_byte_array.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 2509ea03fcd2..e2278f4a1ce6 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -165,7 +165,7 @@ impl ArrayReader for FixedLenByteArrayReader { // TODO: An improvement might be to do this conversion on read let array: ArrayRef = match &self.data_type { ArrowType::Decimal128(p, s) => { - let nulls = binary.nulls().map(|nb| nb.clone()); + let nulls = binary.nulls().cloned(); let decimal = binary.iter().map(|o| match o { Some(b) => i128::from_be_bytes(sign_extend_be(b)), None => i128::default(), @@ -175,7 +175,7 @@ impl ArrayReader for FixedLenByteArrayReader { Arc::new(decimal) } ArrowType::Decimal256(p, s) => { - let nulls = binary.nulls().map(|nb| nb.clone()); + let nulls = binary.nulls().cloned(); let decimal = binary.iter().map(|o| match o { Some(b) => i256::from_be_bytes(sign_extend_be(b)), None => i256::default(), @@ -185,7 +185,7 @@ impl ArrayReader for FixedLenByteArrayReader { Arc::new(decimal) } ArrowType::Interval(unit) => { - let nulls = binary.nulls().map(|nb| nb.clone()); + let nulls = binary.nulls().cloned(); // An interval is stored as 3x 32-bit unsigned integers storing months, days, // and milliseconds match unit { @@ -216,7 +216,7 @@ impl ArrayReader for FixedLenByteArrayReader { } } ArrowType::Float16 => { - let nulls = binary.nulls().map(|nb| nb.clone()); + let nulls = binary.nulls().cloned(); let f16s = binary.iter().map(|o| match o { Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()), None => f16::default(), From 634100a2d667f9786da4f999336b3013056010d7 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 12 Aug 2024 20:37:20 -0700 Subject: [PATCH 3/7] speed up some more decimals --- .../src/arrow/array_reader/primitive_array.rs | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 07ecc27d9b0b..a93598742a3c 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -217,22 +217,33 @@ where arrow_cast::cast(&a, target_type)? } ArrowType::Decimal128(p, s) => { + let nulls = array.nulls().cloned(); let array = match array.data_type() { - ArrowType::Int32 => array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.map(|v| v as i128)) - .collect::(), + ArrowType::Int32 => { + let decimal = array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i as i128, + None => i128::default(), + }); + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + } - ArrowType::Int64 => array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.map(|v| v as i128)) - .collect::(), + ArrowType::Int64 => { + let decimal = array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i as i128, + None => i128::default(), + }); + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + } _ => { return Err(arrow_err!( "Cannot convert {:?} to decimal", From 5d4ae0dc09f95ee9079b46b117fb554f63157564 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 13 Aug 2024 10:30:55 -0700 Subject: [PATCH 4/7] add optimizations for byte_stream_split --- .../src/arrow/array_reader/fixed_len_byte_array.rs | 12 ++++++++---- parquet/src/arrow/array_reader/primitive_array.rs | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index e2278f4a1ce6..598ecaf1d45e 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -418,7 +418,7 @@ impl ColumnValueDecoder for ValueDecoder { // so `offset` should be the value offset, not the byte offset let total_values = buf.len() / self.byte_length; let to_read = num_values.min(total_values - *offset); - out.buffer.reserve(to_read * self.byte_length); + //out.buffer.reserve(to_read * self.byte_length); // now read the n streams and reassemble values into the output buffer read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length); @@ -461,9 +461,13 @@ fn read_byte_stream_split( data_width: usize, ) { let stride = src.len() / data_width; - for i in 0..num_values { - for j in 0..data_width { - dst.push(src[offset + j * stride + i]); + let idx = dst.len(); + dst.resize(idx + num_values * data_width, 0u8); + let dst_slc = &mut dst[idx..idx + num_values * data_width]; + for j in 0..data_width { + let src_slc = &src[offset + j * stride..offset + j * stride + num_values]; + for i in 0..num_values { + dst_slc[i * data_width + j] = src_slc[i]; } } } diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index a93598742a3c..341bce7b0fd0 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -242,7 +242,7 @@ where Some(i) => i as i128, None => i128::default(), }); - Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) } _ => { return Err(arrow_err!( From 8127cda81c6b912d50d898de3c04c4ca51ca485f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 13 Aug 2024 11:48:47 -0700 Subject: [PATCH 5/7] decimal256 --- .../src/arrow/array_reader/primitive_array.rs | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 341bce7b0fd0..5d8a4738b6ce 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -256,22 +256,33 @@ where Arc::new(array) as ArrayRef } ArrowType::Decimal256(p, s) => { + let nulls = array.nulls().cloned(); let array = match array.data_type() { - ArrowType::Int32 => array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.map(|v| i256::from_i128(v as i128))) - .collect::(), + ArrowType::Int32 => { + let decimal = array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i256::from_i128(i as i128), + None => i256::default(), + }); + Decimal256Array::from_iter_values_with_nulls(decimal, nulls) + } - ArrowType::Int64 => array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.map(|v| i256::from_i128(v as i128))) - .collect::(), + ArrowType::Int64 => { + let decimal = array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i256::from_i128(i as i128), + None => i256::default(), + }); + Decimal256Array::from_iter_values_with_nulls(decimal, nulls) + } _ => { return Err(arrow_err!( "Cannot convert {:?} to decimal", From 0032a4bb40c27d8b8cea956d279301a75ca703d2 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 13 Aug 2024 12:15:47 -0700 Subject: [PATCH 6/7] Revert "add optimizations for byte_stream_split" This reverts commit 5d4ae0dc09f95ee9079b46b117fb554f63157564. --- .../src/arrow/array_reader/fixed_len_byte_array.rs | 12 ++++-------- parquet/src/arrow/array_reader/primitive_array.rs | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 598ecaf1d45e..e2278f4a1ce6 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -418,7 +418,7 @@ impl ColumnValueDecoder for ValueDecoder { // so `offset` should be the value offset, not the byte offset let total_values = buf.len() / self.byte_length; let to_read = num_values.min(total_values - *offset); - //out.buffer.reserve(to_read * self.byte_length); + out.buffer.reserve(to_read * self.byte_length); // now read the n streams and reassemble values into the output buffer read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length); @@ -461,13 +461,9 @@ fn read_byte_stream_split( data_width: usize, ) { let stride = src.len() / data_width; - let idx = dst.len(); - dst.resize(idx + num_values * data_width, 0u8); - let dst_slc = &mut dst[idx..idx + num_values * data_width]; - for j in 0..data_width { - let src_slc = &src[offset + j * stride..offset + j * stride + num_values]; - for i in 0..num_values { - dst_slc[i * data_width + j] = src_slc[i]; + for i in 0..num_values { + for j in 0..data_width { + dst.push(src[offset + j * stride + i]); } } } diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 5d8a4738b6ce..0bd5d2076d9c 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -242,7 +242,7 @@ where Some(i) => i as i128, None => i128::default(), }); - Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) } _ => { return Err(arrow_err!( From dedeacf9a4c82c6002c455697bd84620ff230923 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 13 Aug 2024 14:48:34 -0700 Subject: [PATCH 7/7] add comments --- parquet/src/arrow/array_reader/fixed_len_byte_array.rs | 3 +++ parquet/src/arrow/array_reader/primitive_array.rs | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 616afcc6576a..3b2600c54795 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -165,6 +165,9 @@ impl ArrayReader for FixedLenByteArrayReader { // TODO: An improvement might be to do this conversion on read let array: ArrayRef = match &self.data_type { ArrowType::Decimal128(p, s) => { + // We can simply reuse the null buffer from `binary` rather than recomputing it + // (as was the case when we simply used `collect` to produce the new array). + // The same applies to the transformations below. let nulls = binary.nulls().cloned(); let decimal = binary.iter().map(|o| match o { Some(b) => i128::from_be_bytes(sign_extend_be(b)), diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 0bd5d2076d9c..5e0e09212c7e 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -217,6 +217,8 @@ where arrow_cast::cast(&a, target_type)? } ArrowType::Decimal128(p, s) => { + // We can simply reuse the null buffer from `array` rather than recomputing it + // (as was the case when we simply used `collect` to produce the new array). let nulls = array.nulls().cloned(); let array = match array.data_type() { ArrowType::Int32 => { @@ -242,7 +244,7 @@ where Some(i) => i as i128, None => i128::default(), }); - Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) } _ => { return Err(arrow_err!( @@ -256,6 +258,8 @@ where Arc::new(array) as ArrayRef } ArrowType::Decimal256(p, s) => { + // We can simply reuse the null buffer from `array` rather than recomputing it + // (as was the case when we simply used `collect` to produce the new array). let nulls = array.nulls().cloned(); let array = match array.data_type() { ArrowType::Int32 => {