From 5ffc0a87dd5abb4f7db1172bac6ed93da95827f7 Mon Sep 17 00:00:00 2001 From: askoa <112126368+askoa@users.noreply.github.com> Date: Mon, 13 Feb 2023 10:51:31 -0500 Subject: [PATCH] fix: Handle sliced array in run array iterator (#3681) * Handle sliced array in run array iterator * incorporate PR comments --------- Co-authored-by: ask --- arrow-array/src/array/run_array.rs | 5 ++ arrow-array/src/run_iterator.rs | 113 +++++++++++++++++++++-------- 2 files changed, 88 insertions(+), 30 deletions(-) diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index 33738d649f76..709933e1b103 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -472,6 +472,11 @@ impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> { pub fn values(&self) -> &'a V { self.values } + + /// Returns the run array of this [`TypedRunArray`] + pub fn run_array(&self) -> &'a RunArray { + self.run_array + } } impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> { diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs index 8bad85a9f1e1..a79969c3cb91 100644 --- a/arrow-array/src/run_iterator.rs +++ b/arrow-array/src/run_iterator.rs @@ -42,10 +42,10 @@ where <&'a V as ArrayAccessor>::Item: Default, { array: TypedRunArray<'a, R, V>, - current_logical: usize, - current_physical: usize, - current_end_logical: usize, - current_end_physical: usize, + current_front_logical: usize, + current_front_physical: usize, + current_back_logical: usize, + current_back_physical: usize, } impl<'a, R, V> RunArrayIter<'a, R, V> @@ -57,14 +57,19 @@ where { /// create a new iterator pub fn new(array: TypedRunArray<'a, R, V>) -> Self { - let logical_len = array.len(); - let physical_len: usize = array.values().len(); + let current_front_physical: usize = + array.run_array().get_physical_index(0).unwrap(); + let current_back_physical: usize = array + .run_array() + .get_physical_index(array.len() - 1) + .unwrap() + + 1; RunArrayIter { array, - current_logical: 0, - current_physical: 0, - current_end_logical: logical_len, - current_end_physical: physical_len, + current_front_logical: array.offset(), + current_front_physical, + current_back_logical: array.offset() + array.len(), + current_back_physical, } } } @@ -80,35 +85,37 @@ where #[inline] fn next(&mut self) -> Option { - if self.current_logical == self.current_end_logical { + if self.current_front_logical == self.current_back_logical { return None; } // If current logical index is greater than current run end index then increment // the physical index. - if self.current_logical + if self.current_front_logical >= self .array .run_ends() - .value(self.current_physical) + .value(self.current_front_physical) .as_usize() { // As the run_ends is expected to be strictly increasing, there // should be at least one logical entry in one physical entry. Because of this // reason the next value can be accessed by incrementing physical index once. - self.current_physical += 1; + self.current_front_physical += 1; } - if self.array.values().is_null(self.current_physical) { - self.current_logical += 1; + if self.array.values().is_null(self.current_front_physical) { + self.current_front_logical += 1; Some(None) } else { - self.current_logical += 1; + self.current_front_logical += 1; // Safety: // The self.current_physical is kept within bounds of self.current_logical. // The self.current_logical will not go out of bounds because of the check // `self.current_logical = self.current_end_logical` above. unsafe { Some(Some( - self.array.values().value_unchecked(self.current_physical), + self.array + .values() + .value_unchecked(self.current_front_physical), )) } } @@ -116,8 +123,8 @@ where fn size_hint(&self) -> (usize, Option) { ( - self.current_end_logical - self.current_logical, - Some(self.current_end_logical - self.current_logical), + self.current_back_logical - self.current_front_logical, + Some(self.current_back_logical - self.current_front_logical), ) } } @@ -130,26 +137,26 @@ where <&'a V as ArrayAccessor>::Item: Default, { fn next_back(&mut self) -> Option { - if self.current_end_logical == self.current_logical { + if self.current_back_logical == self.current_front_logical { return None; } - self.current_end_logical -= 1; + self.current_back_logical -= 1; - if self.current_end_physical > 0 - && self.current_end_logical + if self.current_back_physical > 0 + && self.current_back_logical < self .array .run_ends() - .value(self.current_end_physical - 1) + .value(self.current_back_physical - 1) .as_usize() { // As the run_ends is expected to be strictly increasing, there // should be at least one logical entry in one physical entry. Because of this // reason the next value can be accessed by decrementing physical index once. - self.current_end_physical -= 1; + self.current_back_physical -= 1; } - Some(if self.array.values().is_null(self.current_end_physical) { + Some(if self.array.values().is_null(self.current_back_physical) { None } else { // Safety: @@ -160,7 +167,7 @@ where Some( self.array .values() - .value_unchecked(self.current_end_physical), + .value_unchecked(self.current_back_physical), ) } }) @@ -184,8 +191,8 @@ mod tests { use crate::{ array::{Int32Array, StringArray}, builder::PrimitiveRunBuilder, - types::Int32Type, - Int64RunArray, + types::{Int16Type, Int32Type}, + Array, Int64RunArray, PrimitiveArray, RunArray, }; fn build_input_array(size: usize) -> Vec> { @@ -345,4 +352,50 @@ mod tests { assert_eq!(expected_vec, result_asref); } + + #[test] + fn test_sliced_run_array_iterator() { + let total_len = 80; + let input_array = build_input_array(total_len); + + // Encode the input_array to run array + let mut builder = + PrimitiveRunBuilder::::with_capacity(input_array.len()); + builder.extend(input_array.iter().copied()); + let run_array = builder.finish(); + + // test for all slice lengths. + for slice_len in 1..=total_len { + // test for offset = 0, slice length = slice_len + let sliced_run_array: RunArray = + run_array.slice(0, slice_len).into_data().into(); + let sliced_typed_run_array = sliced_run_array + .downcast::>() + .unwrap(); + + // Iterate on sliced typed run array + let actual: Vec> = sliced_typed_run_array.into_iter().collect(); + let expected: Vec> = + input_array.iter().take(slice_len).copied().collect(); + assert_eq!(expected, actual); + + // test for offset = total_len - slice_len, length = slice_len + let sliced_run_array: RunArray = run_array + .slice(total_len - slice_len, slice_len) + .into_data() + .into(); + let sliced_typed_run_array = sliced_run_array + .downcast::>() + .unwrap(); + + // Iterate on sliced typed run array + let actual: Vec> = sliced_typed_run_array.into_iter().collect(); + let expected: Vec> = input_array + .iter() + .skip(total_len - slice_len) + .copied() + .collect(); + assert_eq!(expected, actual); + } + } }