diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/mod.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/mod.rs
index 341cfd6cf2d7..5e7f1352b1aa 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/mod.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/mod.rs
@@ -1,5 +1,6 @@
 mod io;
 mod ooc;
 mod sink;
+mod source;
 
 pub(crate) use sink::SortSink;
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs
index 491ee4bf7642..f40c86a38072 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs
@@ -1,8 +1,7 @@
 use std::fs::DirEntry;
-use std::path::PathBuf;
 
 use polars_core::prelude::*;
-use polars_core::utils::{_split_offsets, accumulate_dataframes_vertical_unchecked, split_df};
+use polars_core::utils::_split_offsets;
 use polars_core::POOL;
 use polars_io::ipc::IpcReader;
 use polars_io::SerReader;
@@ -10,10 +9,10 @@ use polars_ops::prelude::*;
 use rayon::prelude::*;
 
 use crate::executors::sinks::sort::io::{block_thread_until_io_thread_done, DfIter, IOThread};
-use crate::executors::sinks::sort::sink::sort_accumulated;
-use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Source, SourceResult};
+use crate::executors::sinks::sort::source::SortSource;
+use crate::operators::FinalizedSink;
 
-fn read_df(entry: &DirEntry) -> PolarsResult<DataFrame> {
+pub(super) fn read_df(entry: &DirEntry) -> PolarsResult<DataFrame> {
     let path = entry.path();
     let file = std::fs::File::open(path)?;
     IpcReader::new(file).set_rechunk(false).finish()
@@ -46,8 +45,8 @@ pub(super) fn sort_ooc(
             let assigned_parts = det_partitions(sort_col, &partitions, reverse);
 
             // partition the dataframe into proper buckets
-            let (iter, partition) = partition_df(df, &assigned_parts)?;
-            io_thread.dump_iter(Some(partition), iter);
+            let (iter, unique_assigned_parts) = partition_df(df, &assigned_parts)?;
+            io_thread.dump_iter(Some(unique_assigned_parts), iter);
         }
         PolarsResult::Ok(())
     })
@@ -72,7 +71,7 @@ pub(super) fn sort_ooc(
         })
        .collect::<PolarsResult<Vec<_>>>()?;
 
-    let source = SortSource::new(files, idx, reverse, slice);
+    let source = SortSource::new(files, idx, reverse, slice, partitions);
 
     Ok(FinalizedSink::Source(Box::new(source)))
 }
@@ -104,105 +103,3 @@ fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxC
     };
     Ok((out, partitions))
 }
-
-pub struct SortSource {
-    files: std::vec::IntoIter<(u32, PathBuf)>,
-    n_threads: usize,
-    sort_idx: usize,
-    reverse: bool,
-    chunk_offset: IdxSize,
-    slice: Option<(i64, usize)>,
-    finished: bool,
-}
-
-impl SortSource {
-    fn new(
-        mut files: Vec<(u32, PathBuf)>,
-        sort_idx: usize,
-        reverse: bool,
-        slice: Option<(i64, usize)>,
-    ) -> Self {
-        files.sort_unstable_by_key(|entry| entry.0);
-
-        let n_threads = POOL.current_num_threads();
-        let files = files.into_iter();
-
-        Self {
-            files,
-            n_threads,
-            sort_idx,
-            reverse,
-            chunk_offset: 0,
-            slice,
-            finished: false,
-        }
-    }
-}
-
-impl Source for SortSource {
-    fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
-        match self.files.next() {
-            None => Ok(SourceResult::Finished),
-            Some((_, path)) => {
-                let files = std::fs::read_dir(path)?.collect::<std::io::Result<Vec<_>>>()?;
-
-                // early return
-                if self.finished {
-                    return Ok(SourceResult::Finished);
-                }
-
-                // read the files in a single partition in parallel
-                let dfs = POOL.install(|| {
-                    files
-                        .par_iter()
-                        .map(read_df)
-                        .collect::<PolarsResult<Vec<_>>>()
-                })?;
-                let df = accumulate_dataframes_vertical_unchecked(dfs);
-                // sort a single partition
-                let current_slice = self.slice;
-                let mut df = match &mut self.slice {
-                    None => sort_accumulated(df, self.sort_idx, self.reverse, None),
-                    Some((offset, len)) => {
-                        let df_len = df.height();
-                        assert!(*offset >= 0);
-                        let out = if *offset as usize > df_len {
-                            *offset -= df_len as i64;
-                            Ok(df.slice(0, 0))
-                        } else {
-                            let out =
-                                sort_accumulated(df, self.sort_idx, self.reverse, current_slice);
-                            *len = len.saturating_sub(df_len);
-                            *offset = 0;
-                            out
-                        };
-                        if *len == 0 {
-                            self.finished = true;
-                        }
-                        out
-                    }
-                }?;
-
-                // convert to chunks
-                // TODO: make utility functions to save these allocations
-                let chunk_offset = self.chunk_offset;
-                let dfs = split_df(&mut df, self.n_threads)?;
-                self.chunk_offset += dfs.len() as IdxSize;
-                let batch = dfs
-                    .into_iter()
-                    .enumerate()
-                    .map(|(i, df)| DataChunk {
-                        chunk_index: chunk_offset + i as IdxSize,
-                        data: df,
-                    })
-                    .collect();
-
-                Ok(SourceResult::GotMoreData(batch))
-            }
-        }
-    }
-
-    fn fmt(&self) -> &str {
-        "sort_source"
-    }
-}
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs
index c9d3fc66c81e..9edc879e738a 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs
@@ -55,7 +55,7 @@ impl SortSink {
             dist_sample: vec![],
         };
         if ooc {
-            eprintln!("Out of core sort forced");
+            eprintln!("OOC sort forced");
             out.init_ooc().unwrap();
         }
         out
@@ -101,8 +101,11 @@ impl SortSink {
         while let Some(df) = self.chunks.pop_front() {
             if df.height() > 0 {
                 // safety: we just asserted height > 0
-                let sample = unsafe { df.get_columns()[self.sort_idx].get_unchecked(0) };
-                self.dist_sample.push(sample.into_static().unwrap());
+                let sample = unsafe {
+                    let s = &df.get_columns()[self.sort_idx];
+                    s.to_physical_repr().get_unchecked(0).into_static().unwrap()
+                };
+                self.dist_sample.push(sample);
 
                 let iot = self.io_thread.lock().unwrap();
                 let iot = iot.as_ref().unwrap();
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/source.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/source.rs
new file mode 100644
index 000000000000..1b61289c6337
--- /dev/null
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/source.rs
@@ -0,0 +1,188 @@
+use std::fs::DirEntry;
+use std::path::PathBuf;
+
+use polars_core::prelude::*;
+use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
+use polars_core::POOL;
+use rayon::prelude::*;
+
+use crate::executors::sinks::sort::ooc::read_df;
+use crate::executors::sinks::sort::sink::sort_accumulated;
+use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
+
+pub struct SortSource {
+    files: std::vec::IntoIter<(u32, PathBuf)>,
+    n_threads: usize,
+    sort_idx: usize,
+    reverse: bool,
+    chunk_offset: IdxSize,
+    slice: Option<(i64, usize)>,
+    finished: bool,
+
+    // The sorted partitions
+    // are used to check if a directory is already completely sorted:
+    // if the lower boundary of a partition is equal to the upper
+    // boundary, the whole directory is already sorted
+    // this directory may also be very large, as in the extreme case
+    // we sort a column with a constant value; then the binary search
+    // ensures that all files will be written to a single folder
+    // in that case we just read the files
+    partitions: Series,
+    sorted_directory_in_process: Option<std::vec::IntoIter<DirEntry>>,
+}
+
+impl SortSource {
+    pub(super) fn new(
+        mut files: Vec<(u32, PathBuf)>,
+        sort_idx: usize,
+        reverse: bool,
+        slice: Option<(i64, usize)>,
+        partitions: Series,
+    ) -> Self {
+        files.sort_unstable_by_key(|entry| entry.0);
+
+        let n_threads = POOL.current_num_threads();
+        let files = files.into_iter();
+
+        Self {
+            files,
+            n_threads,
+            sort_idx,
+            reverse,
+            chunk_offset: 0,
+            slice,
+            finished: false,
+            partitions,
+            sorted_directory_in_process: None,
+        }
+    }
+    fn finish_batch(&mut self, dfs: Vec<DataFrame>) -> Vec<DataChunk> {
+        // TODO: make utility functions to save these allocations
+        let chunk_offset = self.chunk_offset;
+        self.chunk_offset += dfs.len() as IdxSize;
+        dfs.into_iter()
+            .enumerate()
+            .map(|(i, df)| DataChunk {
+                chunk_index: chunk_offset + i as IdxSize,
+                data: df,
+            })
+            .collect()
+    }
+}
+
+impl Source for SortSource {
+    fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
+        // early return
+        if self.finished {
+            return Ok(SourceResult::Finished);
+        }
+
+        // this branch processes the directories containing a single sort key
+        // i.e. the lower_bound == upper_bound
+        if let Some(files) = &mut self.sorted_directory_in_process {
+            let read = files
+                .take(self.n_threads)
+                .map(|entry| read_df(&entry))
+                .collect::<PolarsResult<Vec<_>>>()?;
+            let mut df = match (read.len(), &mut self.slice) {
+                (0, _) => {
+                    // depleted directory, continue with normal sorting
+                    self.sorted_directory_in_process = None;
+                    return self.get_batches(_context);
+                }
+                // there is no slice and we got exactly enough files,
+                // so we return; happy path
+                (n, None) if n == self.n_threads => {
+                    return Ok(SourceResult::GotMoreData(self.finish_batch(read)))
+                }
+                // there is a slice, so we concat and apply the slice
+                // and then later split over the number of threads
+                (_, Some((offset, len))) => {
+                    let df = accumulate_dataframes_vertical_unchecked(read);
+                    let df_len = df.height();
+
+                    // whole batch can be skipped
+                    let out = if *offset as usize >= df_len {
+                        *offset -= df_len as i64;
+                        return self.get_batches(_context);
+                    } else {
+                        let out = df.slice(*offset, *len);
+                        *len = len.saturating_sub(df_len);
+                        *offset = 0;
+                        out
+                    };
+                    if *len == 0 {
+                        self.finished = true;
+                    }
+                    out
+                }
+                // the number of files read is lower than the number of
+                // batches we have to return, so we first accumulate
+                // and then split over the number of threads
+                (_, None) => accumulate_dataframes_vertical_unchecked(read),
+            };
+            let batch = split_df(&mut df, self.n_threads)?;
+            return Ok(SourceResult::GotMoreData(self.finish_batch(batch)));
+        }
+
+        match self.files.next() {
+            None => Ok(SourceResult::Finished),
+            Some((partition, path)) => {
+                let files = std::fs::read_dir(path)?.collect::<std::io::Result<Vec<_>>>()?;
+
+                // both lower and upper can fail
+                // lower can fail because search_sorted may assign the sort idx to the end of the array, i.e. i == len
+                if let (Ok(lower), Ok(upper)) = (
+                    self.partitions.get(partition as usize),
+                    self.partitions.get(partition as usize + 1),
+                ) {
+                    if lower == upper && !files.is_empty() {
+                        let files = files.into_iter();
+                        self.sorted_directory_in_process = Some(files);
+                        return self.get_batches(_context);
+                    }
+                }
+
+                // read the files in a single partition in parallel
+                let dfs = POOL.install(|| {
+                    files
+                        .par_iter()
+                        .map(read_df)
+                        .collect::<PolarsResult<Vec<_>>>()
+                })?;
+                let df = accumulate_dataframes_vertical_unchecked(dfs);
+                // sort a single partition
+                let current_slice = self.slice;
+                let mut df = match &mut self.slice {
+                    None => sort_accumulated(df, self.sort_idx, self.reverse, None),
+                    Some((offset, len)) => {
+                        let df_len = df.height();
+                        assert!(*offset >= 0);
+                        let out = if *offset as usize >= df_len {
+                            *offset -= df_len as i64;
+                            Ok(df.slice(0, 0))
+                        } else {
+                            let out =
+                                sort_accumulated(df, self.sort_idx, self.reverse, current_slice);
+                            *len = len.saturating_sub(df_len);
+                            *offset = 0;
+                            out
+                        };
+                        if *len == 0 {
+                            self.finished = true;
+                        }
+                        out
+                    }
+                }?;
+
+                // convert to chunks
+                let dfs = split_df(&mut df, self.n_threads)?;
+                Ok(SourceResult::GotMoreData(self.finish_batch(dfs)))
+            }
+        }
+    }
+
+    fn fmt(&self) -> &str {
+        "sort_source"
+    }
+}
diff --git a/polars/polars-ops/src/series/ops/search_sorted.rs b/polars/polars-ops/src/series/ops/search_sorted.rs
index 45f6c42a57fb..b803fef79672 100644
--- a/polars/polars-ops/src/series/ops/search_sorted.rs
+++ b/polars/polars-ops/src/series/ops/search_sorted.rs
@@ -1,7 +1,7 @@
 use std::cmp::Ordering;
 use std::fmt::Debug;
 
-use arrow::array::{PrimitiveArray, Utf8Array};
+use arrow::array::{Array, PrimitiveArray, Utf8Array};
 use polars_arrow::kernels::rolling::compare_fn_nan_max;
 use polars_arrow::prelude::*;
 use polars_core::prelude::*;
@@ -163,11 +163,19 @@ where
 
     let mut out = Vec::with_capacity(search_values.len());
 
-    for opt_v in search_values {
-        match opt_v {
-            None => out.push(0),
-            Some(search_value) => {
-                binary_search_array(side, &mut out, arr, ca.len(), search_value, reverse)
+    for search_arr in search_values.downcast_iter() {
+        if search_arr.null_count() == 0 {
+            for search_value in search_arr.values_iter() {
+                binary_search_array(side, &mut out, arr, ca.len(), *search_value, reverse)
+            }
+        } else {
+            for opt_v in search_arr.into_iter() {
+                match opt_v {
+                    None => out.push(0),
+                    Some(search_value) => {
+                        binary_search_array(side, &mut out, arr, ca.len(), *search_value, reverse)
+                    }
+                }
             }
         }
     }
@@ -185,11 +193,19 @@ fn search_sorted_utf8_array(
 
     let mut out = Vec::with_capacity(search_values.len());
 
-    for opt_v in search_values {
-        match opt_v {
-            None => out.push(0),
-            Some(search_value) => {
-                binary_search_array(side, &mut out, arr, ca.len(), search_value, reverse);
+    for search_arr in search_values.downcast_iter() {
+        if search_arr.null_count() == 0 {
+            for search_value in search_arr.values_iter() {
+                binary_search_array(side, &mut out, arr, ca.len(), search_value, reverse)
+            }
+        } else {
+            for opt_v in search_arr.into_iter() {
+                match opt_v {
+                    None => out.push(0),
+                    Some(search_value) => {
+                        binary_search_array(side, &mut out, arr, ca.len(), search_value, reverse)
+                    }
+                }
             }
         }
     }
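
Not part of the patch: a minimal standalone sketch of the `(offset, len)` slice bookkeeping that `SortSource::get_batches` performs per partition. The helper name `slice_batches` is hypothetical and plain `Vec<i32>` batches stand in for `DataFrame`s; whole batches before the offset are skipped with the offset carried forward, then `len` is consumed until it reaches zero, at which point the source would mark itself finished.

```rust
// Hypothetical illustration only, not Polars code.
fn slice_batches(batches: Vec<Vec<i32>>, mut offset: usize, mut len: usize) -> Vec<Vec<i32>> {
    let mut out = Vec::new();
    for batch in batches {
        if len == 0 {
            break; // the real source would set `finished = true` here
        }
        let height = batch.len();
        if offset >= height {
            // whole batch falls before the slice: skip it and
            // carry the remaining offset into the next batch
            offset -= height;
            continue;
        }
        // take at most `len` rows starting at `offset`, then reset the offset
        let take = len.min(height - offset);
        out.push(batch[offset..offset + take].to_vec());
        len -= take;
        offset = 0;
    }
    out
}

fn main() {
    // rows 3..8 of the concatenated, already-sorted stream
    let batches = vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7, 8, 9]];
    let sliced = slice_batches(batches, 3, 5);
    assert_eq!(sliced, vec![vec![3, 4, 5], vec![6, 7]]);
    println!("{sliced:?}");
}
```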