Skip to content

Commit

Permalink
perf: Add fast paths for series.arg_sort and dataframe.sort (#19872)
Browse files Browse the repository at this point in the history
Co-authored-by: siddharthv <Ssiddharthv@fastmail.com>
Co-authored-by: ritchie <ritchie46@gmail.com>
  • Loading branch information
3 people authored Dec 7, 2024
1 parent 8abc550 commit a6ca94d
Show file tree
Hide file tree
Showing 6 changed files with 545 additions and 46 deletions.
172 changes: 170 additions & 2 deletions crates/polars-core/src/chunked_array/ops/sort/arg_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,108 @@ where
options.multithreaded,
);
}
/// Computes the arg-sort indices for the reverse of an already sorted,
/// null-free sequence while keeping equal elements in their original
/// relative order (a stable reverse). Runs in a single linear pass.
///
/// `iters` yields the chunks of the sequence in order; the positions in the
/// returned vector count elements across all chunks, starting at 0.
///
/// `len` is only used as a capacity hint for the output. Every write below
/// goes through the safe, growing `Vec` API, so a `len` that under-reports
/// the number of elements costs a reallocation instead of writing out of
/// bounds.
///
/// The input is walked once while tracking the half-open index range
/// `run_start..run_end` of the current run of equal values. Whenever a run
/// ends, its indices are appended in descending order; reversing the whole
/// buffer at the end then yields the runs in reverse order with each run
/// internally ascending.
///
/// Example:
/// ```text
/// values              1 2 2 3 3 3 4
/// positions           0 1 2 3 4 5 6
/// runs of equals      0 | 1-2 | 3-5 | 6
/// runs pushed in rev  0 2 1 5 4 3 6
/// final reverse       6 3 4 5 1 2 0
/// ```
pub(super) fn reverse_stable_no_nulls<I, J, T>(iters: I, len: usize) -> Vec<IdxSize>
where
    I: IntoIterator<Item = J>,
    J: IntoIterator<Item = T>,
    T: TotalOrd + Send + Sync,
{
    let mut rev_idx: Vec<IdxSize> = Vec::with_capacity(len);
    // Half-open range of positions covered by the current run of equal values.
    let mut run_start: IdxSize = 0;
    let mut run_end: IdxSize = 0;
    let mut previous_element: Option<T> = None;

    for arr_iter in iters {
        for current_element in arr_iter {
            if let Some(prev) = &previous_element {
                if current_element.tot_cmp(prev) != Ordering::Equal {
                    // The previous run is complete: emit it back to front so
                    // that the final reverse restores its original order.
                    rev_idx.extend((run_start..run_end).rev());
                    run_start = run_end;
                }
            }
            // The current element always belongs to the (possibly new) run.
            run_end += 1;
            previous_element = Some(current_element);
        }
    }

    // Flush the trailing run; for empty input the range is empty and this is a no-op.
    rev_idx.extend((run_start..run_end).rev());

    // Final reverse: runs end up in reverse order, ties stay in input order.
    rev_idx.reverse();
    rev_idx
}

pub(super) fn arg_sort<I, J, T>(
name: PlSmallStr,
iters: I,
options: SortOptions,
null_count: usize,
mut len: usize,
is_sorted_flag: IsSorted,
first_element_null: bool,
) -> IdxCa
where
I: IntoIterator<Item = J>,
J: IntoIterator<Item = Option<T>>,
T: TotalOrd + Send + Sync,
{
let nulls_last = options.nulls_last;
let null_cap = if nulls_last { null_count } else { len };

let mut vals = Vec::with_capacity(len - null_count);
// Fast path
// Only if the array is already sorted in the required order and
// the nulls are also in the correct position
if ((options.descending && is_sorted_flag == IsSorted::Descending)
|| (!options.descending && is_sorted_flag == IsSorted::Ascending))
&& ((nulls_last && !first_element_null) || (!nulls_last && first_element_null))
{
len = options
.limit
.map(|(limit, _)| std::cmp::min(limit as usize, len))
.unwrap_or(len);
return ChunkedArray::with_chunk(
name,
IdxArr::from_data_default(
Buffer::from((0..(len as IdxSize)).collect::<Vec<IdxSize>>()),
None,
),
);
}

let null_cap = if nulls_last { null_count } else { len };
let mut vals = Vec::with_capacity(len - null_count);
let mut nulls_idx = Vec::with_capacity(null_cap);
let mut count: IdxSize = 0;

Expand Down Expand Up @@ -108,12 +192,40 @@ pub(super) fn arg_sort_no_nulls<I, J, T>(
iters: I,
options: SortOptions,
len: usize,
is_sorted_flag: IsSorted,
) -> IdxCa
where
I: IntoIterator<Item = J>,
J: IntoIterator<Item = T>,
T: TotalOrd + Send + Sync,
{
// Fast path
// 1) If the array is already sorted in the required order.
// 2) If the array is reverse sorted -> we do a stable reverse.
if is_sorted_flag != IsSorted::Not {
let len_final = options
.limit
.map(|(limit, _)| std::cmp::min(limit as usize, len))
.unwrap_or(len);
if (options.descending && is_sorted_flag == IsSorted::Descending)
|| (!options.descending && is_sorted_flag == IsSorted::Ascending)
{
return ChunkedArray::with_chunk(
name,
IdxArr::from_data_default(
Buffer::from((0..(len_final as IdxSize)).collect::<Vec<IdxSize>>()),
None,
),
);
} else if (options.descending && is_sorted_flag == IsSorted::Ascending)
|| (!options.descending && is_sorted_flag == IsSorted::Descending)
{
let idx = reverse_stable_no_nulls(iters, len);
let idx = Buffer::from(idx).sliced(0, len_final);
return ChunkedArray::with_chunk(name, IdxArr::from_data_default(idx, None));
}
}

let mut vals = Vec::with_capacity(len);

let mut count: IdxSize = 0;
Expand Down Expand Up @@ -171,3 +283,59 @@ pub(crate) fn arg_sort_row_fmt(
let ca: NoNull<IdxCa> = items.into_iter().map(|tpl| tpl.0).collect();
Ok(ca.into_inner())
}
#[cfg(test)]
mod test {
    use sort::arg_sort::reverse_stable_no_nulls;

    use crate::prelude::*;

    /// Checks `reverse_stable_no_nulls` on sorted input with ties, strictly
    /// increasing input, a single element and an empty array.
    #[test]
    fn test_reverse_stable_no_nulls() {
        let column_name = || PlSmallStr::from_static("a");

        // Runs of equal values: each run keeps its original relative order.
        // positions:                0        1        2        3        4        5        6
        let with_ties = Int32Chunked::new(
            column_name(),
            &[
                Some(1),
                Some(2),
                Some(2),
                Some(3),
                Some(3),
                Some(3),
                Some(4),
            ],
        );
        assert_eq!(
            reverse_stable_no_nulls(&with_ties, 7),
            [6, 3, 4, 5, 1, 2, 0]
        );

        // All values distinct: the result is a plain reversal.
        let strictly_increasing = Int32Chunked::new(
            column_name(),
            &[
                Some(1),
                Some(2),
                Some(3),
                Some(4),
                Some(5),
                Some(6),
                Some(7),
            ],
        );
        assert_eq!(
            reverse_stable_no_nulls(&strictly_increasing, 7),
            [6, 5, 4, 3, 2, 1, 0]
        );

        // A single element maps to itself.
        let single = Int32Chunked::new(column_name(), &[Some(1)]);
        assert_eq!(reverse_stable_no_nulls(&single, 1), [0]);

        // Empty input produces no indices.
        let no_values: [i32; 0] = [];
        let empty = Int32Chunked::new(column_name(), &no_values);
        assert_eq!(reverse_stable_no_nulls(&empty, 0).len(), 0);
    }
}
2 changes: 2 additions & 0 deletions crates/polars-core/src/chunked_array/ops/sort/categorical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl CategoricalChunked {
options,
self.physical().null_count(),
self.len(),
IsSorted::Not,
false,
)
} else {
self.physical().arg_sort(options)
Expand Down
55 changes: 52 additions & 3 deletions crates/polars-core/src/chunked_array/ops/sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,33 @@ macro_rules! sort_with_fast_path {
}}
}

/// Early-returns the identity permutation `0..len` from the *enclosing*
/// function when `$ca` needs no reordering for `$options`.
///
/// The shortcut is taken only when all of the following hold:
/// * no `limit` is set on the options,
/// * the sorted flag of `$ca` matches the requested direction,
/// * either there are no nulls, or the nulls already sit on the requested
///   side (last element is null for `nulls_last`, first element is null
///   otherwise).
///
/// In every other case the macro falls through and the caller continues with
/// its regular sort.
// TODO: when only the nulls are misplaced we could move them instead of sorting.
macro_rules! arg_sort_fast_path {
    ($ca:ident, $options:expr) => {{
        let order_matches = $options.limit.is_none()
            && if $options.descending {
                $ca.is_sorted_descending_flag()
            } else {
                $ca.is_sorted_ascending_flag()
            };

        if order_matches {
            // `null_count() == 0` short-circuits, so the element lookups only
            // run when at least one (null) element exists.
            let nulls_in_place = $ca.null_count() == 0
                || if $options.nulls_last {
                    $ca.get($ca.len() - 1).is_none()
                } else {
                    $ca.get(0).is_none()
                };

            if nulls_in_place {
                return ChunkedArray::with_chunk(
                    $ca.name().clone(),
                    IdxArr::from_data_default(
                        Buffer::from((0..($ca.len() as IdxSize)).collect::<Vec<IdxSize>>()),
                        None,
                    ),
                );
            }
            // Nulls are on the wrong side: fall through to the regular sort.
        }
    }};
}

fn sort_with_numeric<T>(ca: &ChunkedArray<T>, options: SortOptions) -> ChunkedArray<T>
where
T: PolarsNumericType,
Expand Down Expand Up @@ -225,16 +252,31 @@ where
T: PolarsNumericType,
{
options.multithreaded &= POOL.current_num_threads() > 1;
arg_sort_fast_path!(ca, options);
if ca.null_count() == 0 {
let iter = ca
.downcast_iter()
.map(|arr| arr.values().as_slice().iter().copied());
arg_sort::arg_sort_no_nulls(ca.name().clone(), iter, options, ca.len())
arg_sort::arg_sort_no_nulls(
ca.name().clone(),
iter,
options,
ca.len(),
ca.is_sorted_flag(),
)
} else {
let iter = ca
.downcast_iter()
.map(|arr| arr.iter().map(|opt| opt.copied()));
arg_sort::arg_sort(ca.name().clone(), iter, options, ca.null_count(), ca.len())
arg_sort::arg_sort(
ca.name().clone(),
iter,
options,
ca.null_count(),
ca.len(),
ca.is_sorted_flag(),
ca.get(0).is_none(),
)
}
}

Expand Down Expand Up @@ -413,12 +455,14 @@ impl ChunkSort<BinaryType> for BinaryChunked {
}

fn arg_sort(&self, options: SortOptions) -> IdxCa {
arg_sort_fast_path!(self, options);
if self.null_count() == 0 {
arg_sort::arg_sort_no_nulls(
self.name().clone(),
self.downcast_iter().map(|arr| arr.values_iter()),
options,
self.len(),
self.is_sorted_flag(),
)
} else {
arg_sort::arg_sort(
Expand All @@ -427,6 +471,8 @@ impl ChunkSort<BinaryType> for BinaryChunked {
options,
self.null_count(),
self.len(),
self.is_sorted_flag(),
self.get(0).is_none(),
)
}
}
Expand Down Expand Up @@ -681,12 +727,14 @@ impl ChunkSort<BooleanType> for BooleanChunked {
}

fn arg_sort(&self, options: SortOptions) -> IdxCa {
arg_sort_fast_path!(self, options);
if self.null_count() == 0 {
arg_sort::arg_sort_no_nulls(
self.name().clone(),
self.downcast_iter().map(|arr| arr.values_iter()),
options,
self.len(),
self.is_sorted_flag(),
)
} else {
arg_sort::arg_sort(
Expand All @@ -695,6 +743,8 @@ impl ChunkSort<BooleanType> for BooleanChunked {
options,
self.null_count(),
self.len(),
self.is_sorted_flag(),
self.get(0).is_none(),
)
}
}
Expand Down Expand Up @@ -747,7 +797,6 @@ pub(crate) fn prepare_arg_sort(
#[cfg(test)]
mod test {
use crate::prelude::*;

#[test]
fn test_arg_sort() {
let a = Int32Chunked::new(
Expand Down
34 changes: 34 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,7 @@ impl DataFrame {
Ok(self.clone())
};
}

// note that the by_column argument also contains evaluated expression from
// polars-lazy that may not even be present in this dataframe. therefore
// when we try to set the first columns as sorted, we ignore the error as
Expand Down Expand Up @@ -2035,6 +2036,39 @@ impl DataFrame {
return self.bottom_k_impl(k, by_column, sort_options);
}
}
// Check if the required column is already sorted; if so we can exit early.
// We only do this when there is a single column to sort by; handling
// multiple columns would be considerably more complicated.
#[cfg(feature = "dtype-categorical")]
let is_not_categorical_enum =
!(matches!(by_column[0].dtype(), DataType::Categorical(_, _))
|| matches!(by_column[0].dtype(), DataType::Enum(_, _)));

#[cfg(not(feature = "dtype-categorical"))]
#[allow(non_upper_case_globals)]
const is_not_categorical_enum: bool = true;

if by_column.len() == 1 && is_not_categorical_enum {
let required_sorting = if sort_options.descending[0] {
IsSorted::Descending
} else {
IsSorted::Ascending
};
// If null count is 0 then nulls_last doesn't matter
// Safe to get the value at the last position since the dataframe is not empty (taken care of above)
let no_sorting_required = (by_column[0].is_sorted_flag() == required_sorting)
&& ((by_column[0].null_count() == 0)
|| by_column[0].get(by_column[0].len() - 1).unwrap().is_null()
== sort_options.nulls_last[0]);

if no_sorting_required {
return if let Some((offset, len)) = slice {
Ok(self.slice(offset, len))
} else {
Ok(self.clone())
};
}
}

#[cfg(feature = "dtype-struct")]
let has_struct = by_column
Expand Down
Loading

0 comments on commit a6ca94d

Please sign in to comment.