diff --git a/polars/polars-core/src/chunked_array/ndarray.rs b/polars/polars-core/src/chunked_array/ndarray.rs
index 25d9214f4d34..0b06674438c2 100644
--- a/polars/polars-core/src/chunked_array/ndarray.rs
+++ b/polars/polars-core/src/chunked_array/ndarray.rs
@@ -2,6 +2,7 @@ use ndarray::prelude::*;
 use rayon::prelude::*;
 
 use crate::prelude::*;
+use crate::POOL;
 
 impl<T> ChunkedArray<T>
 where
@@ -98,32 +99,33 @@ impl DataFrame {
     where
         N: PolarsNumericType,
     {
-        let columns = self
-            .get_columns()
-            .par_iter()
-            .map(|s| {
-                let s = s.cast(&N::get_dtype())?;
-                let s = match s.dtype() {
-                    DataType::Float32 => {
-                        let ca = s.f32().unwrap();
-                        ca.none_to_nan().into_series()
-                    }
-                    DataType::Float64 => {
-                        let ca = s.f64().unwrap();
-                        ca.none_to_nan().into_series()
-                    }
-                    _ => s,
-                };
-                Ok(s.rechunk())
-            })
-            .collect::<PolarsResult<Vec<_>>>()?;
+        let columns = POOL.install(|| {
+            self.get_columns()
+                .par_iter()
+                .map(|s| {
+                    let s = s.cast(&N::get_dtype())?;
+                    let s = match s.dtype() {
+                        DataType::Float32 => {
+                            let ca = s.f32().unwrap();
+                            ca.none_to_nan().into_series()
+                        }
+                        DataType::Float64 => {
+                            let ca = s.f64().unwrap();
+                            ca.none_to_nan().into_series()
+                        }
+                        _ => s,
+                    };
+                    Ok(s.rechunk())
+                })
+                .collect::<PolarsResult<Vec<_>>>()
+        })?;
 
         let shape = self.shape();
         let height = self.height();
         let mut membuf = Vec::with_capacity(shape.0 * shape.1);
         let ptr = membuf.as_ptr() as usize;
 
-        columns.par_iter().enumerate().map(|(col_idx, s)| {
+        POOL.install(||{columns.par_iter().enumerate().map(|(col_idx, s)| {
             if s.null_count() != 0 {
                 return Err(PolarsError::ComputeError(
                     "Creation of ndarray with null values is not supported. Consider using floats and NaNs".into(),
@@ -148,7 +150,7 @@ impl DataFrame {
             }
 
             Ok(())
-        }).collect::<PolarsResult<Vec<_>>>()?;
+        }).collect::<PolarsResult<Vec<_>>>()})?;
 
         // Safety:
         // we have written all data, so we can now safely set length
diff --git a/polars/polars-core/src/frame/groupby/hashing.rs b/polars/polars-core/src/frame/groupby/hashing.rs
index 4980f2a41f34..bcbd677a7c22 100644
--- a/polars/polars-core/src/frame/groupby/hashing.rs
+++ b/polars/polars-core/src/frame/groupby/hashing.rs
@@ -106,9 +106,10 @@ where
 
     // We will create a hashtable in every thread.
     // We use the hash to partition the keys to the matching hashtable.
     // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
-    let out = POOL
-        .install(|| {
-            (0..n_partitions).into_par_iter().map(|thread_no| {
+    let out = POOL.install(|| {
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|thread_no| {
                 let mut hash_tbl: PlHashMap<T, (IdxSize, Vec<IdxSize>)> =
                     PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
@@ -149,8 +150,8 @@ where
                     .map(|(_k, v)| v)
                     .collect_trusted::<Vec<_>>()
             })
-        })
-        .collect::<Vec<_>>();
+            .collect::<Vec<_>>()
+    });
 
     finish_group_order(out, sorted)
 }
@@ -303,9 +304,10 @@ pub(crate) fn groupby_threaded_multiple_keys_flat(
 
    // We will create a hashtable in every thread.
    // We use the hash to partition the keys to the matching hashtable.
    // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
-    let groups = POOL
-        .install(|| {
-            (0..n_partitions).into_par_iter().map(|thread_no| {
+    let groups = POOL.install(|| {
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|thread_no| {
                 let hashes = &hashes;
                 let mut hash_tbl: HashMap<IdxHash, (IdxSize, Vec<IdxSize>), IdBuildHasher> =
@@ -339,7 +341,7 @@ pub(crate) fn groupby_threaded_multiple_keys_flat(
                 }
                 hash_tbl.into_iter().map(|(_k, v)| v).collect::<Vec<_>>()
             })
-        })
-        .collect::<Vec<_>>();
+            .collect::<Vec<_>>()
+    });
     Ok(finish_group_order(groups, sorted))
 }
diff --git a/polars/polars-core/src/frame/hash_join/multiple_keys.rs b/polars/polars-core/src/frame/hash_join/multiple_keys.rs
index 0f593bf95019..e078406a08bf 100644
--- a/polars/polars-core/src/frame/hash_join/multiple_keys.rs
+++ b/polars/polars-core/src/frame/hash_join/multiple_keys.rs
@@ -38,41 +38,43 @@ pub(crate) fn create_probe_table(
     // We use the hash to partition the keys to the matching hashtable.
     // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
     POOL.install(|| {
-        (0..n_partitions).into_par_iter().map(|part_no| {
-            let part_no = part_no as u64;
-            let mut hash_tbl: HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher> =
-                HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
-
-            let n_partitions = n_partitions as u64;
-            let mut offset = 0;
-            for hashes in hashes {
-                for hashes in hashes.data_views() {
-                    let len = hashes.len();
-                    let mut idx = 0;
-                    hashes.iter().for_each(|h| {
-                        // partition hashes by thread no.
-                        // So only a part of the hashes go to this hashmap
-                        if this_partition(*h, part_no, n_partitions) {
-                            let idx = idx + offset;
-                            populate_multiple_key_hashmap(
-                                &mut hash_tbl,
-                                idx,
-                                *h,
-                                keys,
-                                || vec![idx],
-                                |v| v.push(idx),
-                            )
-                        }
-                        idx += 1;
-                    });
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|part_no| {
+                let part_no = part_no as u64;
+                let mut hash_tbl: HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher> =
+                    HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());
+
+                let n_partitions = n_partitions as u64;
+                let mut offset = 0;
+                for hashes in hashes {
+                    for hashes in hashes.data_views() {
+                        let len = hashes.len();
+                        let mut idx = 0;
+                        hashes.iter().for_each(|h| {
+                            // partition hashes by thread no.
+                            // So only a part of the hashes go to this hashmap
+                            if this_partition(*h, part_no, n_partitions) {
+                                let idx = idx + offset;
+                                populate_multiple_key_hashmap(
+                                    &mut hash_tbl,
+                                    idx,
+                                    *h,
+                                    keys,
+                                    || vec![idx],
+                                    |v| v.push(idx),
+                                )
+                            }
+                            idx += 1;
+                        });
 
-                    offset += len as IdxSize;
+                        offset += len as IdxSize;
+                    }
                 }
-            }
-            hash_tbl
-        })
+                hash_tbl
+            })
+            .collect()
     })
-    .collect()
 }
 
 fn create_build_table_outer(
diff --git a/polars/polars-core/src/frame/hash_join/single_keys.rs b/polars/polars-core/src/frame/hash_join/single_keys.rs
index 9b32813a710f..05ba99501671 100644
--- a/polars/polars-core/src/frame/hash_join/single_keys.rs
+++ b/polars/polars-core/src/frame/hash_join/single_keys.rs
@@ -13,43 +13,45 @@ where
     // We use the hash to partition the keys to the matching hashtable.
     // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
     POOL.install(|| {
-        (0..n_partitions).into_par_iter().map(|partition_no| {
-            let partition_no = partition_no as u64;
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|partition_no| {
+                let partition_no = partition_no as u64;
 
-            let mut hash_tbl: PlHashMap<T, Vec<IdxSize>> =
-                PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
+                let mut hash_tbl: PlHashMap<T, Vec<IdxSize>> =
+                    PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
 
-            let n_partitions = n_partitions as u64;
-            let mut offset = 0;
-            for keys in &keys {
-                let keys = keys.as_ref();
-                let len = keys.len() as IdxSize;
+                let n_partitions = n_partitions as u64;
+                let mut offset = 0;
+                for keys in &keys {
+                    let keys = keys.as_ref();
+                    let len = keys.len() as IdxSize;
 
-                let mut cnt = 0;
-                keys.iter().for_each(|k| {
-                    let idx = cnt + offset;
-                    cnt += 1;
+                    let mut cnt = 0;
+                    keys.iter().for_each(|k| {
+                        let idx = cnt + offset;
+                        cnt += 1;
 
-                    if this_partition(k.as_u64(), partition_no, n_partitions) {
-                        let entry = hash_tbl.entry(*k);
+                        if this_partition(k.as_u64(), partition_no, n_partitions) {
+                            let entry = hash_tbl.entry(*k);
 
-                        match entry {
-                            Entry::Vacant(entry) => {
-                                entry.insert(vec![idx]);
-                            }
-                            Entry::Occupied(mut entry) => {
-                                let v = entry.get_mut();
-                                v.push(idx);
+                            match entry {
+                                Entry::Vacant(entry) => {
+                                    entry.insert(vec![idx]);
+                                }
+                                Entry::Occupied(mut entry) => {
+                                    let v = entry.get_mut();
+                                    v.push(idx);
+                                }
                             }
                         }
-                    }
-                });
-                offset += len;
-            }
-            hash_tbl
-        })
+                    });
+                    offset += len;
+                }
+                hash_tbl
+            })
+            .collect()
     })
-    .collect()
 }
 
 // we determine the offset so that we later know which index to store in the join tuples
diff --git a/polars/polars-core/src/vector_hasher.rs b/polars/polars-core/src/vector_hasher.rs
index f2157658ea5f..cda707e96ef4 100644
--- a/polars/polars-core/src/vector_hasher.rs
+++ b/polars/polars-core/src/vector_hasher.rs
@@ -627,49 +627,51 @@ where
     // We use the hash to partition the keys to the matching hashtable.
     // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
     POOL.install(|| {
-        (0..n_partitions).into_par_iter().map(|partition_no| {
-            let build_hasher = build_hasher.clone();
-            let hashes_and_keys = &hashes_and_keys;
-            let partition_no = partition_no as u64;
-            let mut hash_tbl: HashMap<T, (bool, Vec<IdxSize>), RandomState> =
-                HashMap::with_hasher(build_hasher);
-
-            let n_threads = n_partitions as u64;
-            let mut offset = 0;
-            for hashes_and_keys in hashes_and_keys {
-                let len = hashes_and_keys.len();
-                hashes_and_keys
-                    .iter()
-                    .enumerate()
-                    .for_each(|(idx, (h, k))| {
-                        let idx = idx as IdxSize;
-                        // partition hashes by thread no.
-                        // So only a part of the hashes go to this hashmap
-                        if this_partition(*h, partition_no, n_threads) {
-                            let idx = idx + offset;
-                            let entry = hash_tbl
-                                .raw_entry_mut()
-                                // uses the key to check equality to find and entry
-                                .from_key_hashed_nocheck(*h, k);
-
-                            match entry {
-                                RawEntryMut::Vacant(entry) => {
-                                    entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
-                                }
-                                RawEntryMut::Occupied(mut entry) => {
-                                    let (_k, v) = entry.get_key_value_mut();
-                                    v.1.push(idx);
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|partition_no| {
+                let build_hasher = build_hasher.clone();
+                let hashes_and_keys = &hashes_and_keys;
+                let partition_no = partition_no as u64;
+                let mut hash_tbl: HashMap<T, (bool, Vec<IdxSize>), RandomState> =
+                    HashMap::with_hasher(build_hasher);
+
+                let n_threads = n_partitions as u64;
+                let mut offset = 0;
+                for hashes_and_keys in hashes_and_keys {
+                    let len = hashes_and_keys.len();
+                    hashes_and_keys
+                        .iter()
+                        .enumerate()
+                        .for_each(|(idx, (h, k))| {
+                            let idx = idx as IdxSize;
+                            // partition hashes by thread no.
+                            // So only a part of the hashes go to this hashmap
+                            if this_partition(*h, partition_no, n_threads) {
+                                let idx = idx + offset;
+                                let entry = hash_tbl
+                                    .raw_entry_mut()
+                                    // uses the key to check equality to find and entry
+                                    .from_key_hashed_nocheck(*h, k);
+
+                                match entry {
+                                    RawEntryMut::Vacant(entry) => {
+                                        entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
+                                    }
+                                    RawEntryMut::Occupied(mut entry) => {
+                                        let (_k, v) = entry.get_key_value_mut();
+                                        v.1.push(idx);
+                                    }
                                 }
                             }
-                        }
-                    });
+                        });
 
-                offset += len as IdxSize;
-            }
-            hash_tbl
-        })
+                    offset += len as IdxSize;
+                }
+                hash_tbl
+            })
+            .collect()
     })
-    .collect()
 }
 
 pub(crate) fn create_hash_and_keys_threaded_vectorized<I, T>(
diff --git a/polars/polars-io/src/parquet/write.rs b/polars/polars-io/src/parquet/write.rs
index bca977088ec0..2d60065fff8a 100644
--- a/polars/polars-io/src/parquet/write.rs
+++ b/polars/polars-io/src/parquet/write.rs
@@ -8,6 +8,7 @@ use arrow::io::parquet::read::ParquetError;
 use arrow::io::parquet::write::{self, DynIter, DynStreamingIterator, Encoding, FileWriter, *};
 use polars_core::prelude::*;
 use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
+use polars_core::POOL;
 use rayon::prelude::*;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
@@ -257,39 +258,41 @@ fn create_serializer<'a>(
     encodings: &[Vec<Encoding>],
     options: WriteOptions,
 ) -> Result<RowGroupIter<'a, ArrowError>, ArrowError> {
-    let columns = batch
-        .columns()
-        .par_iter()
-        .zip(fields)
-        .zip(encodings)
-        .flat_map(move |((array, type_), encoding)| {
-            let encoded_columns =
-                array_to_columns(array, type_.clone(), options, encoding).unwrap();
-
-            encoded_columns
-                .into_iter()
-                .map(|encoded_pages| {
-                    // iterator over pages
-                    let pages = DynStreamingIterator::new(
-                        Compressor::new_from_vec(
-                            encoded_pages.map(|result| {
-                                result.map_err(|e| {
-                                    ParquetError::FeatureNotSupported(format!(
-                                        "reraised in polars: {e}",
-                                    ))
-                                })
-                            }),
-                            options.compression,
-                            vec![],
-                        )
-                        .map_err(|e| ArrowError::External(format!("{e}"), Box::new(e))),
-                    );
-
-                    Ok(pages)
-                })
-                .collect::<Vec<_>>()
-        })
-        .collect::<Vec<_>>();
+    let columns = POOL.install(|| {
+        batch
+            .columns()
+            .par_iter()
+            .zip(fields)
+            .zip(encodings)
+            .flat_map(move |((array, type_), encoding)| {
+                let encoded_columns =
+                    array_to_columns(array, type_.clone(), options, encoding).unwrap();
+
+                encoded_columns
+                    .into_iter()
+                    .map(|encoded_pages| {
+                        // iterator over pages
+                        let pages = DynStreamingIterator::new(
+                            Compressor::new_from_vec(
+                                encoded_pages.map(|result| {
+                                    result.map_err(|e| {
+                                        ParquetError::FeatureNotSupported(format!(
+                                            "reraised in polars: {e}",
+                                        ))
+                                    })
+                                }),
+                                options.compression,
+                                vec![],
+                            )
+                            .map_err(|e| ArrowError::External(format!("{e}"), Box::new(e))),
+                        );
+
+                        Ok(pages)
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>()
+    });
 
     let row_group = DynIter::new(columns.into_iter());
diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
index 9529e9ff1fa7..37ec04fb69b6 100644
--- a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
+++ b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
@@ -203,11 +203,12 @@ impl PipeLine {
             }
         }
 
-        let mut reduced_sink = sink
-            .into_par_iter()
-            .reduce_with(|mut a, b| {
-                a.combine(b);
-                a
+        let mut reduced_sink = POOL
+            .install(|| {
+                sink.into_par_iter().reduce_with(|mut a, b| {
+                    a.combine(b);
+                    a
+                })
             })
             .unwrap();
         let sink_result = reduced_sink.finalize(ec)?;
diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs
index 3ed215f67773..fdf62849e45c 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby_partitioned.rs
@@ -105,8 +105,8 @@ fn run_partitions(
 
             columns.extend_from_slice(&agg_columns);
             DataFrame::new(columns)
-        })
-    }).collect()
+        }).collect()
+    })
 }
 
 fn estimate_unique_count(keys: &[Series], mut sample_size: usize) -> PolarsResult<usize> {
diff --git a/polars/polars-lazy/src/physical_plan/expressions/apply.rs b/polars/polars-lazy/src/physical_plan/expressions/apply.rs
index a5970b4f1c31..9d3e18127eca 100644
--- a/polars/polars-lazy/src/physical_plan/expressions/apply.rs
+++ b/polars/polars-lazy/src/physical_plan/expressions/apply.rs
@@ -116,11 +116,12 @@ impl PhysicalExpr for ApplyExpr {
     }
 
     fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
-        let mut inputs = self
-            .inputs
-            .par_iter()
-            .map(|e| e.evaluate(df, state))
-            .collect::<PolarsResult<Vec<_>>>()?;
+        let mut inputs = POOL.install(|| {
+            self.inputs
+                .par_iter()
+                .map(|e| e.evaluate(df, state))
+                .collect::<PolarsResult<Vec<_>>>()
+        })?;
 
         if self.allow_rename {
             return self.eval_and_flatten(&mut inputs);
@@ -177,21 +178,22 @@ impl PhysicalExpr for ApplyExpr {
             return Ok(self.finish_apply_groups(ac, ca));
         }
 
-        let mut ca: ListChunked = agg
-            .list()
-            .unwrap()
-            .par_iter()
-            .map(|opt_s| match opt_s {
-                None => Ok(None),
-                Some(mut s) => {
-                    if self.pass_name_to_apply {
-                        s.rename(&name);
+        let mut ca: ListChunked = POOL.install(|| {
+            agg.list()
+                .unwrap()
+                .par_iter()
+                .map(|opt_s| match opt_s {
+                    None => Ok(None),
+                    Some(mut s) => {
+                        if self.pass_name_to_apply {
+                            s.rename(&name);
+                        }
+                        let mut container = [s];
+                        self.function.call_udf(&mut container)
                     }
-                    let mut container = [s];
-                    self.function.call_udf(&mut container)
-                }
-            })
-            .collect::<PolarsResult<_>>()?;
+                })
+                .collect::<PolarsResult<_>>()
+        })?;
         ca.rename(&name);
         Ok(self.finish_apply_groups(ac, ca))
diff --git a/polars/polars-time/src/windows/groupby.rs b/polars/polars-time/src/windows/groupby.rs
index 387996a47e20..0a39154588b8 100644
--- a/polars/polars-time/src/windows/groupby.rs
+++ b/polars/polars-time/src/windows/groupby.rs
@@ -423,23 +423,25 @@ pub fn groupby_values(
         // ------t---
         //  [------]
        if offset.duration_ns() < period.duration_ns() * 2 {
-            let vals = thread_offsets
-                .par_iter()
-                .copied()
-                .map(|(base_offset, len)| {
-                    let upper_bound = base_offset + len;
-                    let iter = groupby_values_iter_full_lookbehind(
-                        period,
-                        offset,
-                        &time[..upper_bound],
-                        closed_window,
-                        tu,
-                        base_offset,
-                    );
-                    iter.map(|(offset, len)| [offset as IdxSize, len])
-                        .collect_trusted::<Vec<_>>()
-                })
-                .collect::<Vec<_>>();
+            let vals = POOL.install(|| {
+                thread_offsets
+                    .par_iter()
+                    .copied()
+                    .map(|(base_offset, len)| {
+                        let upper_bound = base_offset + len;
+                        let iter = groupby_values_iter_full_lookbehind(
+                            period,
+                            offset,
+                            &time[..upper_bound],
+                            closed_window,
+                            tu,
+                            base_offset,
+                        );
+                        iter.map(|(offset, len)| [offset as IdxSize, len])
+                            .collect_trusted::<Vec<_>>()
+                    })
+                    .collect::<Vec<_>>()
+            });
             flatten(&vals, Some(time.len()))
         }
         // window is completely behind t and t itself is not a member
@@ -463,25 +465,27 @@ pub fn groupby_values(
             iter.map(|(offset, len)| [offset, len]).collect_trusted()
         }
     } else {
-        let vals = thread_offsets
-            .par_iter()
-            .copied()
-            .map(|(base_offset, len)| {
-                let lower_bound = base_offset;
-                let upper_bound = base_offset + len;
-                let iter = groupby_values_iter_full_lookahead(
-                    period,
-                    offset,
-                    time,
-                    closed_window,
-                    tu,
-                    lower_bound,
-                    Some(upper_bound),
-                );
-                iter.map(|(offset, len)| [offset as IdxSize, len])
-                    .collect_trusted::<Vec<_>>()
-            })
-            .collect::<Vec<_>>();
+        let vals = POOL.install(|| {
+            thread_offsets
+                .par_iter()
+                .copied()
+                .map(|(base_offset, len)| {
+                    let lower_bound = base_offset;
+                    let upper_bound = base_offset + len;
+                    let iter = groupby_values_iter_full_lookahead(
+                        period,
+                        offset,
+                        time,
+                        closed_window,
+                        tu,
+                        lower_bound,
+                        Some(upper_bound),
+                    );
+                    iter.map(|(offset, len)| [offset as IdxSize, len])
+                        .collect_trusted::<Vec<_>>()
+                })
+                .collect::<Vec<_>>()
+        });
         flatten(&vals, Some(time.len()))
     }
 }
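
Every hunk in this patch applies the same fix: a rayon `par_iter`/`into_par_iter` pipeline that was previously driven from outside any pool (and therefore ran on rayon's implicit global thread pool) is wrapped in `POOL.install(|| ...)`, so the parallel work is scheduled on the dedicated `polars_core::POOL`. Below is a minimal self-contained sketch of that pattern; the `once_cell` dependency, the pool size of 4, and the toy workload are illustrative assumptions, not what polars itself uses.

```rust
use once_cell::sync::Lazy;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};

// Stand-in for `polars_core::POOL`: a lazily initialised, process-wide rayon pool.
static POOL: Lazy<ThreadPool> = Lazy::new(|| {
    ThreadPoolBuilder::new()
        .num_threads(4) // illustrative; the real pool is sized from the environment
        .build()
        .expect("could not spawn rayon thread pool")
});

fn main() {
    // Outside `install`, this parallel iterator would run on rayon's global pool.
    // Inside the closure, the same work is scheduled on `POOL` instead.
    let total: u64 = POOL.install(|| (0..1_000u64).into_par_iter().map(|x| x * 2).sum());
    assert_eq!(total, 999_000);
}
```

`ThreadPool::install` only changes where the closure's rayon work is scheduled; its return value passes straight through, which is why several hunks above can keep their error handling by moving the fallible `collect` inside the closure and applying `?` to the result of `install`.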