Merge remote-tracking branch 'origin/master' into metadata_reader
etseidl committed Sep 19, 2024
2 parents 2d65c3f + 1390283 commit 2a2cf81
Showing 34 changed files with 282 additions and 63 deletions.
83 changes: 73 additions & 10 deletions arrow-arith/src/arity.rs
@@ -313,8 +313,6 @@ where
))));
}

let nulls = NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref());

let mut builder = a.into_builder()?;

builder
@@ -323,14 +321,21 @@
.zip(b.values())
.for_each(|(l, r)| *l = op(*l, *r));

let array_builder = builder.finish().into_data().into_builder().nulls(nulls);
let array = builder.finish();

    // The builder still has the null buffer from `a`; it is not changed.
let nulls = NullBuffer::union(array.logical_nulls().as_ref(), b.logical_nulls().as_ref());

let array_builder = array.into_data().into_builder().nulls(nulls);

let array_data = unsafe { array_builder.build_unchecked() };
Ok(Ok(PrimitiveArray::<T>::from(array_data)))
}

/// Applies the provided fallible binary operation across `a` and `b`, returning any error,
/// and collecting the results into a [`PrimitiveArray`]. If any index is null in either `a`
/// Applies the provided fallible binary operation across `a` and `b`.
///
/// This will return any error encountered, or collect the results into
/// a [`PrimitiveArray`]. If any index is null in either `a`
/// or `b`, the corresponding index in the result will also be null
///
/// Like [`try_unary`] the function is only evaluated for non-null indices
@@ -381,12 +386,15 @@
}

/// Applies the provided fallible binary operation across `a` and `b` by mutating the mutable
/// [`PrimitiveArray`] `a` with the results, returning any error. If any index is null in
/// either `a` or `b`, the corresponding index in the result will also be null
/// [`PrimitiveArray`] `a` with the results.
///
/// Like [`try_unary`] the function is only evaluated for non-null indices
/// Returns any error encountered, or collects the results into a [`PrimitiveArray`] as the return
/// value. If any index is null in either `a` or `b`, the corresponding index in the result will
/// also be null.
///
/// Like [`try_unary`] the function is only evaluated for non-null indices.
///
/// See [`binary_mut`] for errors and buffer reuse information
/// See [`binary_mut`] for errors and buffer reuse information.
pub fn try_binary_mut<T, F>(
a: PrimitiveArray<T>,
b: &PrimitiveArray<T>,
@@ -413,7 +421,8 @@ where
try_binary_no_nulls_mut(len, a, b, op)
} else {
let nulls =
NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref()).unwrap();
create_union_null_buffer(a.logical_nulls().as_ref(), b.logical_nulls().as_ref())
.unwrap();

let mut builder = a.into_builder()?;

@@ -435,6 +444,22 @@ where
}
}

/// Computes the union of the nulls in two optional [`NullBuffer`]s, returning a
/// buffer that is not shared with the input buffers.
///
/// The union of the nulls is the same as `NullBuffer::union(lhs, rhs)` but
/// it does not increase the reference count of the null buffer.
fn create_union_null_buffer(
lhs: Option<&NullBuffer>,
rhs: Option<&NullBuffer>,
) -> Option<NullBuffer> {
match (lhs, rhs) {
(Some(lhs), Some(rhs)) => Some(NullBuffer::new(lhs.inner() & rhs.inner())),
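        // ANDing a buffer with itself yields the same validity bits in a freshly
        // allocated buffer, so the result does not share storage with `n`.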
(Some(n), None) | (None, Some(n)) => Some(NullBuffer::new(n.inner() & n.inner())),
(None, None) => None,
}
}

/// This intentional inline(never) attribute helps LLVM optimize the loop.
#[inline(never)]
fn try_binary_no_nulls<A: ArrayAccessor, B: ArrayAccessor, F, O>(
@@ -557,6 +582,25 @@ mod tests {
assert_eq!(c, expected);
}

#[test]
fn test_binary_mut_null_buffer() {
let a = Int32Array::from(vec![Some(3), Some(4), Some(5), Some(6), None]);

let b = Int32Array::from(vec![Some(10), Some(11), Some(12), Some(13), Some(14)]);

let r1 = binary_mut(a, &b, |a, b| a + b).unwrap();

let a = Int32Array::from(vec![Some(3), Some(4), Some(5), Some(6), None]);
let b = Int32Array::new(
vec![10, 11, 12, 13, 14].into(),
Some(vec![true, true, true, true, true].into()),
);

        // unwrap here means that no copying occurred
let r2 = binary_mut(a, &b, |a, b| a + b).unwrap();
assert_eq!(r1.unwrap(), r2.unwrap());
}

#[test]
fn test_try_binary_mut() {
let a = Int32Array::from(vec![15, 14, 9, 8, 1]);
@@ -587,6 +631,25 @@ mod tests {
.expect_err("should got error");
}

#[test]
fn test_try_binary_mut_null_buffer() {
let a = Int32Array::from(vec![Some(3), Some(4), Some(5), Some(6), None]);

let b = Int32Array::from(vec![Some(10), Some(11), Some(12), Some(13), Some(14)]);

let r1 = try_binary_mut(a, &b, |a, b| Ok(a + b)).unwrap();

let a = Int32Array::from(vec![Some(3), Some(4), Some(5), Some(6), None]);
let b = Int32Array::new(
vec![10, 11, 12, 13, 14].into(),
Some(vec![true, true, true, true, true].into()),
);

        // unwrap here means that no copying occurred
let r2 = try_binary_mut(a, &b, |a, b| Ok(a + b)).unwrap();
assert_eq!(r1.unwrap(), r2.unwrap());
}

#[test]
fn test_unary_dict_mut() {
let values = Int32Array::from(vec![Some(10), Some(20), None]);
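
A minimal usage sketch (not part of the commit) of the `binary_mut` path the new tests above exercise, assuming the `arrow-arith` and `arrow-array` crates are used directly; when `a`'s buffers are not shared, the kernel mutates them in place rather than copying:

use arrow_arith::arity::binary_mut;
use arrow_array::Int32Array;

fn main() {
    let a = Int32Array::from(vec![Some(1), None, Some(3)]);
    let b = Int32Array::from(vec![Some(10), Some(20), Some(30)]);

    // The outer Err returns `a` unchanged when it cannot be mutated in place
    // (e.g. its buffers are shared); the inner Result carries kernel errors.
    let sum = binary_mut(a, &b, |x, y| x + y).unwrap().unwrap();
    assert_eq!(sum, Int32Array::from(vec![Some(11), None, Some(33)]));
}
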
8 changes: 6 additions & 2 deletions arrow-arith/src/temporal.rs
@@ -666,6 +667,7 @@ impl<T: Datelike> ChronoDateExt for T {

/// Parse the given string into a string representing fixed-offset that is correct as of the given
/// UTC NaiveDateTime.
///
/// Note that the offset is a function of time and can vary depending on whether daylight savings is
/// in effect or not. e.g. Australia/Sydney is +10:00 or +11:00 depending on DST.
#[deprecated(note = "Use arrow_array::timezone::Tz instead")]
@@ -811,6 +812,7 @@
}

/// Extracts the day of a given temporal array as an array of integers.
///
/// If the given array isn't temporal primitive or dictionary array,
/// an `Err` will be returned.
#[deprecated(since = "51.0.0", note = "Use `date_part` instead")]
@@ -828,7 +830,8 @@
date_part_primitive(array, DatePart::Day)
}

/// Extracts the day of year of a given temporal array as an array of integers
/// Extracts the day of year of a given temporal array as an array of integers.
///
/// The day of year ranges from 1 to 366.
/// If the given array isn't temporal primitive or dictionary array,
/// an `Err` will be returned.
@@ -837,7 +840,8 @@ pub fn doy_dyn(array: &dyn Array) -> Result<ArrayRef, ArrowError> {
date_part(array, DatePart::DayOfYear)
}

/// Extracts the day of year of a given temporal primitive array as an array of integers
/// Extracts the day of year of a given temporal primitive array as an array of integers.
///
/// The day of year ranges from 1 to 366
#[deprecated(since = "51.0.0", note = "Use `date_part` instead")]
pub fn doy<T>(array: &PrimitiveArray<T>) -> Result<Int32Array, ArrowError>
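
Since `doy`/`doy_dyn` above are deprecated in favor of `date_part`, a short sketch of the replacement call (illustrative values only):

use arrow_arith::temporal::{date_part, DatePart};
use arrow_array::Date32Array;

fn main() {
    // Date32 value 0 is 1970-01-01, so its day of year is 1; nulls are preserved.
    let dates = Date32Array::from(vec![Some(0), None]);
    let day_of_year = date_part(&dates, DatePart::DayOfYear).unwrap();
    // `date_part` returns an ArrayRef (Int32 values for DayOfYear).
    assert_eq!(day_of_year.len(), 2);
}
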
4 changes: 2 additions & 2 deletions arrow-array/src/array/binary_array.rs
@@ -84,7 +84,7 @@ impl<OffsetSize: OffsetSizeTrait> GenericBinaryArray<OffsetSize> {
pub fn take_iter<'a>(
&'a self,
indexes: impl Iterator<Item = Option<usize>> + 'a,
) -> impl Iterator<Item = Option<&[u8]>> + 'a {
) -> impl Iterator<Item = Option<&'a [u8]>> {
indexes.map(|opt_index| opt_index.map(|index| self.value(index)))
}

@@ -95,7 +95,7 @@ impl<OffsetSize: OffsetSizeTrait> GenericBinaryArray<OffsetSize> {
pub unsafe fn take_iter_unchecked<'a>(
&'a self,
indexes: impl Iterator<Item = Option<usize>> + 'a,
) -> impl Iterator<Item = Option<&[u8]>> + 'a {
) -> impl Iterator<Item = Option<&'a [u8]>> {
indexes.map(|opt_index| opt_index.map(|index| self.value_unchecked(index)))
}
}
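
The signature changes above move the `'a` lifetime onto the yielded `&[u8]` items; for context, a small usage sketch of `take_iter` with illustrative data (not part of the diff):

use arrow_array::BinaryArray;

fn main() {
    let array = BinaryArray::from(vec![Some(&b"one"[..]), None, Some(&b"three"[..])]);
    // A `None` index yields `None`; valid indices borrow from `array`.
    let taken: Vec<Option<&[u8]>> = array
        .take_iter([Some(2), None, Some(0)].into_iter())
        .collect();
    assert_eq!(taken, vec![Some(&b"three"[..]), None, Some(&b"one"[..])]);
}
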
4 changes: 2 additions & 2 deletions arrow-array/src/array/string_array.rs
@@ -42,7 +42,7 @@ impl<OffsetSize: OffsetSizeTrait> GenericStringArray<OffsetSize> {
pub fn take_iter<'a>(
&'a self,
indexes: impl Iterator<Item = Option<usize>> + 'a,
) -> impl Iterator<Item = Option<&str>> + 'a {
) -> impl Iterator<Item = Option<&'a str>> {
indexes.map(|opt_index| opt_index.map(|index| self.value(index)))
}

@@ -53,7 +53,7 @@ impl<OffsetSize: OffsetSizeTrait> GenericStringArray<OffsetSize> {
pub unsafe fn take_iter_unchecked<'a>(
&'a self,
indexes: impl Iterator<Item = Option<usize>> + 'a,
) -> impl Iterator<Item = Option<&str>> + 'a {
) -> impl Iterator<Item = Option<&'a str>> {
indexes.map(|opt_index| opt_index.map(|index| self.value_unchecked(index)))
}

3 changes: 2 additions & 1 deletion arrow-array/src/builder/generic_bytes_view_builder.rs
@@ -515,7 +515,8 @@ fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
u128::from_le_bytes(view_buffer)
}

/// Create a view based on the given data, block id and offset
/// Create a view based on the given data, block id and offset.
///
/// Note that the code below is carefully examined with x86_64 assembly code: <https://godbolt.org/z/685YPsd5G>
/// The goal is to avoid calling into `ptr::copy_non_interleave`, which makes a function call (i.e., is not inlined),
/// which slows things down.
1 change: 1 addition & 0 deletions arrow-array/src/ffi_stream.rs
@@ -275,6 +275,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
}

/// A `RecordBatchReader` which imports Arrays from `FFI_ArrowArrayStream`.
///
/// Struct used to fetch `RecordBatch` from the C Stream Interface.
/// Its main responsibility is to expose `RecordBatchReader` functionality
/// that requires [FFI_ArrowArrayStream].
4 changes: 3 additions & 1 deletion arrow-array/src/types.rs
@@ -50,7 +50,9 @@ impl BooleanType {
pub const DATA_TYPE: DataType = DataType::Boolean;
}

/// Trait for [primitive values], bridging the dynamic-typed nature of Arrow
/// Trait for [primitive values].
///
/// This trait bridges the dynamic-typed nature of Arrow
/// (via [`DataType`]) with the static-typed nature of rust types
/// ([`ArrowNativeType`]) for all types that implement [`ArrowNativeType`].
///
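
A tiny sketch of the bridge the reworded `ArrowPrimitiveType` docs describe, assuming the standard `Int32Type` implementation (illustrative only):

use arrow_array::types::{ArrowPrimitiveType, Int32Type};
use arrow_schema::DataType;

fn main() {
    // Static side: the Rust native type behind Int32Type is i32.
    let value: <Int32Type as ArrowPrimitiveType>::Native = 5;
    // Dynamic side: the matching Arrow DataType.
    assert_eq!(Int32Type::DATA_TYPE, DataType::Int32);
    println!("{value}");
}
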
1 change: 1 addition & 0 deletions arrow-buffer/src/builder/null.rs
@@ -18,6 +18,7 @@
use crate::{BooleanBufferBuilder, MutableBuffer, NullBuffer};

/// Builder for creating the null bit buffer.
///
/// This builder only materializes the buffer when we append `false`.
/// If you only append `true`s to the builder, what you get will be
/// `None` when calling [`finish`](#method.finish).
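
A brief sketch of the behavior the expanded `NullBufferBuilder` doc describes, assuming its `append_*`/`finish` methods (not part of the commit):

use arrow_buffer::NullBufferBuilder;

fn main() {
    // Only valid (`true`) entries appended: no buffer is materialized.
    let mut all_valid = NullBufferBuilder::new(3);
    all_valid.append_n_non_nulls(3);
    assert!(all_valid.finish().is_none());

    // A single null forces the buffer to materialize.
    let mut with_null = NullBufferBuilder::new(3);
    with_null.append_non_null();
    with_null.append_null();
    with_null.append_non_null();
    assert_eq!(with_null.finish().unwrap().null_count(), 1);
}
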
6 changes: 4 additions & 2 deletions arrow-buffer/src/util/bit_mask.rs
@@ -19,8 +19,10 @@

use crate::bit_util::ceil;

/// Sets all bits on `write_data` in the range `[offset_write..offset_write+len]` to be equal to the
/// bits in `data` in the range `[offset_read..offset_read+len]`
/// Util function to set bits in a slice of bytes.
///
/// This will set all bits in `write_data` in the range `[offset_write..offset_write+len]`
/// to be equal to the bits in `data` in the range `[offset_read..offset_read+len]`
/// and returns the number of `0` bits in `data[offset_read..offset_read+len]`.
/// `offset_write`, `offset_read`, and `len` are in terms of bits
pub fn set_bits(
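
A hypothetical `set_bits` call matching the clarified doc; the module path and the `(write_data, data, offset_write, offset_read, len)` parameter order are assumptions, since the full signature is not shown in this diff:

// Assumed path and parameter order; verify against the crate before relying on this.
use arrow_buffer::bit_mask::set_bits;

fn main() {
    let mut write_data = [0u8; 2]; // 16 destination bits, initially zero
    let data = [0b1010_1010u8];    // source bits (LSB first)

    // Copy 4 bits from `data` starting at bit 1 into `write_data` starting at bit 3.
    let zero_bits = set_bits(&mut write_data, &data, 3, 1, 4);

    // The return value counts the `0` bits in the copied source range: bits 1..5 are 1,0,1,0.
    assert_eq!(zero_bits, 2);
}
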
2 changes: 1 addition & 1 deletion arrow-cast/Cargo.toml
@@ -49,7 +49,7 @@ arrow-select = { workspace = true }
chrono = { workspace = true }
half = { version = "2.1", default-features = false }
num = { version = "0.4", default-features = false, features = ["std"] }
lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
lexical-core = { version = "1.0", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
atoi = "2.0.0"
comfy-table = { version = "7.0", optional = true, default-features = false }
base64 = "0.22"
38 changes: 34 additions & 4 deletions arrow-cast/src/cast/dictionary.rs
@@ -202,11 +202,41 @@ pub(crate) fn cast_to_dictionary<K: ArrowDictionaryKeyType>(
UInt16 => pack_numeric_to_dictionary::<K, UInt16Type>(array, dict_value_type, cast_options),
UInt32 => pack_numeric_to_dictionary::<K, UInt32Type>(array, dict_value_type, cast_options),
UInt64 => pack_numeric_to_dictionary::<K, UInt64Type>(array, dict_value_type, cast_options),
Decimal128(_, _) => {
pack_numeric_to_dictionary::<K, Decimal128Type>(array, dict_value_type, cast_options)
Decimal128(p, s) => {
let dict = pack_numeric_to_dictionary::<K, Decimal128Type>(
array,
dict_value_type,
cast_options,
)?;
let dict = dict
.as_dictionary::<K>()
.downcast_dict::<Decimal128Array>()
.unwrap();
let value = dict.values().clone();
// Set correct precision/scale
let value = value.with_precision_and_scale(p, s)?;
Ok(Arc::new(DictionaryArray::<K>::try_new(
dict.keys().clone(),
Arc::new(value),
)?))
}
Decimal256(_, _) => {
pack_numeric_to_dictionary::<K, Decimal256Type>(array, dict_value_type, cast_options)
Decimal256(p, s) => {
let dict = pack_numeric_to_dictionary::<K, Decimal256Type>(
array,
dict_value_type,
cast_options,
)?;
let dict = dict
.as_dictionary::<K>()
.downcast_dict::<Decimal256Array>()
.unwrap();
let value = dict.values().clone();
// Set correct precision/scale
let value = value.with_precision_and_scale(p, s)?;
Ok(Arc::new(DictionaryArray::<K>::try_new(
dict.keys().clone(),
Arc::new(value),
)?))
}
Float16 => {
pack_numeric_to_dictionary::<K, Float16Type>(array, dict_value_type, cast_options)
32 changes: 32 additions & 0 deletions arrow-cast/src/cast/mod.rs
@@ -2650,6 +2650,38 @@ mod tests {
err.unwrap_err().to_string());
}

#[test]
fn test_cast_decimal128_to_decimal128_dict() {
let p = 20;
let s = 3;
let input_type = DataType::Decimal128(p, s);
let output_type = DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Decimal128(p, s)),
);
assert!(can_cast_types(&input_type, &output_type));
let array = vec![Some(1123456), Some(2123456), Some(3123456), None];
let array = create_decimal_array(array, p, s).unwrap();
let cast_array = cast_with_options(&array, &output_type, &CastOptions::default()).unwrap();
assert_eq!(cast_array.data_type(), &output_type);
}

#[test]
fn test_cast_decimal256_to_decimal256_dict() {
let p = 20;
let s = 3;
let input_type = DataType::Decimal256(p, s);
let output_type = DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Decimal256(p, s)),
);
assert!(can_cast_types(&input_type, &output_type));
let array = vec![Some(1123456), Some(2123456), Some(3123456), None];
let array = create_decimal_array(array, p, s).unwrap();
let cast_array = cast_with_options(&array, &output_type, &CastOptions::default()).unwrap();
assert_eq!(cast_array.data_type(), &output_type);
}

#[test]
fn test_cast_decimal128_to_decimal128_overflow() {
let input_type = DataType::Decimal128(38, 3);
4 changes: 1 addition & 3 deletions arrow-cast/src/display.rs
@@ -421,9 +421,7 @@ macro_rules! primitive_display {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
let mut buffer = [0u8; <$t as ArrowPrimitiveType>::Native::FORMATTED_SIZE];
// SAFETY:
// buffer is T::FORMATTED_SIZE
let b = unsafe { lexical_core::write_unchecked(value, &mut buffer) };
let b = lexical_core::write(value, &mut buffer);
// Lexical core produces valid UTF-8
let s = unsafe { std::str::from_utf8_unchecked(b) };
f.write_str(s)?;
2 changes: 1 addition & 1 deletion arrow-csv/Cargo.toml
@@ -43,7 +43,7 @@ chrono = { workspace = true }
csv = { version = "1.1", default-features = false }
csv-core = { version = "0.1" }
lazy_static = { version = "1.4", default-features = false }
lexical-core = { version = "^0.8", default-features = false }
lexical-core = { version = "1.0", default-features = false }
regex = { version = "1.7.0", default-features = false, features = ["std", "unicode", "perf"] }

[dev-dependencies]
8 changes: 5 additions & 3 deletions arrow-data/src/data.rs
@@ -161,9 +161,11 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
}
}

/// A generic representation of Arrow array data which encapsulates common attributes and
/// operations for Arrow array. Specific operations for different arrays types (e.g.,
/// primitive, list, struct) are implemented in `Array`.
/// A generic representation of Arrow array data which encapsulates common attributes
/// and operations for Arrow arrays.
///
/// Specific operations for different array types (e.g., primitive, list, struct)
/// are implemented in `Array`.
///
/// # Memory Layout
///
3 changes: 1 addition & 2 deletions arrow-flight/src/lib.rs
@@ -50,8 +50,7 @@ use std::{fmt, ops::Deref};

type ArrowResult<T> = std::result::Result<T, ArrowError>;

#[allow(clippy::derive_partial_eq_without_eq)]

#[allow(clippy::all)]
mod gen {
include!("arrow.flight.protocol.rs");
}