Add optional single-threaded feature to bevy_ecs/bevy_tasks #6690

Merged · 20 commits · Jul 9, 2023
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -44,6 +44,7 @@ default = [
"bevy_sprite",
"bevy_text",
"bevy_ui",
"multi-threaded",
"png",
"hdr",
"ktx2",
@@ -199,6 +200,9 @@ filesystem_watcher = ["bevy_internal/filesystem_watcher"]
# Enable serialization support through serde
serialize = ["bevy_internal/serialize"]

# Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.
multi-threaded = ["bevy_internal/multi-threaded"]

# Wayland display server support
wayland = ["bevy_internal/wayland"]

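The new root feature is plain Cargo metadata; downstream code that wants to adapt to it can branch on the same flag with `cfg!`, mirroring what the engine crates below do. A minimal sketch (the function name is illustrative, not part of this PR):

```rust
/// Reports whether the engine was compiled with task-pool parallelism.
/// `multi-threaded` is the Cargo feature introduced above.
pub fn engine_is_multi_threaded() -> bool {
    cfg!(feature = "multi-threaded")
}
```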
3 changes: 2 additions & 1 deletion crates/bevy_ecs/Cargo.toml
@@ -11,7 +11,8 @@ categories = ["game-engines", "data-structures"]

[features]
trace = []
default = ["bevy_reflect"]
multi-threaded = ["bevy_tasks/multi-threaded"]
default = ["bevy_reflect", "multi-threaded"]

[dependencies]
bevy_ptr = { path = "../bevy_ptr", version = "0.11.0-dev" }
40 changes: 27 additions & 13 deletions crates/bevy_ecs/src/query/par_iter.rs
@@ -1,5 +1,4 @@
use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
use bevy_tasks::ComputeTaskPool;
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};
@@ -34,6 +33,8 @@ pub struct BatchingStrategy {
/// increase the scheduling overhead for the iteration.
///
/// Defaults to 1.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
pub batches_per_thread: usize,
}

@@ -148,23 +149,36 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
let thread_count = ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
{
self.state
.for_each_unchecked_manual(self.world, func, self.last_run, self.this_run);
} else {
// Need a batch size of at least 1.
let batch_size = self.get_batch_size(thread_count).max(1);
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.last_run,
self.this_run,
);
}
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
{
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
if thread_count <= 1 {
self.state.for_each_unchecked_manual(
self.world,
func,
self.last_run,
self.this_run,
);
} else {
// Need a batch size of at least 1.
let batch_size = self.get_batch_size(thread_count).max(1);
self.state.par_for_each_unchecked_manual(
self.world,
batch_size,
func,
self.last_run,
self.this_run,
);
}
}
}

#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
fn get_batch_size(&self, thread_count: usize) -> usize {
if self.batching_strategy.batch_size_limits.is_empty() {
return self.batching_strategy.batch_size_limits.start;
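For context, this is the internal path behind `Query::par_iter`. A hedged usage sketch of how a system might drive it (component and system names are illustrative, and the exact closure signature may vary between Bevy versions):

```rust
use bevy_ecs::prelude::*;

#[derive(Component)]
struct Position {
    x: f32,
    y: f32,
}

// With `multi-threaded` enabled and a ComputeTaskPool holding more than one
// thread, the closure may run on worker threads in batches; otherwise the
// same call falls back to the inline single-threaded loop shown above.
fn log_positions(query: Query<&Position>) {
    query.par_iter().for_each(|pos| {
        println!("({}, {})", pos.x, pos.y);
    });
}
```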
6 changes: 4 additions & 2 deletions crates/bevy_ecs/src/query/state.rs
@@ -10,7 +10,6 @@ use crate::{
storage::{TableId, TableRow},
world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
};
use bevy_tasks::ComputeTaskPool;
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use fixedbitset::FixedBitSet;
@@ -1031,6 +1030,9 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
/// have unique access to the components they query.
/// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world`
/// with a mismatched [`WorldId`] is unsound.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
pub(crate) unsafe fn par_for_each_unchecked_manual<
'w,
FN: Fn(Q::Item<'w>) + Send + Sync + Clone,
@@ -1044,7 +1046,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
ComputeTaskPool::get().scope(|scope| {
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
if Q::IS_DENSE && F::IS_DENSE {
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
let tables = &world.storages().tables;
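The gated `par_for_each_unchecked_manual` follows the usual batch-and-scope pattern: chop the work into chunks and spawn one scoped task per chunk on the `ComputeTaskPool`. A standalone sketch of that pattern under the public `bevy_tasks` API (numbers and names are illustrative, not the engine's internals):

```rust
use bevy_tasks::{ComputeTaskPool, TaskPool};

fn parallel_sum(values: &[u64], batch_size: usize) -> u64 {
    // Initializes the shared pool if it does not exist yet.
    ComputeTaskPool::init(TaskPool::default);
    ComputeTaskPool::get()
        .scope(|scope| {
            // One scoped task per batch; `scope` waits for all of them.
            for chunk in values.chunks(batch_size.max(1)) {
                scope.spawn(async move { chunk.iter().sum::<u64>() });
            }
        })
        .into_iter()
        .sum()
}
```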
4 changes: 2 additions & 2 deletions crates/bevy_ecs/src/schedule/executor/mod.rs
@@ -33,13 +33,13 @@ pub enum ExecutorKind {
///
/// Useful if you're dealing with a single-threaded environment, saving your threads for
/// other things, or just trying to minimize overhead.
#[cfg_attr(target_arch = "wasm32", default)]
#[cfg_attr(any(target_arch = "wasm32", not(feature = "multi-threaded")), default)]
SingleThreaded,
/// Like [`SingleThreaded`](ExecutorKind::SingleThreaded) but calls [`apply_deferred`](crate::system::System::apply_deferred)
/// immediately after running each system.
Simple,
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
#[cfg_attr(not(target_arch = "wasm32"), default)]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "multi-threaded"), default)]
MultiThreaded,
}

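Independently of which default the `cfg_attr`s above select, a schedule can still be pinned to one executor explicitly. A small sketch (assuming the existing `Schedule::set_executor_kind` API):

```rust
use bevy_ecs::schedule::{ExecutorKind, Schedule};

fn make_single_threaded_schedule() -> Schedule {
    // Opt this schedule out of parallel execution even when the
    // `multi-threaded` feature is enabled.
    let mut schedule = Schedule::new();
    schedule.set_executor_kind(ExecutorKind::SingleThreaded);
    schedule
}
```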
1 change: 1 addition & 0 deletions crates/bevy_internal/Cargo.toml
@@ -66,6 +66,7 @@ shader_format_spirv = ["bevy_render/shader_format_spirv"]
filesystem_watcher = ["bevy_asset/filesystem_watcher"]

serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene/serialize"]
multi-threaded = ["bevy_ecs/multi-threaded", "bevy_tasks/multi-threaded"]

# Display server protocol support (X11 is enabled by default)
wayland = ["bevy_winit/wayland"]
2 changes: 1 addition & 1 deletion crates/bevy_internal/src/default_plugins.rs
@@ -81,7 +81,7 @@ impl PluginGroup for DefaultPlugins {
// compressed texture formats
.add(bevy_render::texture::ImagePlugin::default());

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
{
group = group
.add(bevy_render::pipelined_rendering::PipelinedRenderingPlugin::default());
4 changes: 4 additions & 0 deletions crates/bevy_tasks/Cargo.toml
@@ -8,6 +8,10 @@ repository = "https://github.com/bevyengine/bevy"
license = "MIT OR Apache-2.0"
keywords = ["bevy"]

[features]
multi-threaded = []
default = ["multi-threaded"]

[dependencies]
futures-lite = "1.4.0"
async-executor = "1.3.0"
8 changes: 4 additions & 4 deletions crates/bevy_tasks/src/lib.rs
@@ -8,14 +8,14 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
mod task;
pub use task::Task;

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
mod task_pool;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};

#[cfg(target_arch = "wasm32")]
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
mod single_threaded_task_pool;
#[cfg(target_arch = "wasm32")]
#[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};

mod usages;
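Because both modules export the same `TaskPool`, `Scope`, and `TaskPoolBuilder` names, call sites compile unchanged against either backend. A hedged sketch of code that works in both configurations:

```rust
use bevy_tasks::TaskPool;

// With `multi-threaded` enabled (and off wasm32) this resolves to the
// work-stealing pool; otherwise it resolves to the single-threaded pool,
// which runs the spawned futures on the calling thread.
fn double_all(values: &[i32]) -> Vec<i32> {
    let pool = TaskPool::new();
    pool.scope(|scope| {
        for &v in values {
            scope.spawn(async move { v * 2 });
        }
    })
}
```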
75 changes: 51 additions & 24 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
@@ -1,9 +1,10 @@
use std::{
future::Future,
marker::PhantomData,
mem,
sync::{Arc, Mutex},
};
#[cfg(target_arch = "wasm32")]
use std::sync::Arc;
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};

thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
}

/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
@@ -76,7 +77,7 @@ impl TaskPool {
1
}

/// Allows spawning non-`static futures on the thread pool. The function takes a callback,
/// Allows spawning non-'static futures on the thread pool. The function takes a callback,
/// passing a scope object into it. The scope object provided to the callback can be used
/// to spawn tasks. This function will await the completion of all tasks before returning.
///
@@ -108,8 +109,9 @@ impl TaskPool {
let executor: &'env async_executor::LocalExecutor<'env> =
unsafe { mem::transmute(executor) };

let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };
let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
unsafe { mem::transmute(&results) };

let mut scope = Scope {
executor,
@@ -125,29 +127,36 @@
// Loop until all tasks are done
while executor.try_tick() {}

let results = scope.results.lock().unwrap();
let results = scope.results.borrow();
results
.iter()
.map(|result| result.lock().unwrap().take().unwrap())
.map(|result| result.borrow_mut().take().unwrap())
.collect()
}

/// Spawns a static future onto the JS event loop. For now it is returning FakeTask
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
/// so some proxy future is needed. Moreover currently we don't have long-living
/// LocalExecutor here (above `spawn` implementation creates temporary one)
/// But for typical use cases it seems that current implementation should be sufficient:
/// caller can spawn long-running future writing results to some channel / event queue
/// and simply call detach on returned Task (like AssetServer does) - spawned future
/// can write results to some channel / event queue.
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
/// cancelled and "detached" allowing it to continue running without having to be polled by the
/// end-user.
///
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
where
T: 'static,
{
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});

#[cfg(not(target_arch = "wasm32"))]
{
LOCAL_EXECUTOR.with(|executor| {
let _task = executor.spawn(future);
// Loop until all tasks are done
while executor.try_tick() {}
});
}

FakeTask
}

@@ -158,6 +167,24 @@
{
self.spawn(future)
}

/// Runs a function with the local executor. Typically used to tick
/// the local executor on the main thread as it needs to share time with
/// other things.
///
/// ```rust
/// use bevy_tasks::TaskPool;
///
/// TaskPool::new().with_local_executor(|local_executor| {
/// local_executor.try_tick();
/// });
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
{
LOCAL_EXECUTOR.with(f)
}
}

#[derive(Debug)]
@@ -175,7 +202,7 @@
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'env async_executor::LocalExecutor<'env>,
// Vector to gather results of all futures spawned during scope run
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,
results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
@@ -211,10 +238,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
let result = Arc::new(Mutex::new(None));
self.results.lock().unwrap().push(result.clone());
let result = Rc::new(RefCell::new(None));
self.results.borrow_mut().push(result.clone());
let f = async move {
result.lock().unwrap().replace(f.await);
result.borrow_mut().replace(f.await);
};
self.executor.spawn(f).detach();
}
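With these changes, the single-threaded `spawn` drives the future on the new thread-local executor instead of a throwaway one, and the returned `FakeTask` keeps its no-op `detach`. A short usage sketch (the function name is illustrative):

```rust
use bevy_tasks::TaskPool;

fn fire_and_forget() {
    let pool = TaskPool::new();
    // On this backend the local executor is ticked until it has no runnable
    // work left, so a simple future like this finishes before `spawn`
    // returns; `detach` is a no-op on `FakeTask`.
    pool.spawn(async {
        println!("ran inline on the calling thread");
    })
    .detach();
}
```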
1 change: 1 addition & 0 deletions docs/cargo_features.md
@@ -31,6 +31,7 @@ The default feature set enables most of the expected features of a game engine,
|filesystem_watcher|Enable watching file system for asset hot reload|
|hdr|HDR image format support|
|ktx2|KTX2 compressed texture support|
|multi-threaded|Enables multithreaded parallelism in the engine. Disabling it forces all engine tasks to run on a single thread.|
|png|PNG image format support|
|tonemapping_luts|Include tonemapping Look Up Tables KTX2 files|
|vorbis|OGG/VORBIS audio format support|