Skip to content

Commit

Permalink
fix: Handle sliced array in run array iterator (#3681)
Browse files Browse the repository at this point in the history
* Handle sliced array in run array iterator

* incorporate PR comments

---------

Co-authored-by: ask <ask@local>
  • Loading branch information
askoa and ask authored Feb 13, 2023
1 parent d011e6a commit 5ffc0a8
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 30 deletions.
5 changes: 5 additions & 0 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,11 @@ impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
pub fn values(&self) -> &'a V {
// Hands back the stored reference unchanged, so the result carries the
// array lifetime `'a` rather than the lifetime of this `&self` borrow.
self.values
}

/// Returns the run array of this [`TypedRunArray`]
///
/// The returned reference carries the array lifetime `'a`, so it is not
/// tied to this particular `&self` borrow of the typed wrapper.
pub fn run_array(&self) -> &'a RunArray<R> {
self.run_array
}
}

impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> {
Expand Down
113 changes: 83 additions & 30 deletions arrow-array/src/run_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ where
<&'a V as ArrayAccessor>::Item: Default,
{
array: TypedRunArray<'a, R, V>,
current_logical: usize,
current_physical: usize,
current_end_logical: usize,
current_end_physical: usize,
current_front_logical: usize,
current_front_physical: usize,
current_back_logical: usize,
current_back_physical: usize,
}

impl<'a, R, V> RunArrayIter<'a, R, V>
Expand All @@ -57,14 +57,19 @@ where
{
/// Creates a new iterator over the logical values of `array`.
///
/// The logical cursors start at `array.offset()` and `array.offset() + array.len()`,
/// so a sliced run array is iterated over its own window only. The physical
/// cursors are resolved from the first and last logical positions of that window;
/// the back physical cursor is stored one past the last run so that `next_back`
/// can step down onto it.
///
/// An empty array yields an iterator that is immediately exhausted. No physical
/// index is looked up in that case, which avoids the `len() - 1` underflow and
/// the failed lookup of logical index `0`.
pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
    let logical_len = array.len();
    let front_logical = array.offset();

    let (current_front_physical, current_back_physical) = if logical_len == 0 {
        // Front and back logical cursors are equal, so `next`/`next_back`
        // return `None` before either physical cursor is ever read.
        (0, 0)
    } else {
        let run_array = array.run_array();
        // NOTE(review): `get_physical_index` is called with indices relative to
        // this (possibly sliced) array, matching how the original code used it.
        let first = run_array
            .get_physical_index(0)
            .expect("logical index 0 must resolve for a non-empty run array");
        let last = run_array
            .get_physical_index(logical_len - 1)
            .expect("last logical index must resolve for a non-empty run array");
        (first, last + 1)
    };

    RunArrayIter {
        array,
        current_front_logical: front_logical,
        current_front_physical,
        current_back_logical: front_logical + logical_len,
        current_back_physical,
    }
}
}
Expand All @@ -80,44 +85,46 @@ where

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.current_logical == self.current_end_logical {
if self.current_front_logical == self.current_back_logical {
return None;
}
// If current logical index is greater than current run end index then increment
// the physical index.
if self.current_logical
if self.current_front_logical
>= self
.array
.run_ends()
.value(self.current_physical)
.value(self.current_front_physical)
.as_usize()
{
// As the run_ends is expected to be strictly increasing, there
// should be at least one logical entry in one physical entry. Because of this
// reason the next value can be accessed by incrementing physical index once.
self.current_physical += 1;
self.current_front_physical += 1;
}
if self.array.values().is_null(self.current_physical) {
self.current_logical += 1;
if self.array.values().is_null(self.current_front_physical) {
self.current_front_logical += 1;
Some(None)
} else {
self.current_logical += 1;
self.current_front_logical += 1;
// Safety:
// The self.current_physical is kept within bounds of self.current_logical.
// The self.current_logical will not go out of bounds because of the check
// `self.current_logical = self.current_end_logical` above.
unsafe {
Some(Some(
self.array.values().value_unchecked(self.current_physical),
self.array
.values()
.value_unchecked(self.current_front_physical),
))
}
}
}

/// Reports the exact number of logical values still to be yielded.
///
/// Both bounds are the distance between the back and front logical cursors,
/// which shrinks by one on every `next` or `next_back` call that yields.
fn size_hint(&self) -> (usize, Option<usize>) {
    let remaining = self.current_back_logical - self.current_front_logical;
    (remaining, Some(remaining))
}
}
Expand All @@ -130,26 +137,26 @@ where
<&'a V as ArrayAccessor>::Item: Default,
{
fn next_back(&mut self) -> Option<Self::Item> {
if self.current_end_logical == self.current_logical {
if self.current_back_logical == self.current_front_logical {
return None;
}

self.current_end_logical -= 1;
self.current_back_logical -= 1;

if self.current_end_physical > 0
&& self.current_end_logical
if self.current_back_physical > 0
&& self.current_back_logical
< self
.array
.run_ends()
.value(self.current_end_physical - 1)
.value(self.current_back_physical - 1)
.as_usize()
{
// As the run_ends is expected to be strictly increasing, there
// should be at least one logical entry in one physical entry. Because of this
// reason the next value can be accessed by decrementing physical index once.
self.current_end_physical -= 1;
self.current_back_physical -= 1;
}
Some(if self.array.values().is_null(self.current_end_physical) {
Some(if self.array.values().is_null(self.current_back_physical) {
None
} else {
// Safety:
Expand All @@ -160,7 +167,7 @@ where
Some(
self.array
.values()
.value_unchecked(self.current_end_physical),
.value_unchecked(self.current_back_physical),
)
}
})
Expand All @@ -184,8 +191,8 @@ mod tests {
use crate::{
array::{Int32Array, StringArray},
builder::PrimitiveRunBuilder,
types::Int32Type,
Int64RunArray,
types::{Int16Type, Int32Type},
Array, Int64RunArray, PrimitiveArray, RunArray,
};

fn build_input_array(size: usize) -> Vec<Option<i32>> {
Expand Down Expand Up @@ -345,4 +352,50 @@ mod tests {

assert_eq!(expected_vec, result_asref);
}

#[test]
fn test_sliced_run_array_iterator() {
    let total_len = 80;
    let input_array = build_input_array(total_len);

    // Run-end encode the whole input once; every slice below is cut from it.
    let mut builder =
        PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
    builder.extend(input_array.iter().copied());
    let run_array = builder.finish();

    // Slices the encoded array, iterates the typed view front to back and
    // compares the collected values against `expected`.
    let check_slice = |offset: usize, length: usize, expected: Vec<Option<i32>>| {
        let sliced: RunArray<Int16Type> =
            run_array.slice(offset, length).into_data().into();
        let typed = sliced.downcast::<PrimitiveArray<Int32Type>>().unwrap();
        let actual: Vec<Option<i32>> = typed.into_iter().collect();
        assert_eq!(expected, actual);
    };

    // Exercise every slice length, once anchored at the start of the array
    // and once anchored at its end.
    for slice_len in 1..=total_len {
        let prefix: Vec<Option<i32>> =
            input_array.iter().take(slice_len).copied().collect();
        check_slice(0, slice_len, prefix);

        let tail_offset = total_len - slice_len;
        let suffix: Vec<Option<i32>> =
            input_array.iter().skip(tail_offset).copied().collect();
        check_slice(tail_offset, slice_len, suffix);
    }
}
}

0 comments on commit 5ffc0a8

Please sign in to comment.