Skip to content

Commit

Permalink
Merge pull request #36 from orxfun/downstream-concurrency-support
Browse files Browse the repository at this point in the history
downstream-concurrency-support
  • Loading branch information
orxfun authored Jul 25, 2024
2 parents 9f9c3d3 + d1725ad commit 87dedd3
Showing 6 changed files with 132 additions and 27 deletions.
18 changes: 8 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "orx-parallel"
version = "1.1.0"
version = "1.3.0"
edition = "2021"
authors = ["orxfun <orx.ugur.arikan@gmail.com>"]
description = "A performant and configurable parallel computing library for computations defined as compositions of iterator methods."
@@ -10,14 +10,15 @@ keywords = ["parallel", "concurrency", "performance", "thread", "iterator"]
categories = ["concurrency", "algorithms"]

[dependencies]
orx-concurrent-bag = "1.16"
orx-pseudo-default = "1.0"
orx-concurrent-iter = "1.22"
orx-concurrent-ordered-bag = "1.3"
orx-fixed-vec = "2.12"
orx-pinned-concurrent-col = "1.6"
orx-pinned-vec = "2.12"
orx-priority-queue = "1.2"
orx-split-vec = "2.14"
orx-pinned-vec = "3.0"
orx-fixed-vec = "3.1"
orx-split-vec = "3.1"
orx-pinned-concurrent-col = "2.1"
orx-concurrent-bag = "2.1"
orx-concurrent-ordered-bag = "2.1"

[dev-dependencies]
chrono = "0.4.38"
@@ -30,6 +31,3 @@ test-case = "3.3.1"
[[bench]]
name = "map_collect"
harness = false

[features]
wdefault = []
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ let select = |output: &Output| output.0.is_power_of_two();

let inputs = || (0..1024).map(|x| Input(x.to_string())).collect::<Vec<_>>();

// sequential computation with regular iterator
let seq_result: usize = inputs()
.into_iter()
.map(compute)
@@ -28,6 +29,7 @@ let seq_result: usize = inputs()
.sum();
assert_eq!(seq_result, 286);

// parallel computation with Par
let par_result = inputs()
.into_par() // parallelize with default settings
.map(compute)
@@ -127,6 +129,56 @@ fn execute<C: Par<Item = Output>>(computation: C) -> Vec<Output> {
}
```

This features saves us from defining the same computation twice. We are often required to write code like below where we need to run sequentially or in parallel depending on an input argument. This is repetitive, error-prone and difficult to maintain.

```rust
use orx_parallel::prelude::*;
struct Input(String);
struct Output(usize);
fn compute(input: Input) -> Output {
Output(input.0.len())
}
fn select(output: &Output) -> bool {
output.0.is_power_of_two()
}

fn execute_conditionally(inputs: impl Iterator<Item = Input>, parallelize: bool) -> usize {
match parallelize {
true => inputs
.into_iter()
.par()
.map(compute)
.filter(select)
.map(|x| x.0)
.sum(),
false => inputs
.into_iter()
.map(compute)
.filter(select)
.map(|x| x.0)
.sum(),
}
}
```

Using `Par`, we can have a single version which will not have any overhead when executed sequentially.

```rust
fn execute_unified(inputs: impl Iterator<Item = Input>, parallelize: bool) -> usize {
let num_threads = match parallelize {
true => NumThreads::Auto,
false => NumThreads::sequential(),
};
inputs
.par()
.num_threads(num_threads)
.map(compute)
.filter(select)
.map(|x| x.0)
.sum()
}
```

## Underlying Approach & Performance

This crate has developed as a natural follow up of the [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter). You may already find example parallel map, fold and find implementations in the examples. Especially when combined with concurrent collections such as [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) and [`ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag), implementation of parallel computation has been very straightforward. You may find some details in this [section](https://github.com/orxfun/orx-parallel/blob/main/docs/RelationToRayon.md) and this [discussion](https://github.com/orxfun/orx-parallel/discussions/26).
8 changes: 4 additions & 4 deletions src/core/map_col.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use super::runner::{ParTask, Runner};
use crate::Params;
use orx_concurrent_iter::ConcurrentIter;
use orx_concurrent_ordered_bag::ConcurrentOrderedBag;
use orx_fixed_vec::PinnedVec;
use orx_pinned_vec::IntoConcurrentPinnedVec;

pub fn map_col<I, Out, Map, P>(
params: Params,
@@ -14,7 +14,7 @@ where
I: ConcurrentIter,
Out: Send + Sync,
Map: Fn(I::Item) -> Out + Send + Sync,
P: PinnedVec<Out>,
P: IntoConcurrentPinnedVec<Out>,
{
match params.is_sequential() {
true => seq_map_col(iter, map, collected),
@@ -37,7 +37,7 @@ fn task<I, Out, Map, P>(
I: ConcurrentIter,
Out: Send + Sync,
Map: Fn(I::Item) -> Out + Send + Sync,
P: PinnedVec<Out>,
P: IntoConcurrentPinnedVec<Out>,
{
match chunk_size {
1 => {
@@ -59,7 +59,7 @@ where
I: ConcurrentIter,
Out: Send + Sync,
Map: Fn(I::Item) -> Out + Send + Sync,
P: PinnedVec<Out>,
P: IntoConcurrentPinnedVec<Out>,
{
let mut output = unsafe { collected.into_inner().unwrap_only_if_counts_match() };
let iter = iter.into_seq_iter();
61 changes: 61 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
//!
//! let inputs = || (0..1024).map(|x| Input(x.to_string())).collect::<Vec<_>>();
//!
//! // sequential computation with regular iterator
//! let seq_result: usize = inputs()
//! .into_iter()
//! .map(compute)
@@ -28,6 +29,7 @@
//! .sum();
//! assert_eq!(seq_result, 286);
//!
//! // parallel computation with Par
//! let par_result = inputs()
//! .into_par() // parallelize with default settings
//! .map(compute)
@@ -127,6 +129,65 @@
//! }
//! ```
//!
//! This features saves us from defining the same computation twice. We are often required to write code like below where we need to run sequentially or in parallel depending on an input argument. This is repetitive, error-prone and difficult to maintain.
//!
//! ```rust
//! use orx_parallel::prelude::*;
//! struct Input(String);
//! struct Output(usize);
//! fn compute(input: Input) -> Output {
//! Output(input.0.len())
//! }
//! fn select(output: &Output) -> bool {
//! output.0.is_power_of_two()
//! }
//!
//! fn execute_conditionally(inputs: impl Iterator<Item = Input>, parallelize: bool) -> usize {
//! match parallelize {
//! true => inputs
//! .into_iter()
//! .par()
//! .map(compute)
//! .filter(select)
//! .map(|x| x.0)
//! .sum(),
//! false => inputs
//! .into_iter()
//! .map(compute)
//! .filter(select)
//! .map(|x| x.0)
//! .sum(),
//! }
//! }
//! ```
//!
//! Using `Par`, we can have a single version which will not have any overhead when executed sequentially.
//!
//! ```rust
//! # use orx_parallel::prelude::*;
//! # struct Input(String);
//! # struct Output(usize);
//! # fn compute(input: Input) -> Output {
//! # Output(input.0.len())
//! # }
//! # fn select(output: &Output) -> bool {
//! # output.0.is_power_of_two()
//! # }
//! fn execute_unified(inputs: impl Iterator<Item = Input>, parallelize: bool) -> usize {
//! let num_threads = match parallelize {
//! true => NumThreads::Auto,
//! false => NumThreads::sequential(),
//! };
//! inputs
//! .par()
//! .num_threads(num_threads)
//! .map(compute)
//! .filter(select)
//! .map(|x| x.0)
//! .sum()
//! }
//! ```
//!
//! ## Underlying Approach & Performance
//!
//! This crate has developed as a natural follow up of the [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter). You may already find example parallel map, fold and find implementations in the examples. Especially when combined with concurrent collections such as [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) and [`ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag), implementation of parallel computation has been very straightforward. You may find some details in this [section](https://github.com/orxfun/orx-parallel/blob/main/docs/RelationToRayon.md) and this [discussion](https://github.com/orxfun/orx-parallel/discussions/26).
4 changes: 2 additions & 2 deletions src/par/collect_into/collect_into_core.rs
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@ use crate::{
};
use orx_concurrent_bag::ConcurrentBag;
use orx_concurrent_iter::ConcurrentIter;
use orx_pinned_vec::PinnedVec;
use orx_pinned_vec::IntoConcurrentPinnedVec;

pub trait ParCollectIntoCore<O: Send + Sync> {
type BridgePinnedVec: PinnedVec<O>;
type BridgePinnedVec: IntoConcurrentPinnedVec<O>;

/// Performs the parallel map operation, collecting the results into this collection.
fn map_into<I, M>(self, par_map: ParMap<I, O, M>) -> Self
16 changes: 5 additions & 11 deletions src/par/collect_into/split_vec.rs
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@ use orx_concurrent_bag::ConcurrentBag;
use orx_concurrent_ordered_bag::ConcurrentOrderedBag;
use orx_split_vec::*;

impl<O: Send + Sync, G: Growth> ParCollectInto<O> for SplitVec<O, G> {}
impl<O: Send + Sync, G: GrowthWithConstantTimeAccess> ParCollectInto<O> for SplitVec<O, G> {}

impl<O: Send + Sync, G: Growth> ParCollectIntoCore<O> for SplitVec<O, G> {
impl<O: Send + Sync, G: GrowthWithConstantTimeAccess> ParCollectIntoCore<O> for SplitVec<O, G> {
type BridgePinnedVec = Self;

fn map_into<I, M>(mut self, par_map: ParMap<I, O, M>) -> Self
@@ -27,15 +27,9 @@ impl<O: Send + Sync, G: Growth> ParCollectIntoCore<O> for SplitVec<O, G> {
M: Fn(I::Item) -> O + Send + Sync + Clone,
{
match par_map.iter_len() {
None => {
self.try_reserve_maximum_concurrent_capacity(1 << 32)
.expect("Failed to reserve sufficient capacity");
}
Some(len) => {
self.try_reserve_maximum_concurrent_capacity(self.len() + len)
.expect("Failed to reserve sufficient capacity");
}
}
None => self.reserve_maximum_concurrent_capacity(1 << 32),
Some(len) => self.reserve_maximum_concurrent_capacity(self.len() + len),
};
let bag: ConcurrentOrderedBag<_, _> = self.into();
let (params, iter, map) = par_map.destruct();
map_col(params, iter, map, bag)

0 comments on commit 87dedd3

Please sign in to comment.