Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a fallback when threading is unsupported #1019

Merged
merged 6 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,24 @@ jobs:
- run: cargo test --verbose --package rayon
- run: cargo test --verbose --package rayon-core

# wasm won't actually work without threading, but it builds
# wasm32-unknown-unknown builds, and even has the runtime fallback for
# unsupported threading, but we don't have an environment to execute in.
# wasm32-wasi can test the fallback by running in wasmtime.
wasm:
name: WebAssembly
runs-on: ubuntu-latest
env:
CARGO_TARGET_WASM32_WASI_RUNNER: /home/runner/.wasmtime/bin/wasmtime
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
with:
target: wasm32-unknown-unknown
targets: wasm32-unknown-unknown,wasm32-wasi
- run: cargo check --verbose --target wasm32-unknown-unknown
- run: cargo check --verbose --target wasm32-wasi
- run: curl https://wasmtime.dev/install.sh -sSf | bash
- run: cargo test --verbose --target wasm32-wasi --package rayon
- run: cargo test --verbose --target wasm32-wasi --package rayon-core

fmt:
name: Format
Expand Down
46 changes: 46 additions & 0 deletions rayon-core/src/broadcast/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ fn broadcast_global() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
let (tx, rx) = crossbeam_channel::unbounded();
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
Expand All @@ -22,13 +23,15 @@ fn spawn_broadcast_global() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.broadcast(|ctx| ctx.index());
assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -40,13 +43,15 @@ fn spawn_broadcast_pool() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -58,6 +63,7 @@ fn spawn_broadcast_self() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
Expand All @@ -73,6 +79,7 @@ fn broadcast_mutual() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
Expand All @@ -90,6 +97,7 @@ fn spawn_broadcast_mutual() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
Expand All @@ -108,6 +116,7 @@ fn broadcast_mutual_sleepy() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
Expand All @@ -130,6 +139,7 @@ fn spawn_broadcast_mutual_sleepy() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -146,6 +156,7 @@ fn broadcast_panic_one() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
Expand All @@ -166,6 +177,7 @@ fn spawn_broadcast_panic_one() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -182,6 +194,7 @@ fn broadcast_panic_many() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
Expand All @@ -202,6 +215,7 @@ fn spawn_broadcast_panic_many() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
let test_duration = time::Duration::from_secs(1);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -214,3 +228,35 @@ fn broadcast_sleep_race() {
});
}
}

#[test]
fn broadcast_after_spawn_broadcast() {
let (tx, rx) = crossbeam_channel::unbounded();

// Queue a non-blocking spawn_broadcast.
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());

// This blocking broadcast runs after all prior broadcasts.
crate::broadcast(|_| {});

// The spawn_broadcast **must** have run by now on all threads.
let mut v: Vec<_> = rx.try_iter().collect();
v.sort_unstable();
assert!(v.into_iter().eq(0..crate::current_num_threads()));
}

#[test]
fn broadcast_after_spawn() {
let (tx, rx) = crossbeam_channel::bounded(1);

// Queue a regular spawn on a thread-local deque.
crate::registry::in_worker(move |_, _| {
crate::spawn(move || tx.send(22).unwrap());
});

// Broadcast runs after the local deque is empty.
crate::broadcast(|_| {});

// The spawn **must** have run by now.
assert_eq!(22, rx.try_recv().unwrap());
}
6 changes: 6 additions & 0 deletions rayon-core/src/join/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ fn sort() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sort_in_pool() {
let rng = seeded_rng();
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
Expand Down Expand Up @@ -77,6 +78,7 @@ fn panic_propagate_both() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_b_still_executes() {
let mut x = false;
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
Expand All @@ -86,6 +88,7 @@ fn panic_b_still_executes() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_both() {
// If we're not in a pool, both should be marked stolen as they're injected.
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
Expand All @@ -94,6 +97,7 @@ fn join_context_both() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_neither() {
// If we're already in a 1-thread pool, neither job should be stolen.
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
Expand All @@ -104,6 +108,7 @@ fn join_context_neither() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_second() {
use std::sync::Barrier;

Expand All @@ -127,6 +132,7 @@ fn join_context_second() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_counter_overflow() {
const MAX: u32 = 500_000;

Expand Down
22 changes: 21 additions & 1 deletion rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,23 @@
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
//! ## Restricting multiple versions
//! # Global fallback when threading is unsupported
//!
//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
//! targets are notable examples of this. Rather than panicking on the unsupported error when
//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
//!
//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
//! there is no other thread to share the work. However, since the pool is not running independent
//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
//! anything like thread preemption or `async` task switching.
//!
//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
//!
//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
Expand Down Expand Up @@ -707,6 +723,10 @@ impl ThreadPoolBuildError {
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
ThreadPoolBuildError { kind }
}

fn is_unsupported(&self) -> bool {
matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
}
}

const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
Expand Down
76 changes: 59 additions & 17 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
unsafe { main_loop(self) }
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
Expand Down Expand Up @@ -198,6 +198,46 @@ where
result
}

fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
let result = Registry::new(ThreadPoolBuilder::new());

// If we're running in an environment that doesn't support threads at all, we can fall back to
// using the current thread alone. This is crude, and probably won't work for non-blocking
// calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
//
// Notably, this allows current WebAssembly targets to work even though their threading support
// is stubbed out, and we won't have to change anything if they do add real threading.
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
if unsupported && WorkerThread::current().is_null() {
let builder = ThreadPoolBuilder::new()
.num_threads(1)
.spawn_handler(|thread| {
// Rather than starting a new thread, we're just taking over the current thread
// *without* running the main loop, so we can still return from here.
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
let registry = &*worker_thread.registry;
let index = worker_thread.index;

unsafe {
WorkerThread::set_current(worker_thread);

// let registry know we are ready to do work
Latch::set(&registry.thread_infos[index].primed);
}

Ok(())
});

let fallback_result = Registry::new(builder);
if fallback_result.is_ok() {
return fallback_result;
}
}

result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
Expand Down Expand Up @@ -655,6 +695,19 @@ thread_local! {
static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

impl From<ThreadBuilder> for WorkerThread {
fn from(thread: ThreadBuilder) -> Self {
Self {
worker: thread.worker,
stealer: thread.stealer,
fifo: JobFifo::new(),
index: thread.index,
rng: XorShift64Star::new(),
registry: thread.registry,
}
}
}

impl Drop for WorkerThread {
fn drop(&mut self) {
// Undo `set_current`
Expand Down Expand Up @@ -851,22 +904,11 @@ impl WorkerThread {

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(
worker: Worker<JobRef>,
stealer: Stealer<JobRef>,
registry: Arc<Registry>,
index: usize,
) {
let worker_thread = &WorkerThread {
worker,
stealer,
fifo: JobFifo::new(),
index,
rng: XorShift64Star::new(),
registry,
};
unsafe fn main_loop(thread: ThreadBuilder) {
let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
let registry = &*worker_thread.registry;
let index = worker_thread.index;

// let registry know we are ready to do work
Latch::set(&registry.thread_infos[index].primed);
Expand Down Expand Up @@ -924,7 +966,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
global_registry().in_worker_cold(op)
global_registry().in_worker(op)
}
}
}
Expand Down
Loading