Add optional single-threaded feature to bevy_ecs/bevy_tasks (#6690)
# Objective
Fixes #6689.

## Solution
Add `single-threaded` as an optional, non-default feature to `bevy_ecs`
and `bevy_tasks` that:

- disables the `ParallelExecutor` as the default runner
- disables the multi-threaded `TaskPool`
- internally replaces `QueryParIter::for_each` calls with `Query::for_each`

Also removed the `Mutex` and `Arc` usage in the single-threaded task pool.


![image](https://user-images.githubusercontent.com/3137680/202833253-dd2d520f-75e6-4c7b-be2d-5ce1523cbd38.png)
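For downstream crates the new feature behaves like any other default Cargo feature: opt out by disabling default features and re-enabling only what you need. A minimal sketch (the version and the re-enabled feature list below are illustrative only):

```toml
# Application Cargo.toml that wants a fully single-threaded Bevy.
# Re-add whichever default features you actually use; just omit "multi-threaded".
[dependencies]
bevy = { version = "0.11", default-features = false, features = ["bevy_winit", "png"] }
```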

## Future Work/TODO
Create type aliases for `Mutex` and `Arc` that switch to single-threaded
equivalents where possible.
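
A rough sketch of what such an alias could look like (the name `Shared` and its location are hypothetical, not part of this commit):

```rust
// Hypothetical sketch: pick a cheaper single-threaded pointer type when the
// "multi-threaded" feature is off. Arc and Rc share almost the same API, so a
// bare alias works here; Mutex vs. RefCell differ (lock() vs. borrow_mut()),
// so that case would likely need a thin wrapper instead of a bare alias.
#[cfg(feature = "multi-threaded")]
pub type Shared<T> = std::sync::Arc<T>;

#[cfg(not(feature = "multi-threaded"))]
pub type Shared<T> = std::rc::Rc<T>;
```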

---

## Changelog
Added: optional default feature `multi-threaded` that enables multithreaded
parallelism in the engine. Disabling it disables all multithreading in
exchange for higher single-threaded performance. It has no effect on WASM
targets.

---------

Co-authored-by: Carter Anderson <mcanders1@gmail.com>
james7132 and cart authored Jul 9, 2023
1 parent a1feab9 commit d33f5c7
Showing 11 changed files with 101 additions and 47 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -44,6 +44,7 @@ default = [
"bevy_sprite",
"bevy_text",
"bevy_ui",
"multi-threaded",
"png",
"hdr",
"ktx2",
@@ -199,6 +200,9 @@ filesystem_watcher = ["bevy_internal/filesystem_watcher"]
# Enable serialization support through serde
serialize = ["bevy_internal/serialize"]

# Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.
multi-threaded = ["bevy_internal/multi-threaded"]

# Wayland display server support
wayland = ["bevy_internal/wayland"]

3 changes: 2 additions & 1 deletion crates/bevy_ecs/Cargo.toml
@@ -11,7 +11,8 @@ categories = ["game-engines", "data-structures"]

[features]
trace = []
default = ["bevy_reflect"]
multi-threaded = ["bevy_tasks/multi-threaded"]
default = ["bevy_reflect", "multi-threaded"]

[dependencies]
bevy_ptr = { path = "../bevy_ptr", version = "0.11.0-dev" }
40 changes: 27 additions & 13 deletions crates/bevy_ecs/src/query/par_iter.rs
@@ -1,5 +1,4 @@
use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
use bevy_tasks::ComputeTaskPool;
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};
@@ -34,6 +33,8 @@ pub struct BatchingStrategy {
/// increase the scheduling overhead for the iteration.
///
/// Defaults to 1.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
pub batches_per_thread: usize,
}

@@ -148,23 +149,36 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
let thread_count = ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
#[cfg(any(target = "wasm32", not(feature = "multi-threaded")))]
{
self.state
.for_each_unchecked_manual(self.world, func, self.last_run, self.this_run);
} else {
// Need a batch size of at least 1.
let batch_size = self.get_batch_size(thread_count).max(1);
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.last_run,
self.this_run,
);
}
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
{
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
self.state.for_each_unchecked_manual(
self.world,
func,
self.last_run,
self.this_run,
);
} else {
// Need a batch size of at least 1.
let batch_size = self.get_batch_size(thread_count).max(1);
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.last_run,
self.this_run,
);
}
}
}

#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
fn get_batch_size(&self, thread_count: usize) -> usize {
if self.batching_strategy.batch_size_limits.is_empty() {
return self.batching_strategy.batch_size_limits.start;
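Because the feature gate lives inside `for_each_unchecked`, calling code looks the same in both modes. A small sketch of a system that exercises this path (the component and batch size are made up; the `par_iter`/`BatchingStrategy` calls assume the crate's existing public API):

```rust
use bevy_ecs::prelude::*;
use bevy_ecs::query::BatchingStrategy;

#[derive(Component)]
struct Velocity(f32);

// With "multi-threaded" enabled this fans out over the ComputeTaskPool;
// with the feature disabled (or on wasm32) it falls back to plain
// sequential iteration, with no change to the call site.
fn report_velocities(query: Query<&Velocity>) {
    query
        .par_iter()
        .batching_strategy(BatchingStrategy::fixed(1024))
        .for_each(|velocity| {
            let _speed = velocity.0;
        });
}
```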
6 changes: 4 additions & 2 deletions crates/bevy_ecs/src/query/state.rs
@@ -10,7 +10,6 @@ use crate::{
storage::{TableId, TableRow},
world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
};
use bevy_tasks::ComputeTaskPool;
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use fixedbitset::FixedBitSet;
@@ -1031,6 +1030,9 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
/// have unique access to the components they query.
/// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world`
/// with a mismatched [`WorldId`] is unsound.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[cfg(all(not(target = "wasm32"), feature = "multi-threaded"))]
pub(crate) unsafe fn par_for_each_unchecked_manual<
'w,
FN: Fn(Q::Item<'w>) + Send + Sync + Clone,
@@ -1044,7 +1046,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
ComputeTaskPool::get().scope(|scope| {
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
if Q::IS_DENSE && F::IS_DENSE {
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
let tables = &world.storages().tables;
4 changes: 2 additions & 2 deletions crates/bevy_ecs/src/schedule/executor/mod.rs
@@ -33,13 +33,13 @@ pub enum ExecutorKind {
///
/// Useful if you're dealing with a single-threaded environment, saving your threads for
/// other things, or just trying minimize overhead.
#[cfg_attr(target_arch = "wasm32", default)]
#[cfg_attr(any(target_arch = "wasm32", not(feature = "multi-threaded")), default)]
SingleThreaded,
/// Like [`SingleThreaded`](ExecutorKind::SingleThreaded) but calls [`apply_deferred`](crate::system::System::apply_deferred)
/// immediately after running each system.
Simple,
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
#[cfg_attr(not(target_arch = "wasm32"), default)]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "multi-threaded"), default)]
MultiThreaded,
}

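The default executor now depends on the feature, but a schedule can still request a specific executor explicitly. A minimal sketch (assuming the existing `Schedule::set_executor_kind` API):

```rust
use bevy_ecs::schedule::{ExecutorKind, Schedule};

// Force single-threaded execution for one schedule even when the
// "multi-threaded" feature (and therefore the multi-threaded default) is on.
fn make_serial_schedule() -> Schedule {
    let mut schedule = Schedule::default();
    schedule.set_executor_kind(ExecutorKind::SingleThreaded);
    schedule
}
```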
1 change: 1 addition & 0 deletions crates/bevy_internal/Cargo.toml
@@ -66,6 +66,7 @@ shader_format_spirv = ["bevy_render/shader_format_spirv"]
filesystem_watcher = ["bevy_asset/filesystem_watcher"]

serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene/serialize"]
multi-threaded = ["bevy_ecs/multi-threaded", "bevy_tasks/multi-threaded"]

# Display server protocol support (X11 is enabled by default)
wayland = ["bevy_winit/wayland"]
2 changes: 1 addition & 1 deletion crates/bevy_internal/src/default_plugins.rs
@@ -81,7 +81,7 @@ impl PluginGroup for DefaultPlugins {
// compressed texture formats
.add(bevy_render::texture::ImagePlugin::default());

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
{
group = group
.add(bevy_render::pipelined_rendering::PipelinedRenderingPlugin::default());
4 changes: 4 additions & 0 deletions crates/bevy_tasks/Cargo.toml
@@ -8,6 +8,10 @@ repository = "https://github.com/bevyengine/bevy"
license = "MIT OR Apache-2.0"
keywords = ["bevy"]

[features]
multi-threaded = []
default = ["multi-threaded"]

[dependencies]
futures-lite = "1.4.0"
async-executor = "1.3.0"
8 changes: 4 additions & 4 deletions crates/bevy_tasks/src/lib.rs
@@ -8,14 +8,14 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
mod task;
pub use task::Task;

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
mod task_pool;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};

#[cfg(target_arch = "wasm32")]
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
mod single_threaded_task_pool;
#[cfg(target_arch = "wasm32")]
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};

mod usages;
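Whichever module is compiled in, the crate re-exports the same `TaskPool`/`Scope` names, so callers need no cfg of their own. A minimal sketch (illustrative only; a real app would normally reuse one of the global pools rather than constructing a fresh `TaskPool`):

```rust
use bevy_tasks::TaskPool;

// Compiles against both the multi-threaded pool and the single-threaded
// fallback; only the execution strategy behind it differs.
fn fire_and_forget() {
    let pool = TaskPool::new();
    pool.spawn(async {
        // background work goes here
    })
    .detach();
}
```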
75 changes: 51 additions & 24 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
@@ -1,9 +1,10 @@
use std::{
future::Future,
marker::PhantomData,
mem,
sync::{Arc, Mutex},
};
#[cfg(target_arch = "wasm32")]
use std::sync::Arc;
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};

thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
}

/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
@@ -76,7 +77,7 @@ impl TaskPool {
1
}

/// Allows spawning non-`static futures on the thread pool. The function takes a callback,
/// Allows spawning non-'static futures on the thread pool. The function takes a callback,
/// passing a scope object into it. The scope object provided to the callback can be used
/// to spawn tasks. This function will await the completion of all tasks before returning.
///
@@ -108,8 +109,9 @@ impl TaskPool {
let executor: &'env async_executor::LocalExecutor<'env> =
unsafe { mem::transmute(executor) };

let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };
let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
unsafe { mem::transmute(&results) };

let mut scope = Scope {
executor,
Expand All @@ -125,29 +127,36 @@ impl TaskPool {
// Loop until all tasks are done
while executor.try_tick() {}

let results = scope.results.lock().unwrap();
let results = scope.results.borrow();
results
.iter()
.map(|result| result.lock().unwrap().take().unwrap())
.map(|result| result.borrow_mut().take().unwrap())
.collect()
}

/// Spawns a static future onto the JS event loop. For now it is returning FakeTask
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
/// so some proxy future is needed. Moreover currently we don't have long-living
/// LocalExecutor here (above `spawn` implementation creates temporary one)
/// But for typical use cases it seems that current implementation should be sufficient:
/// caller can spawn long-running future writing results to some channel / event queue
/// and simply call detach on returned Task (like AssetServer does) - spawned future
/// can write results to some channel / event queue.
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
/// cancelled and "detached" allowing it to continue running without having to be polled by the
/// end-user.
///
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
where
T: 'static,
{
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});

#[cfg(not(target_arch = "wasm32"))]
{
LOCAL_EXECUTOR.with(|executor| {
let _task = executor.spawn(future);
// Loop until all tasks are done
while executor.try_tick() {}
});
}

FakeTask
}

@@ -158,6 +167,24 @@
{
self.spawn(future)
}

/// Runs a function with the local executor. Typically used to tick
/// the local executor on the main thread as it needs to share time with
/// other things.
///
/// ```rust
/// use bevy_tasks::TaskPool;
///
/// TaskPool::new().with_local_executor(|local_executor| {
/// local_executor.try_tick();
/// });
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
{
LOCAL_EXECUTOR.with(f)
}
}

#[derive(Debug)]
@@ -175,7 +202,7 @@
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'env async_executor::LocalExecutor<'env>,
// Vector to gather results of all futures spawned during scope run
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,
results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
@@ -211,10 +238,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
let result = Arc::new(Mutex::new(None));
self.results.lock().unwrap().push(result.clone());
let result = Rc::new(RefCell::new(None));
self.results.borrow_mut().push(result.clone());
let f = async move {
result.lock().unwrap().replace(f.await);
result.borrow_mut().replace(f.await);
};
self.executor.spawn(f).detach();
}
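The switch from `Arc<Mutex<...>>` to `Rc<RefCell<...>>` is invisible at the call site; `scope` keeps the same shape as on the multi-threaded pool. A small usage sketch (assuming the `TaskPool::scope`/`Scope::spawn` API shown above):

```rust
use bevy_tasks::TaskPool;

fn sum_in_scope() -> i32 {
    let pool = TaskPool::new();
    // Each spawned future's result is collected into the returned Vec; with
    // this change the single-threaded pool gathers them through
    // Rc<RefCell<...>> instead of Arc<Mutex<...>>.
    let results = pool.scope(|scope| {
        scope.spawn(async { 1 });
        scope.spawn(async { 2 });
    });
    results.into_iter().sum()
}
```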
1 change: 1 addition & 0 deletions docs/cargo_features.md
@@ -31,6 +31,7 @@ The default feature set enables most of the expected features of a game engine,
|filesystem_watcher|Enable watching file system for asset hot reload|
|hdr|HDR image format support|
|ktx2|KTX2 compressed texture support|
|multi-threaded|Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.|
|png|PNG image format support|
|tonemapping_luts|Include tonemapping Look Up Tables KTX2 files|
|vorbis|OGG/VORBIS audio format support|