Skip to content

Commit

Permalink
fix(rust, python): remove uses of rayon global thread pool (#6682)
Browse files Browse the repository at this point in the history
  • Loading branch information
phaile2 committed Feb 5, 2023
1 parent 0d0337d commit 515514d
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 226 deletions.
44 changes: 23 additions & 21 deletions polars/polars-core/src/chunked_array/ndarray.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use ndarray::prelude::*;
use rayon::prelude::*;

use crate::prelude::*;
use crate::POOL;

impl<T> ChunkedArray<T>
where
Expand Down Expand Up @@ -98,32 +99,33 @@ impl DataFrame {
where
N: PolarsNumericType,
{
let columns = self
.get_columns()
.par_iter()
.map(|s| {
let s = s.cast(&N::get_dtype())?;
let s = match s.dtype() {
DataType::Float32 => {
let ca = s.f32().unwrap();
ca.none_to_nan().into_series()
}
DataType::Float64 => {
let ca = s.f64().unwrap();
ca.none_to_nan().into_series()
}
_ => s,
};
Ok(s.rechunk())
})
.collect::<PolarsResult<Vec<_>>>()?;
let columns = POOL.install(|| {
self.get_columns()
.par_iter()
.map(|s| {
let s = s.cast(&N::get_dtype())?;
let s = match s.dtype() {
DataType::Float32 => {
let ca = s.f32().unwrap();
ca.none_to_nan().into_series()
}
DataType::Float64 => {
let ca = s.f64().unwrap();
ca.none_to_nan().into_series()
}
_ => s,
};
Ok(s.rechunk())
})
.collect::<PolarsResult<Vec<_>>>()
})?;

let shape = self.shape();
let height = self.height();
let mut membuf = Vec::with_capacity(shape.0 * shape.1);
let ptr = membuf.as_ptr() as usize;

columns.par_iter().enumerate().map(|(col_idx, s)| {
POOL.install(||{columns.par_iter().enumerate().map(|(col_idx, s)| {
if s.null_count() != 0 {
return Err(PolarsError::ComputeError(
"Creation of ndarray with null values is not supported. Consider using floats and NaNs".into(),
Expand All @@ -148,7 +150,7 @@ impl DataFrame {
}

Ok(())
}).collect::<PolarsResult<Vec<_>>>()?;
}).collect::<PolarsResult<Vec<_>>>()})?;

// Safety:
// we have written all data, so we can now safely set length
Expand Down
22 changes: 12 additions & 10 deletions polars/polars-core/src/frame/groupby/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ where
// We will create a hashtable in every thread.
// We use the hash to partition the keys to the matching hashtable.
// Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
let out = POOL
.install(|| {
(0..n_partitions).into_par_iter().map(|thread_no| {
let out = POOL.install(|| {
(0..n_partitions)
.into_par_iter()
.map(|thread_no| {
let mut hash_tbl: PlHashMap<T, (IdxSize, Vec<IdxSize>)> =
PlHashMap::with_capacity(HASHMAP_INIT_SIZE);

Expand Down Expand Up @@ -149,8 +150,8 @@ where
.map(|(_k, v)| v)
.collect_trusted::<Vec<_>>()
})
})
.collect::<Vec<_>>();
.collect::<Vec<_>>()
});
finish_group_order(out, sorted)
}

Expand Down Expand Up @@ -303,9 +304,10 @@ pub(crate) fn groupby_threaded_multiple_keys_flat(
// We will create a hashtable in every thread.
// We use the hash to partition the keys to the matching hashtable.
// Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
let groups = POOL
.install(|| {
(0..n_partitions).into_par_iter().map(|thread_no| {
let groups = POOL.install(|| {
(0..n_partitions)
.into_par_iter()
.map(|thread_no| {
let hashes = &hashes;

let mut hash_tbl: HashMap<IdxHash, (IdxSize, Vec<IdxSize>), IdBuildHasher> =
Expand Down Expand Up @@ -339,7 +341,7 @@ pub(crate) fn groupby_threaded_multiple_keys_flat(
}
hash_tbl.into_iter().map(|(_k, v)| v).collect::<Vec<_>>()
})
})
.collect::<Vec<_>>();
.collect::<Vec<_>>()
});
Ok(finish_group_order(groups, sorted))
}
66 changes: 34 additions & 32 deletions polars/polars-core/src/frame/hash_join/multiple_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,43 @@ pub(crate) fn create_probe_table(
// We use the hash to partition the keys to the matching hashtable.
// Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
POOL.install(|| {
(0..n_partitions).into_par_iter().map(|part_no| {
let part_no = part_no as u64;
let mut hash_tbl: HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher> =
HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());

let n_partitions = n_partitions as u64;
let mut offset = 0;
for hashes in hashes {
for hashes in hashes.data_views() {
let len = hashes.len();
let mut idx = 0;
hashes.iter().for_each(|h| {
// partition hashes by thread no.
// So only a part of the hashes go to this hashmap
if this_partition(*h, part_no, n_partitions) {
let idx = idx + offset;
populate_multiple_key_hashmap(
&mut hash_tbl,
idx,
*h,
keys,
|| vec![idx],
|v| v.push(idx),
)
}
idx += 1;
});
(0..n_partitions)
.into_par_iter()
.map(|part_no| {
let part_no = part_no as u64;
let mut hash_tbl: HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher> =
HashMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default());

let n_partitions = n_partitions as u64;
let mut offset = 0;
for hashes in hashes {
for hashes in hashes.data_views() {
let len = hashes.len();
let mut idx = 0;
hashes.iter().for_each(|h| {
// partition hashes by thread no.
// So only a part of the hashes go to this hashmap
if this_partition(*h, part_no, n_partitions) {
let idx = idx + offset;
populate_multiple_key_hashmap(
&mut hash_tbl,
idx,
*h,
keys,
|| vec![idx],
|v| v.push(idx),
)
}
idx += 1;
});

offset += len as IdxSize;
offset += len as IdxSize;
}
}
}
hash_tbl
})
hash_tbl
})
.collect()
})
.collect()
}

fn create_build_table_outer(
Expand Down
60 changes: 31 additions & 29 deletions polars/polars-core/src/frame/hash_join/single_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,45 @@ where
// We use the hash to partition the keys to the matching hashtable.
// Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
POOL.install(|| {
(0..n_partitions).into_par_iter().map(|partition_no| {
let partition_no = partition_no as u64;
(0..n_partitions)
.into_par_iter()
.map(|partition_no| {
let partition_no = partition_no as u64;

let mut hash_tbl: PlHashMap<T, Vec<IdxSize>> =
PlHashMap::with_capacity(HASHMAP_INIT_SIZE);
let mut hash_tbl: PlHashMap<T, Vec<IdxSize>> =
PlHashMap::with_capacity(HASHMAP_INIT_SIZE);

let n_partitions = n_partitions as u64;
let mut offset = 0;
for keys in &keys {
let keys = keys.as_ref();
let len = keys.len() as IdxSize;
let n_partitions = n_partitions as u64;
let mut offset = 0;
for keys in &keys {
let keys = keys.as_ref();
let len = keys.len() as IdxSize;

let mut cnt = 0;
keys.iter().for_each(|k| {
let idx = cnt + offset;
cnt += 1;
let mut cnt = 0;
keys.iter().for_each(|k| {
let idx = cnt + offset;
cnt += 1;

if this_partition(k.as_u64(), partition_no, n_partitions) {
let entry = hash_tbl.entry(*k);
if this_partition(k.as_u64(), partition_no, n_partitions) {
let entry = hash_tbl.entry(*k);

match entry {
Entry::Vacant(entry) => {
entry.insert(vec![idx]);
}
Entry::Occupied(mut entry) => {
let v = entry.get_mut();
v.push(idx);
match entry {
Entry::Vacant(entry) => {
entry.insert(vec![idx]);
}
Entry::Occupied(mut entry) => {
let v = entry.get_mut();
v.push(idx);
}
}
}
}
});
offset += len;
}
hash_tbl
})
});
offset += len;
}
hash_tbl
})
.collect()
})
.collect()
}

// we determine the offset so that we later know which index to store in the join tuples
Expand Down
80 changes: 41 additions & 39 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,49 +627,51 @@ where
// We use the hash to partition the keys to the matching hashtable.
// Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
POOL.install(|| {
(0..n_partitions).into_par_iter().map(|partition_no| {
let build_hasher = build_hasher.clone();
let hashes_and_keys = &hashes_and_keys;
let partition_no = partition_no as u64;
let mut hash_tbl: HashMap<T, (bool, Vec<IdxSize>), RandomState> =
HashMap::with_hasher(build_hasher);

let n_threads = n_partitions as u64;
let mut offset = 0;
for hashes_and_keys in hashes_and_keys {
let len = hashes_and_keys.len();
hashes_and_keys
.iter()
.enumerate()
.for_each(|(idx, (h, k))| {
let idx = idx as IdxSize;
// partition hashes by thread no.
// So only a part of the hashes go to this hashmap
if this_partition(*h, partition_no, n_threads) {
let idx = idx + offset;
let entry = hash_tbl
.raw_entry_mut()
// uses the key to check equality to find and entry
.from_key_hashed_nocheck(*h, k);

match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
v.1.push(idx);
(0..n_partitions)
.into_par_iter()
.map(|partition_no| {
let build_hasher = build_hasher.clone();
let hashes_and_keys = &hashes_and_keys;
let partition_no = partition_no as u64;
let mut hash_tbl: HashMap<T, (bool, Vec<IdxSize>), RandomState> =
HashMap::with_hasher(build_hasher);

let n_threads = n_partitions as u64;
let mut offset = 0;
for hashes_and_keys in hashes_and_keys {
let len = hashes_and_keys.len();
hashes_and_keys
.iter()
.enumerate()
.for_each(|(idx, (h, k))| {
let idx = idx as IdxSize;
// partition hashes by thread no.
// So only a part of the hashes go to this hashmap
if this_partition(*h, partition_no, n_threads) {
let idx = idx + offset;
let entry = hash_tbl
.raw_entry_mut()
// uses the key to check equality to find and entry
.from_key_hashed_nocheck(*h, k);

match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
v.1.push(idx);
}
}
}
}
});
});

offset += len as IdxSize;
}
hash_tbl
})
offset += len as IdxSize;
}
hash_tbl
})
.collect()
})
.collect()
}

pub(crate) fn create_hash_and_keys_threaded_vectorized<I, T>(
Expand Down
Loading

0 comments on commit 515514d

Please sign in to comment.